diff options
Diffstat (limited to 'yt_dlp/utils.py')
-rw-r--r-- | yt_dlp/utils.py | 89 |
1 files changed, 42 insertions, 47 deletions
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py index 7327f3150..fd6c20682 100644 --- a/yt_dlp/utils.py +++ b/yt_dlp/utils.py @@ -39,6 +39,7 @@ import tempfile import time import traceback import types +import urllib.error import urllib.parse import urllib.request import xml.etree.ElementTree @@ -49,14 +50,8 @@ from .compat import ( compat_etree_fromstring, compat_expanduser, compat_HTMLParseError, - compat_HTTPError, compat_os_name, - compat_parse_qs, compat_shlex_quote, - compat_str, - compat_urllib_parse_urlencode, - compat_urllib_parse_urlparse, - compat_urlparse, ) from .dependencies import brotli, certifi, websockets, xattr from .socks import ProxyType, sockssocket @@ -67,8 +62,8 @@ def register_socks_protocols(): # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904 # URLs with protocols not in urlparse.uses_netloc are not handled correctly for scheme in ('socks', 'socks4', 'socks4a', 'socks5'): - if scheme not in compat_urlparse.uses_netloc: - compat_urlparse.uses_netloc.append(scheme) + if scheme not in urllib.parse.uses_netloc: + urllib.parse.uses_netloc.append(scheme) # This is not clearly defined otherwise @@ -311,7 +306,7 @@ def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT): def _find_xpath(xpath): return node.find(xpath) - if isinstance(xpath, (str, compat_str)): + if isinstance(xpath, str): n = _find_xpath(xpath) else: for xp in xpath: @@ -741,10 +736,10 @@ def sanitize_url(url): def extract_basic_auth(url): - parts = compat_urlparse.urlsplit(url) + parts = urllib.parse.urlsplit(url) if parts.username is None: return url, None - url = compat_urlparse.urlunsplit(parts._replace(netloc=( + url = urllib.parse.urlunsplit(parts._replace(netloc=( parts.hostname if parts.port is None else '%s:%d' % (parts.hostname, parts.port)))) auth_payload = base64.b64encode( @@ -889,7 +884,7 @@ def decodeFilename(b, for_subprocess=False): def encodeArgument(s): # Legacy code that uses byte strings # Uncomment the following line after fixing all post processors - # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s)) + # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s)) return s if isinstance(s, str) else s.decode('ascii') @@ -903,7 +898,7 @@ def decodeOption(optval): if isinstance(optval, bytes): optval = optval.decode(preferredencoding()) - assert isinstance(optval, compat_str) + assert isinstance(optval, str) return optval @@ -1395,7 +1390,7 @@ def make_socks_conn_class(base_class, socks_proxy): assert issubclass(base_class, ( http.client.HTTPConnection, http.client.HTTPSConnection)) - url_components = compat_urlparse.urlparse(socks_proxy) + url_components = urllib.parse.urlparse(socks_proxy) if url_components.scheme.lower() == 'socks5': socks_type = ProxyType.SOCKS5 elif url_components.scheme.lower() in ('socks', 'socks4'): @@ -1639,7 +1634,7 @@ class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler): m = req.get_method() if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD") or code in (301, 302, 303) and m == "POST")): - raise compat_HTTPError(req.full_url, code, msg, headers, fp) + raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp) # Strictly (according to RFC 2616), 301 or 302 in response to # a POST MUST NOT cause a redirection without confirmation # from the user (of urllib.request, in this case). In practice, @@ -1739,7 +1734,7 @@ def unified_strdate(date_str, day_first=True): with contextlib.suppress(ValueError): upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d') if upload_date is not None: - return compat_str(upload_date) + return str(upload_date) def unified_timestamp(date_str, day_first=True): @@ -1913,12 +1908,12 @@ class DateRange: def platform_name(): - """ Returns the platform name as a compat_str """ + """ Returns the platform name as a str """ res = platform.platform() if isinstance(res, bytes): res = res.decode(preferredencoding()) - assert isinstance(res, compat_str) + assert isinstance(res, str) return res @@ -2144,7 +2139,7 @@ def smuggle_url(url, data): url, idata = unsmuggle_url(url, {}) data.update(idata) - sdata = compat_urllib_parse_urlencode( + sdata = urllib.parse.urlencode( {'__youtubedl_smuggle': json.dumps(data)}) return url + '#' + sdata @@ -2153,7 +2148,7 @@ def unsmuggle_url(smug_url, default=None): if '#__youtubedl_smuggle' not in smug_url: return smug_url, default url, _, sdata = smug_url.rpartition('#') - jsond = compat_parse_qs(sdata)['__youtubedl_smuggle'][0] + jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0] data = json.loads(jsond) return url, data @@ -2313,7 +2308,7 @@ def parse_resolution(s, *, lenient=False): def parse_bitrate(s): - if not isinstance(s, compat_str): + if not isinstance(s, str): return mobj = re.search(r'\b(\d+)\s*kbps', s) if mobj: @@ -2350,7 +2345,7 @@ def fix_xml_ampersands(xml_str): def setproctitle(title): - assert isinstance(title, compat_str) + assert isinstance(title, str) # ctypes in Jython is not complete # http://bugs.jython.org/issue2148 @@ -2398,7 +2393,7 @@ def get_domain(url): def url_basename(url): - path = compat_urlparse.urlparse(url).path + path = urllib.parse.urlparse(url).path return path.strip('/').split('/')[-1] @@ -2409,16 +2404,16 @@ def base_url(url): def urljoin(base, path): if isinstance(path, bytes): path = path.decode() - if not isinstance(path, compat_str) or not path: + if not isinstance(path, str) or not path: return None if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+-.]*:)?//', path): return path if isinstance(base, bytes): base = base.decode() - if not isinstance(base, compat_str) or not re.match( + if not isinstance(base, str) or not re.match( r'^(?:https?:)?//', base): return None - return compat_urlparse.urljoin(base, path) + return urllib.parse.urljoin(base, path) class HEADRequest(urllib.request.Request): @@ -2441,14 +2436,14 @@ def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1): def str_or_none(v, default=None): - return default if v is None else compat_str(v) + return default if v is None else str(v) def str_to_int(int_str): """ A more relaxed version of int_or_none """ if isinstance(int_str, int): return int_str - elif isinstance(int_str, compat_str): + elif isinstance(int_str, str): int_str = re.sub(r'[,\.\+]', '', int_str) return int_or_none(int_str) @@ -2467,11 +2462,11 @@ def bool_or_none(v, default=None): def strip_or_none(v, default=None): - return v.strip() if isinstance(v, compat_str) else default + return v.strip() if isinstance(v, str) else default def url_or_none(url): - if not url or not isinstance(url, compat_str): + if not url or not isinstance(url, str): return None url = url.strip() return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None @@ -2489,7 +2484,7 @@ def strftime_or_none(timestamp, date_format, default=None): try: if isinstance(timestamp, (int, float)): # unix timestamp datetime_object = datetime.datetime.utcfromtimestamp(timestamp) - elif isinstance(timestamp, compat_str): # assume YYYYMMDD + elif isinstance(timestamp, str): # assume YYYYMMDD datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d') return datetime_object.strftime(date_format) except (ValueError, TypeError, AttributeError): @@ -2592,7 +2587,7 @@ def _get_exe_version_output(exe, args, *, to_screen=None): def detect_exe_version(output, version_re=None, unrecognized='present'): - assert isinstance(output, compat_str) + assert isinstance(output, str) if version_re is None: version_re = r'version\s+([-0-9._a-zA-Z]+)' m = re.search(version_re, output) @@ -2973,7 +2968,7 @@ def escape_rfc3986(s): def escape_url(url): """Escape URL as suggested by RFC 3986""" - url_parsed = compat_urllib_parse_urlparse(url) + url_parsed = urllib.parse.urlparse(url) return url_parsed._replace( netloc=url_parsed.netloc.encode('idna').decode('ascii'), path=escape_rfc3986(url_parsed.path), @@ -2984,12 +2979,12 @@ def escape_url(url): def parse_qs(url): - return compat_parse_qs(compat_urllib_parse_urlparse(url).query) + return urllib.parse.parse_qs(urllib.parse.urlparse(url).query) def read_batch_urls(batch_fd): def fixup(url): - if not isinstance(url, compat_str): + if not isinstance(url, str): url = url.decode('utf-8', 'replace') BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff') for bom in BOM_UTF8: @@ -3007,17 +3002,17 @@ def read_batch_urls(batch_fd): def urlencode_postdata(*args, **kargs): - return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii') + return urllib.parse.urlencode(*args, **kargs).encode('ascii') def update_url_query(url, query): if not query: return url - parsed_url = compat_urlparse.urlparse(url) - qs = compat_parse_qs(parsed_url.query) + parsed_url = urllib.parse.urlparse(url) + qs = urllib.parse.parse_qs(parsed_url.query) qs.update(query) - return compat_urlparse.urlunparse(parsed_url._replace( - query=compat_urllib_parse_urlencode(qs, True))) + return urllib.parse.urlunparse(parsed_url._replace( + query=urllib.parse.urlencode(qs, True))) def update_Request(req, url=None, data=None, headers={}, query={}): @@ -3046,9 +3041,9 @@ def _multipart_encode_impl(data, boundary): out = b'' for k, v in data.items(): out += b'--' + boundary.encode('ascii') + b'\r\n' - if isinstance(k, compat_str): + if isinstance(k, str): k = k.encode() - if isinstance(v, compat_str): + if isinstance(v, str): v = v.encode() # RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578 # suggests sending UTF-8 directly. Firefox sends UTF-8, too @@ -3129,7 +3124,7 @@ def merge_dicts(*dicts): def encode_compat_str(string, encoding=preferredencoding(), errors='strict'): - return string if isinstance(string, compat_str) else compat_str(string, encoding, errors) + return string if isinstance(string, str) else str(string, encoding, errors) US_RATINGS = { @@ -3509,7 +3504,7 @@ def determine_protocol(info_dict): elif ext == 'f4m': return 'f4m' - return compat_urllib_parse_urlparse(url).scheme + return urllib.parse.urlparse(url).scheme def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False): @@ -4632,7 +4627,7 @@ class GeoUtils: addr, preflen = block.split('/') addr_min = struct.unpack('!L', socket.inet_aton(addr))[0] addr_max = addr_min | (0xffffffff >> int(preflen)) - return compat_str(socket.inet_ntoa( + return str(socket.inet_ntoa( struct.pack('!L', random.randint(addr_min, addr_max)))) @@ -4653,7 +4648,7 @@ class PerRequestProxyHandler(urllib.request.ProxyHandler): if proxy == '__noproxy__': return None # No Proxy - if compat_urlparse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'): + if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'): req.add_header('Ytdl-socks-proxy', proxy) # yt-dlp's http/https handlers do wrapping the socket with socks return None @@ -5036,7 +5031,7 @@ def iri_to_uri(iri): The function doesn't add an additional layer of escaping; e.g., it doesn't escape `%3C` as `%253C`. Instead, it percent-escapes characters with an underlying UTF-8 encoding *besides* those already escaped, leaving the URI intact. """ - iri_parts = compat_urllib_parse_urlparse(iri) + iri_parts = urllib.parse.urlparse(iri) if '[' in iri_parts.netloc: raise ValueError('IPv6 URIs are not, yet, supported.') |