diff options
Diffstat (limited to 'yt_dlp/utils.py')
-rw-r--r-- | yt_dlp/utils.py | 156 |
1 files changed, 112 insertions, 44 deletions
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py index e70c5f909..36597d41a 100644 --- a/yt_dlp/utils.py +++ b/yt_dlp/utils.py @@ -2006,6 +2006,23 @@ class HTMLAttributeParser(compat_HTMLParser): self.attrs = dict(attrs) +class HTMLListAttrsParser(compat_HTMLParser): + """HTML parser to gather the attributes for the elements of a list""" + + def __init__(self): + compat_HTMLParser.__init__(self) + self.items = [] + self._level = 0 + + def handle_starttag(self, tag, attrs): + if tag == 'li' and self._level == 0: + self.items.append(dict(attrs)) + self._level += 1 + + def handle_endtag(self, tag): + self._level -= 1 + + def extract_attributes(html_element): """Given a string for an HTML element such as <el @@ -2032,6 +2049,15 @@ def extract_attributes(html_element): return parser.attrs +def parse_list(webpage): + """Given a string for an series of HTML <li> elements, + return a dictionary of their attributes""" + parser = HTMLListAttrsParser() + parser.feed(webpage) + parser.close() + return parser.items + + def clean_html(html): """Clean an HTML snippet into a readable string""" @@ -2433,7 +2459,14 @@ def bug_reports_message(before=';'): class YoutubeDLError(Exception): """Base exception for YoutubeDL errors.""" - pass + msg = None + + def __init__(self, msg=None): + if msg is not None: + self.msg = msg + elif self.msg is None: + self.msg = type(self).__name__ + super().__init__(self.msg) network_exceptions = [compat_urllib_error.URLError, compat_http_client.HTTPException, socket.error] @@ -2518,7 +2551,7 @@ class EntryNotInPlaylist(YoutubeDLError): This exception will be thrown by YoutubeDL when a requested entry is not found in the playlist info_dict """ - pass + msg = 'Entry not found in info' class SameFileError(YoutubeDLError): @@ -2527,7 +2560,12 @@ class SameFileError(YoutubeDLError): This exception will be thrown by FileDownloader objects if they detect multiple files would have to be downloaded to the same file on disk. """ - pass + msg = 'Fixed output name but more than one file to download' + + def __init__(self, filename=None): + if filename is not None: + self.msg += f': {filename}' + super().__init__(self.msg) class PostProcessingError(YoutubeDLError): @@ -2546,11 +2584,6 @@ class DownloadCancelled(YoutubeDLError): """ Exception raised when the download queue should be interrupted """ msg = 'The download was cancelled' - def __init__(self, msg=None): - if msg is not None: - self.msg = msg - YoutubeDLError.__init__(self, self.msg) - class ExistingVideoReached(DownloadCancelled): """ --break-on-existing triggered """ @@ -2569,7 +2602,7 @@ class MaxDownloadsReached(DownloadCancelled): class ThrottledDownload(YoutubeDLError): """ Download speed below --throttled-rate. """ - pass + msg = 'The download speed is below throttle limit' class UnavailableVideoError(YoutubeDLError): @@ -2578,7 +2611,12 @@ class UnavailableVideoError(YoutubeDLError): This exception will be thrown when a video is requested in a format that is not available for that video. """ - pass + msg = 'Unable to download video' + + def __init__(self, err=None): + if err is not None: + self.msg += f': {err}' + super().__init__(self.msg) class ContentTooShortError(YoutubeDLError): @@ -3871,7 +3909,7 @@ def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1): return default try: return int(v) * invscale // scale - except (ValueError, TypeError): + except (ValueError, TypeError, OverflowError): return default @@ -4007,10 +4045,7 @@ def check_executable(exe, args=[]): return exe -def get_exe_version(exe, args=['--version'], - version_re=None, unrecognized='present'): - """ Returns the version of the specified executable, - or False if the executable is not present """ +def _get_exe_version_output(exe, args): try: # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers # SIGTTOU if yt-dlp is run in the background. @@ -4022,7 +4057,7 @@ def get_exe_version(exe, args=['--version'], return False if isinstance(out, bytes): # Python 2.x out = out.decode('ascii', 'ignore') - return detect_exe_version(out, version_re, unrecognized) + return out def detect_exe_version(output, version_re=None, unrecognized='present'): @@ -4036,6 +4071,14 @@ def detect_exe_version(output, version_re=None, unrecognized='present'): return unrecognized +def get_exe_version(exe, args=['--version'], + version_re=None, unrecognized='present'): + """ Returns the version of the specified executable, + or False if the executable is not present """ + out = _get_exe_version_output(exe, args) + return detect_exe_version(out, version_re, unrecognized) if out else False + + class LazyList(collections.abc.Sequence): ''' Lazy immutable list from an iterable Note that slices of a LazyList are lists and not LazyList''' @@ -4043,10 +4086,10 @@ class LazyList(collections.abc.Sequence): class IndexError(IndexError): pass - def __init__(self, iterable): + def __init__(self, iterable, *, reverse=False, _cache=None): self.__iterable = iter(iterable) - self.__cache = [] - self.__reversed = False + self.__cache = [] if _cache is None else _cache + self.__reversed = reverse def __iter__(self): if self.__reversed: @@ -4112,9 +4155,17 @@ class LazyList(collections.abc.Sequence): self.__exhaust() return len(self.__cache) - def reverse(self): - self.__reversed = not self.__reversed - return self + def __reversed__(self): + return type(self)(self.__iterable, reverse=not self.__reversed, _cache=self.__cache) + + def __copy__(self): + return type(self)(self.__iterable, reverse=self.__reversed, _cache=self.__cache) + + def __deepcopy__(self, memo): + # FIXME: This is actually just a shallow copy + id_ = id(self) + memo[id_] = self.__copy__() + return memo[id_] def __repr__(self): # repr and str should mimic a list. So we exhaust the iterable @@ -4125,6 +4176,10 @@ class LazyList(collections.abc.Sequence): class PagedList: + + class IndexError(IndexError): + pass + def __len__(self): # This is only useful for tests return len(self.getslice()) @@ -4136,7 +4191,9 @@ class PagedList: self._cache = {} def getpage(self, pagenum): - page_results = self._cache.get(pagenum) or list(self._pagefunc(pagenum)) + page_results = self._cache.get(pagenum) + if page_results is None: + page_results = list(self._pagefunc(pagenum)) if self._use_cache: self._cache[pagenum] = page_results return page_results @@ -4152,7 +4209,9 @@ class PagedList: if not isinstance(idx, int) or idx < 0: raise TypeError('indices must be non-negative integers') entries = self.getslice(idx, idx + 1) - return entries[0] if entries else None + if not entries: + raise self.IndexError() + return entries[0] class OnDemandPagedList(PagedList): @@ -4656,19 +4715,18 @@ def parse_codecs(codecs_str): str.strip, codecs_str.strip().strip(',').split(',')))) vcodec, acodec, hdr = None, None, None for full_codec in split_codecs: - codec = full_codec.split('.')[0] - if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora', 'dvh1', 'dvhe'): + parts = full_codec.split('.') + codec = parts[0].replace('0', '') + if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', + 'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'): if not vcodec: - vcodec = full_codec + vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1') else full_codec if codec in ('dvh1', 'dvhe'): hdr = 'DV' - elif codec == 'vp9' and vcodec.startswith('vp9.2'): + elif codec == 'av1' and len(parts) > 3 and parts[3] == '10': + hdr = 'HDR10' + elif full_codec.replace('0', '').startswith('vp9.2'): hdr = 'HDR10' - elif codec == 'av01': - parts = full_codec.split('.') - if len(parts) > 3 and parts[3] == '10': - hdr = 'HDR10' - vcodec = '.'.join(parts[:4]) elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'): if not acodec: acodec = full_codec @@ -4759,10 +4817,11 @@ def determine_protocol(info_dict): return compat_urllib_parse_urlparse(url).scheme -def render_table(header_row, data, delim=False, extraGap=0, hideEmpty=False): - """ Render a list of rows, each as a list of values """ +def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False): + """ Render a list of rows, each as a list of values. + Text after a \t will be right aligned """ def width(string): - return len(remove_terminal_sequences(string)) + return len(remove_terminal_sequences(string).replace('\t', '')) def get_max_lens(table): return [max(width(str(v)) for v in col) for col in zip(*table)] @@ -4770,21 +4829,24 @@ def render_table(header_row, data, delim=False, extraGap=0, hideEmpty=False): def filter_using_list(row, filterArray): return [col for (take, col) in zip(filterArray, row) if take] - if hideEmpty: + if hide_empty: max_lens = get_max_lens(data) header_row = filter_using_list(header_row, max_lens) data = [filter_using_list(row, max_lens) for row in data] table = [header_row] + data max_lens = get_max_lens(table) - extraGap += 1 + extra_gap += 1 if delim: - table = [header_row] + [[delim * (ml + extraGap) for ml in max_lens]] + data - max_lens[-1] = 0 + table = [header_row, [delim * (ml + extra_gap) for ml in max_lens]] + data + table[1][-1] = table[1][-1][:-extra_gap] # Remove extra_gap from end of delimiter for row in table: for pos, text in enumerate(map(str, row)): - row[pos] = text + (' ' * (max_lens[pos] - width(text) + extraGap)) - ret = '\n'.join(''.join(row) for row in table) + if '\t' in text: + row[pos] = text.replace('\t', ' ' * (max_lens[pos] - width(text))) + ' ' * extra_gap + else: + row[pos] = text + ' ' * (max_lens[pos] - width(text) + extra_gap) + ret = '\n'.join(''.join(row).rstrip() for row in table) return ret @@ -6412,10 +6474,10 @@ def traverse_obj( def _traverse_obj(obj, path, _current_depth=0): nonlocal depth - if obj is None: - return None path = tuple(variadic(path)) for i, key in enumerate(path): + if obj is None: + return None if isinstance(key, (list, tuple)): obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key] key = ... @@ -6540,3 +6602,9 @@ def remove_terminal_sequences(string): def number_of_digits(number): return len('%d' % number) + + +def join_nonempty(*values, delim='-', from_dict=None): + if from_dict is not None: + values = map(from_dict.get, values) + return delim.join(map(str, filter(None, values))) |