diff options
Diffstat (limited to 'yt_dlp/utils.py')
-rw-r--r-- | yt_dlp/utils.py | 103 |
1 file changed, 49 insertions, 54 deletions
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py index 3f70b1f60..91e1a9870 100644 --- a/yt_dlp/utils.py +++ b/yt_dlp/utils.py @@ -1,8 +1,4 @@ #!/usr/bin/env python3 -# coding: utf-8 - -from __future__ import unicode_literals - import asyncio import atexit import base64 @@ -311,7 +307,7 @@ def write_json_file(obj, fn): def find_xpath_attr(node, xpath, key, val=None): """ Find the xpath xpath[@key=val] """ assert re.match(r'^[a-zA-Z_-]+$', key) - expr = xpath + ('[@%s]' % key if val is None else "[@%s='%s']" % (key, val)) + expr = xpath + ('[@%s]' % key if val is None else f"[@{key}='{val}']") return node.find(expr) # On python2.6 the xml.etree.ElementTree.Element methods don't support @@ -374,7 +370,7 @@ def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT): if default is not NO_DEFAULT: return default elif fatal: - name = '%s[@%s]' % (xpath, key) if name is None else name + name = f'{xpath}[@{key}]' if name is None else name raise ExtractorError('Could not find XML attribute %s' % name) else: return None @@ -443,15 +439,15 @@ def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value attribute in the passed HTML document """ - value_quote_optional = '' if re.match(r'''[\s"'`=<>]''', value) else '?' + quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?' value = re.escape(value) if escape_value else value - partial_element_re = r'''(?x) + partial_element_re = rf'''(?x) <(?P<tag>[a-zA-Z0-9:._-]+) (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)? 
- \s%(attribute)s\s*=\s*(?P<_q>['"]%(vqo)s)(?-x:%(value)s)(?P=_q) - ''' % {'attribute': re.escape(attribute), 'value': value, 'vqo': value_quote_optional} + \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q) + ''' for m in re.finditer(partial_element_re, html): content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():]) @@ -644,7 +640,7 @@ def sanitize_open(filename, open_mode): except LockingUnsupportedError: stream = open(filename, open_mode) return (stream, filename) - except (IOError, OSError) as err: + except OSError as err: if attempt or err.errno in (errno.EACCES,): raise old_filename, filename = filename, sanitize_path(filename) @@ -853,7 +849,7 @@ class Popen(subprocess.Popen): _startupinfo = None def __init__(self, *args, **kwargs): - super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo) + super().__init__(*args, **kwargs, startupinfo=self._startupinfo) def communicate_or_kill(self, *args, **kwargs): return process_communicate_or_kill(self, *args, **kwargs) @@ -1013,7 +1009,7 @@ class ExtractorError(YoutubeDLError): self.ie = ie self.exc_info = sys.exc_info() # preserve original exception - super(ExtractorError, self).__init__(''.join(( + super().__init__(''.join(( format_field(ie, template='[%s] '), format_field(video_id, template='%s: '), msg, @@ -1029,7 +1025,7 @@ class ExtractorError(YoutubeDLError): class UnsupportedError(ExtractorError): def __init__(self, url): - super(UnsupportedError, self).__init__( + super().__init__( 'Unsupported URL: %s' % url, expected=True) self.url = url @@ -1048,7 +1044,7 @@ class GeoRestrictedError(ExtractorError): def __init__(self, msg, countries=None, **kwargs): kwargs['expected'] = True - super(GeoRestrictedError, self).__init__(msg, **kwargs) + super().__init__(msg, **kwargs) self.countries = countries @@ -1062,7 +1058,7 @@ class DownloadError(YoutubeDLError): def __init__(self, msg, exc_info=None): """ exc_info, if given, is the original exception 
that caused the trouble (as returned by sys.exc_info()). """ - super(DownloadError, self).__init__(msg) + super().__init__(msg) self.exc_info = exc_info @@ -1156,9 +1152,7 @@ class ContentTooShortError(YoutubeDLError): """ def __init__(self, downloaded, expected): - super(ContentTooShortError, self).__init__( - 'Downloaded {0} bytes, expected {1} bytes'.format(downloaded, expected) - ) + super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes') # Both in bytes self.downloaded = downloaded self.expected = expected @@ -1166,7 +1160,7 @@ class ContentTooShortError(YoutubeDLError): class XAttrMetadataError(YoutubeDLError): def __init__(self, code=None, msg='Unknown error'): - super(XAttrMetadataError, self).__init__(msg) + super().__init__(msg) self.code = code self.msg = msg @@ -1202,7 +1196,7 @@ def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs): ip_addrs = [addr for addr in addrs if addr[0] == af] if addrs and not ip_addrs: ip_version = 'v4' if af == socket.AF_INET else 'v6' - raise socket.error( + raise OSError( "No remote IP%s addresses available for connect, can't use '%s' as source address" % (ip_version, source_address[0])) for res in ip_addrs: @@ -1216,14 +1210,14 @@ def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs): sock.connect(sa) err = None # Explicitly break reference cycle return sock - except socket.error as _: + except OSError as _: err = _ if sock is not None: sock.close() if err is not None: raise err else: - raise socket.error('getaddrinfo returns an empty list') + raise OSError('getaddrinfo returns an empty list') if hasattr(hc, '_create_connection'): hc._create_connection = _create_connection hc.source_address = (source_address, 0) @@ -1235,7 +1229,7 @@ def handle_youtubedl_headers(headers): filtered_headers = headers if 'Youtubedl-no-compression' in filtered_headers: - filtered_headers = dict((k, v) for k, v in filtered_headers.items() if k.lower() != 
'accept-encoding') + filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'} del filtered_headers['Youtubedl-no-compression'] return filtered_headers @@ -1327,14 +1321,14 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler): gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb') try: uncompressed = io.BytesIO(gz.read()) - except IOError as original_ioerror: + except OSError as original_ioerror: # There may be junk add the end of the file # See http://stackoverflow.com/q/4928560/35070 for details for i in range(1, 1024): try: gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb') uncompressed = io.BytesIO(gz.read()) - except IOError: + except OSError: continue break else: @@ -1474,7 +1468,7 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar): if cookie.expires is None: cookie.expires = 0 - with io.open(filename, 'w', encoding='utf-8') as f: + with open(filename, 'w', encoding='utf-8') as f: f.write(self._HEADER) now = time.time() for cookie in self: @@ -1530,7 +1524,7 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar): return line cf = io.StringIO() - with io.open(filename, encoding='utf-8') as f: + with open(filename, encoding='utf-8') as f: for line in f: try: cf.write(prepare_line(line)) @@ -1612,8 +1606,7 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler): CONTENT_HEADERS = ("content-length", "content-type") # NB: don't use dict comprehension for python 2.6 compatibility - newheaders = dict((k, v) for k, v in req.headers.items() - if k.lower() not in CONTENT_HEADERS) + newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS} return compat_urllib_request.Request( newurl, headers=newheaders, origin_req_host=req.origin_req_host, unverifiable=True) @@ -1657,7 +1650,7 @@ def parse_iso8601(date_str, delimiter='T', timezone=None): timezone, date_str = extract_timezone(date_str) try: - date_format = 
'%Y-%m-%d{0}%H:%M:%S'.format(delimiter) + date_format = f'%Y-%m-%d{delimiter}%H:%M:%S' dt = datetime.datetime.strptime(date_str, date_format) - timezone return calendar.timegm(dt.timetuple()) except ValueError: @@ -1839,7 +1832,7 @@ def hyphenate_date(date_str): return date_str -class DateRange(object): +class DateRange: """Represents a time interval between two dates""" def __init__(self, start=None, end=None): @@ -1867,7 +1860,7 @@ class DateRange(object): return self.start <= date <= self.end def __str__(self): - return '%s - %s' % (self.start.isoformat(), self.end.isoformat()) + return f'{self.start.isoformat()} - {self.end.isoformat()}' def platform_name(): @@ -2012,7 +2005,7 @@ else: raise LockingUnsupportedError() -class locked_file(object): +class locked_file: locked = False def __init__(self, filename, mode, block=True, encoding=None): @@ -2039,7 +2032,7 @@ class locked_file(object): try: _lock_file(self.f, exclusive, self.block) self.locked = True - except IOError: + except OSError: self.f.close() raise if 'w' in self.mode: @@ -2510,14 +2503,14 @@ def parse_duration(s): def prepend_extension(filename, ext, expected_real_ext=None): name, real_ext = os.path.splitext(filename) return ( - '{0}.{1}{2}'.format(name, ext, real_ext) + f'{name}.{ext}{real_ext}' if not expected_real_ext or real_ext[1:] == expected_real_ext - else '{0}.{1}'.format(filename, ext)) + else f'{filename}.{ext}') def replace_extension(filename, ext, expected_real_ext=None): name, real_ext = os.path.splitext(filename) - return '{0}.{1}'.format( + return '{}.{}'.format( name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename, ext) @@ -2700,6 +2693,7 @@ class PagedList: class OnDemandPagedList(PagedList): """Download pages until a page with less than maximum results""" + def _getslice(self, start, end): for pagenum in itertools.count(start // self._pagesize): firstid = pagenum * self._pagesize @@ -2740,6 +2734,7 @@ class OnDemandPagedList(PagedList): class 
InAdvancePagedList(PagedList): """PagedList with total number of pages known in advance""" + def __init__(self, pagefunc, pagecount, pagesize): PagedList.__init__(self, pagefunc, pagesize, True) self._pagecount = pagecount @@ -2994,10 +2989,10 @@ def strip_jsonp(code): def js_to_json(code, vars={}): # vars is a dict of var, val pairs to substitute COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n' - SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE) + SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*' INTEGER_TABLE = ( - (r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16), - (r'(?s)^(0+[0-7]+){skip}:?$'.format(skip=SKIP_RE), 8), + (fr'(?s)^(0[xX][0-9a-fA-F]+){SKIP_RE}:?$', 16), + (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8), ) def fix_kv(m): @@ -3518,7 +3513,7 @@ def dfxp2srt(dfxp_data): styles = {} default_style = {} - class TTMLPElementParser(object): + class TTMLPElementParser: _out = '' _unclosed_elements = [] _applied_styles = [] @@ -3703,7 +3698,7 @@ def _configuration_args(main_key, argdict, exe, keys=None, default=[], use_compa return cli_configuration_args(argdict, keys, default, use_compat) -class ISO639Utils(object): +class ISO639Utils: # See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt _lang_map = { 'aa': 'aar', @@ -3908,7 +3903,7 @@ class ISO639Utils(object): return short_name -class ISO3166Utils(object): +class ISO3166Utils: # From http://data.okfn.org/data/core/country-list _country_map = { 'AF': 'Afghanistan', @@ -4168,7 +4163,7 @@ class ISO3166Utils(object): return cls._country_map.get(code.upper()) -class GeoUtils(object): +class GeoUtils: # Major IPv4 address blocks per country _country_ip_map = { 'AD': '46.172.224.0/19', @@ -4605,7 +4600,7 @@ def decode_png(png_data): header = png_data[8:] if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR': - raise IOError('Not a valid PNG file.') + raise OSError('Not a valid PNG file.') int_map = {1: '>B', 2: '>H', 4: '>I'} unpack_integer = lambda x: 
compat_struct_unpack(int_map[len(x)], x)[0] @@ -4642,7 +4637,7 @@ def decode_png(png_data): idat += chunk['data'] if not idat: - raise IOError('Unable to read PNG data.') + raise OSError('Unable to read PNG data.') decompressed_data = bytearray(zlib.decompress(idat)) @@ -4730,7 +4725,7 @@ def write_xattr(path, key, value): try: setxattr(path, key, value) - except EnvironmentError as e: + except OSError as e: raise XAttrMetadataError(e.errno, e.strerror) except ImportError: @@ -4744,7 +4739,7 @@ def write_xattr(path, key, value): try: with open(ads_fn, 'wb') as f: f.write(value) - except EnvironmentError as e: + except OSError as e: raise XAttrMetadataError(e.errno, e.strerror) else: user_has_setfattr = check_executable('setfattr', ['--version']) @@ -4767,7 +4762,7 @@ def write_xattr(path, key, value): try: p = Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) - except EnvironmentError as e: + except OSError as e: raise XAttrMetadataError(e.errno, e.strerror) stdout, stderr = p.communicate_or_kill() stderr = stderr.decode('utf-8', 'replace') @@ -4923,7 +4918,7 @@ def make_dir(path, to_screen=None): if dn and not os.path.exists(dn): os.makedirs(dn) return True - except (OSError, IOError) as err: + except OSError as err: if callable(to_screen) is not None: to_screen('unable to create directory ' + error_to_compat_str(err)) return False @@ -5155,7 +5150,7 @@ def scale_thumbnails_to_max_format_width(formats, thumbnails, url_width_re): """ _keys = ('width', 'height') max_dimensions = max( - [tuple(format.get(k) or 0 for k in _keys) for format in formats], + (tuple(format.get(k) or 0 for k in _keys) for format in formats), default=(0, 0)) if not max_dimensions[0]: return thumbnails @@ -5220,7 +5215,7 @@ class Config: def read_file(filename, default=[]): try: optionf = open(filename) - except IOError: + except OSError: return default # silently skip if file is not present try: # FIXME: 
https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56 @@ -5232,7 +5227,7 @@ class Config: @staticmethod def hide_login_info(opts): - PRIVATE_OPTS = set(['-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username']) + PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'} eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$') def _scrub_eq(o): |