aboutsummaryrefslogtreecommitdiffstats
path: root/yt_dlp/utils.py
diff options
context:
space:
mode:
authorpukkandan <pukkandan.ytdlp@gmail.com>2022-04-11 20:40:28 +0530
committerpukkandan <pukkandan.ytdlp@gmail.com>2022-04-12 05:32:51 +0530
commit86e5f3ed2e6e71eb81ea4c9e26288f16119ffd0c (patch)
tree12558e4c8f24c1a8d16ccb63e2455b26c301285a /yt_dlp/utils.py
parentf9934b96145af8ac5dfdcbf684827aeaea9912a7 (diff)
downloadhypervideo-pre-86e5f3ed2e6e71eb81ea4c9e26288f16119ffd0c.tar.lz
hypervideo-pre-86e5f3ed2e6e71eb81ea4c9e26288f16119ffd0c.tar.xz
hypervideo-pre-86e5f3ed2e6e71eb81ea4c9e26288f16119ffd0c.zip
[cleanup] Upgrade syntax
Using https://github.com/asottile/pyupgrade 1. `__future__` imports and `coding: utf-8` were removed 2. Files were rewritten with `pyupgrade --py36-plus --keep-percent-format` 3. f-strings were cherry-picked from `pyupgrade --py36-plus` Extractors are left untouched (except removing header) to avoid unnecessary merge conflicts
Diffstat (limited to 'yt_dlp/utils.py')
-rw-r--r--yt_dlp/utils.py103
1 files changed, 49 insertions, 54 deletions
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index 3f70b1f60..91e1a9870 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -1,8 +1,4 @@
#!/usr/bin/env python3
-# coding: utf-8
-
-from __future__ import unicode_literals
-
import asyncio
import atexit
import base64
@@ -311,7 +307,7 @@ def write_json_file(obj, fn):
def find_xpath_attr(node, xpath, key, val=None):
""" Find the xpath xpath[@key=val] """
assert re.match(r'^[a-zA-Z_-]+$', key)
- expr = xpath + ('[@%s]' % key if val is None else "[@%s='%s']" % (key, val))
+ expr = xpath + ('[@%s]' % key if val is None else f"[@{key}='{val}']")
return node.find(expr)
# On python2.6 the xml.etree.ElementTree.Element methods don't support
@@ -374,7 +370,7 @@ def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
if default is not NO_DEFAULT:
return default
elif fatal:
- name = '%s[@%s]' % (xpath, key) if name is None else name
+ name = f'{xpath}[@{key}]' if name is None else name
raise ExtractorError('Could not find XML attribute %s' % name)
else:
return None
@@ -443,15 +439,15 @@ def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value
attribute in the passed HTML document
"""
- value_quote_optional = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
+ quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
value = re.escape(value) if escape_value else value
- partial_element_re = r'''(?x)
+ partial_element_re = rf'''(?x)
<(?P<tag>[a-zA-Z0-9:._-]+)
(?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
- \s%(attribute)s\s*=\s*(?P<_q>['"]%(vqo)s)(?-x:%(value)s)(?P=_q)
- ''' % {'attribute': re.escape(attribute), 'value': value, 'vqo': value_quote_optional}
+ \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
+ '''
for m in re.finditer(partial_element_re, html):
content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])
@@ -644,7 +640,7 @@ def sanitize_open(filename, open_mode):
except LockingUnsupportedError:
stream = open(filename, open_mode)
return (stream, filename)
- except (IOError, OSError) as err:
+ except OSError as err:
if attempt or err.errno in (errno.EACCES,):
raise
old_filename, filename = filename, sanitize_path(filename)
@@ -853,7 +849,7 @@ class Popen(subprocess.Popen):
_startupinfo = None
def __init__(self, *args, **kwargs):
- super(Popen, self).__init__(*args, **kwargs, startupinfo=self._startupinfo)
+ super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
return process_communicate_or_kill(self, *args, **kwargs)
@@ -1013,7 +1009,7 @@ class ExtractorError(YoutubeDLError):
self.ie = ie
self.exc_info = sys.exc_info() # preserve original exception
- super(ExtractorError, self).__init__(''.join((
+ super().__init__(''.join((
format_field(ie, template='[%s] '),
format_field(video_id, template='%s: '),
msg,
@@ -1029,7 +1025,7 @@ class ExtractorError(YoutubeDLError):
class UnsupportedError(ExtractorError):
def __init__(self, url):
- super(UnsupportedError, self).__init__(
+ super().__init__(
'Unsupported URL: %s' % url, expected=True)
self.url = url
@@ -1048,7 +1044,7 @@ class GeoRestrictedError(ExtractorError):
def __init__(self, msg, countries=None, **kwargs):
kwargs['expected'] = True
- super(GeoRestrictedError, self).__init__(msg, **kwargs)
+ super().__init__(msg, **kwargs)
self.countries = countries
@@ -1062,7 +1058,7 @@ class DownloadError(YoutubeDLError):
def __init__(self, msg, exc_info=None):
""" exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """
- super(DownloadError, self).__init__(msg)
+ super().__init__(msg)
self.exc_info = exc_info
@@ -1156,9 +1152,7 @@ class ContentTooShortError(YoutubeDLError):
"""
def __init__(self, downloaded, expected):
- super(ContentTooShortError, self).__init__(
- 'Downloaded {0} bytes, expected {1} bytes'.format(downloaded, expected)
- )
+ super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes')
# Both in bytes
self.downloaded = downloaded
self.expected = expected
@@ -1166,7 +1160,7 @@ class ContentTooShortError(YoutubeDLError):
class XAttrMetadataError(YoutubeDLError):
def __init__(self, code=None, msg='Unknown error'):
- super(XAttrMetadataError, self).__init__(msg)
+ super().__init__(msg)
self.code = code
self.msg = msg
@@ -1202,7 +1196,7 @@ def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
ip_addrs = [addr for addr in addrs if addr[0] == af]
if addrs and not ip_addrs:
ip_version = 'v4' if af == socket.AF_INET else 'v6'
- raise socket.error(
+ raise OSError(
"No remote IP%s addresses available for connect, can't use '%s' as source address"
% (ip_version, source_address[0]))
for res in ip_addrs:
@@ -1216,14 +1210,14 @@ def _create_http_connection(ydl_handler, http_class, is_https, *args, **kwargs):
sock.connect(sa)
err = None # Explicitly break reference cycle
return sock
- except socket.error as _:
+ except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
- raise socket.error('getaddrinfo returns an empty list')
+ raise OSError('getaddrinfo returns an empty list')
if hasattr(hc, '_create_connection'):
hc._create_connection = _create_connection
hc.source_address = (source_address, 0)
@@ -1235,7 +1229,7 @@ def handle_youtubedl_headers(headers):
filtered_headers = headers
if 'Youtubedl-no-compression' in filtered_headers:
- filtered_headers = dict((k, v) for k, v in filtered_headers.items() if k.lower() != 'accept-encoding')
+ filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
del filtered_headers['Youtubedl-no-compression']
return filtered_headers
@@ -1327,14 +1321,14 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
try:
uncompressed = io.BytesIO(gz.read())
- except IOError as original_ioerror:
+ except OSError as original_ioerror:
# There may be junk add the end of the file
# See http://stackoverflow.com/q/4928560/35070 for details
for i in range(1, 1024):
try:
gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
uncompressed = io.BytesIO(gz.read())
- except IOError:
+ except OSError:
continue
break
else:
@@ -1474,7 +1468,7 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
if cookie.expires is None:
cookie.expires = 0
- with io.open(filename, 'w', encoding='utf-8') as f:
+ with open(filename, 'w', encoding='utf-8') as f:
f.write(self._HEADER)
now = time.time()
for cookie in self:
@@ -1530,7 +1524,7 @@ class YoutubeDLCookieJar(compat_cookiejar.MozillaCookieJar):
return line
cf = io.StringIO()
- with io.open(filename, encoding='utf-8') as f:
+ with open(filename, encoding='utf-8') as f:
for line in f:
try:
cf.write(prepare_line(line))
@@ -1612,8 +1606,7 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):
CONTENT_HEADERS = ("content-length", "content-type")
# NB: don't use dict comprehension for python 2.6 compatibility
- newheaders = dict((k, v) for k, v in req.headers.items()
- if k.lower() not in CONTENT_HEADERS)
+ newheaders = {k: v for k, v in req.headers.items() if k.lower() not in CONTENT_HEADERS}
return compat_urllib_request.Request(
newurl, headers=newheaders, origin_req_host=req.origin_req_host,
unverifiable=True)
@@ -1657,7 +1650,7 @@ def parse_iso8601(date_str, delimiter='T', timezone=None):
timezone, date_str = extract_timezone(date_str)
try:
- date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
+ date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
dt = datetime.datetime.strptime(date_str, date_format) - timezone
return calendar.timegm(dt.timetuple())
except ValueError:
@@ -1839,7 +1832,7 @@ def hyphenate_date(date_str):
return date_str
-class DateRange(object):
+class DateRange:
"""Represents a time interval between two dates"""
def __init__(self, start=None, end=None):
@@ -1867,7 +1860,7 @@ class DateRange(object):
return self.start <= date <= self.end
def __str__(self):
- return '%s - %s' % (self.start.isoformat(), self.end.isoformat())
+ return f'{self.start.isoformat()} - {self.end.isoformat()}'
def platform_name():
@@ -2012,7 +2005,7 @@ else:
raise LockingUnsupportedError()
-class locked_file(object):
+class locked_file:
locked = False
def __init__(self, filename, mode, block=True, encoding=None):
@@ -2039,7 +2032,7 @@ class locked_file(object):
try:
_lock_file(self.f, exclusive, self.block)
self.locked = True
- except IOError:
+ except OSError:
self.f.close()
raise
if 'w' in self.mode:
@@ -2510,14 +2503,14 @@ def parse_duration(s):
def prepend_extension(filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename)
return (
- '{0}.{1}{2}'.format(name, ext, real_ext)
+ f'{name}.{ext}{real_ext}'
if not expected_real_ext or real_ext[1:] == expected_real_ext
- else '{0}.{1}'.format(filename, ext))
+ else f'{filename}.{ext}')
def replace_extension(filename, ext, expected_real_ext=None):
name, real_ext = os.path.splitext(filename)
- return '{0}.{1}'.format(
+ return '{}.{}'.format(
name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
ext)
@@ -2700,6 +2693,7 @@ class PagedList:
class OnDemandPagedList(PagedList):
"""Download pages until a page with less than maximum results"""
+
def _getslice(self, start, end):
for pagenum in itertools.count(start // self._pagesize):
firstid = pagenum * self._pagesize
@@ -2740,6 +2734,7 @@ class OnDemandPagedList(PagedList):
class InAdvancePagedList(PagedList):
"""PagedList with total number of pages known in advance"""
+
def __init__(self, pagefunc, pagecount, pagesize):
PagedList.__init__(self, pagefunc, pagesize, True)
self._pagecount = pagecount
@@ -2994,10 +2989,10 @@ def strip_jsonp(code):
def js_to_json(code, vars={}):
# vars is a dict of var, val pairs to substitute
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
- SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
+ SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
INTEGER_TABLE = (
- (r'(?s)^(0[xX][0-9a-fA-F]+){skip}:?$'.format(skip=SKIP_RE), 16),
- (r'(?s)^(0+[0-7]+){skip}:?$'.format(skip=SKIP_RE), 8),
+ (fr'(?s)^(0[xX][0-9a-fA-F]+){SKIP_RE}:?$', 16),
+ (fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
)
def fix_kv(m):
@@ -3518,7 +3513,7 @@ def dfxp2srt(dfxp_data):
styles = {}
default_style = {}
- class TTMLPElementParser(object):
+ class TTMLPElementParser:
_out = ''
_unclosed_elements = []
_applied_styles = []
@@ -3703,7 +3698,7 @@ def _configuration_args(main_key, argdict, exe, keys=None, default=[], use_compa
return cli_configuration_args(argdict, keys, default, use_compat)
-class ISO639Utils(object):
+class ISO639Utils:
# See http://www.loc.gov/standards/iso639-2/ISO-639-2_utf-8.txt
_lang_map = {
'aa': 'aar',
@@ -3908,7 +3903,7 @@ class ISO639Utils(object):
return short_name
-class ISO3166Utils(object):
+class ISO3166Utils:
# From http://data.okfn.org/data/core/country-list
_country_map = {
'AF': 'Afghanistan',
@@ -4168,7 +4163,7 @@ class ISO3166Utils(object):
return cls._country_map.get(code.upper())
-class GeoUtils(object):
+class GeoUtils:
# Major IPv4 address blocks per country
_country_ip_map = {
'AD': '46.172.224.0/19',
@@ -4605,7 +4600,7 @@ def decode_png(png_data):
header = png_data[8:]
if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR':
- raise IOError('Not a valid PNG file.')
+ raise OSError('Not a valid PNG file.')
int_map = {1: '>B', 2: '>H', 4: '>I'}
unpack_integer = lambda x: compat_struct_unpack(int_map[len(x)], x)[0]
@@ -4642,7 +4637,7 @@ def decode_png(png_data):
idat += chunk['data']
if not idat:
- raise IOError('Unable to read PNG data.')
+ raise OSError('Unable to read PNG data.')
decompressed_data = bytearray(zlib.decompress(idat))
@@ -4730,7 +4725,7 @@ def write_xattr(path, key, value):
try:
setxattr(path, key, value)
- except EnvironmentError as e:
+ except OSError as e:
raise XAttrMetadataError(e.errno, e.strerror)
except ImportError:
@@ -4744,7 +4739,7 @@ def write_xattr(path, key, value):
try:
with open(ads_fn, 'wb') as f:
f.write(value)
- except EnvironmentError as e:
+ except OSError as e:
raise XAttrMetadataError(e.errno, e.strerror)
else:
user_has_setfattr = check_executable('setfattr', ['--version'])
@@ -4767,7 +4762,7 @@ def write_xattr(path, key, value):
try:
p = Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
- except EnvironmentError as e:
+ except OSError as e:
raise XAttrMetadataError(e.errno, e.strerror)
stdout, stderr = p.communicate_or_kill()
stderr = stderr.decode('utf-8', 'replace')
@@ -4923,7 +4918,7 @@ def make_dir(path, to_screen=None):
if dn and not os.path.exists(dn):
os.makedirs(dn)
return True
- except (OSError, IOError) as err:
+ except OSError as err:
if callable(to_screen) is not None:
to_screen('unable to create directory ' + error_to_compat_str(err))
return False
@@ -5155,7 +5150,7 @@ def scale_thumbnails_to_max_format_width(formats, thumbnails, url_width_re):
"""
_keys = ('width', 'height')
max_dimensions = max(
- [tuple(format.get(k) or 0 for k in _keys) for format in formats],
+ (tuple(format.get(k) or 0 for k in _keys) for format in formats),
default=(0, 0))
if not max_dimensions[0]:
return thumbnails
@@ -5220,7 +5215,7 @@ class Config:
def read_file(filename, default=[]):
try:
optionf = open(filename)
- except IOError:
+ except OSError:
return default # silently skip if file is not present
try:
# FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
@@ -5232,7 +5227,7 @@ class Config:
@staticmethod
def hide_login_info(opts):
- PRIVATE_OPTS = set(['-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'])
+ PRIVATE_OPTS = {'-p', '--password', '-u', '--username', '--video-password', '--ap-password', '--ap-username'}
eqre = re.compile('^(?P<key>' + ('|'.join(re.escape(po) for po in PRIVATE_OPTS)) + ')=.+$')
def _scrub_eq(o):