aboutsummaryrefslogtreecommitdiffstats
path: root/yt_dlp/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'yt_dlp/utils.py')
-rw-r--r--yt_dlp/utils.py89
1 files changed, 42 insertions, 47 deletions
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index 7327f3150..fd6c20682 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -39,6 +39,7 @@ import tempfile
import time
import traceback
import types
+import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree
@@ -49,14 +50,8 @@ from .compat import (
compat_etree_fromstring,
compat_expanduser,
compat_HTMLParseError,
- compat_HTTPError,
compat_os_name,
- compat_parse_qs,
compat_shlex_quote,
- compat_str,
- compat_urllib_parse_urlencode,
- compat_urllib_parse_urlparse,
- compat_urlparse,
)
from .dependencies import brotli, certifi, websockets, xattr
from .socks import ProxyType, sockssocket
@@ -67,8 +62,8 @@ def register_socks_protocols():
# In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
# URLs with protocols not in urlparse.uses_netloc are not handled correctly
for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
- if scheme not in compat_urlparse.uses_netloc:
- compat_urlparse.uses_netloc.append(scheme)
+ if scheme not in urllib.parse.uses_netloc:
+ urllib.parse.uses_netloc.append(scheme)
# This is not clearly defined otherwise
@@ -311,7 +306,7 @@ def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
def _find_xpath(xpath):
return node.find(xpath)
- if isinstance(xpath, (str, compat_str)):
+ if isinstance(xpath, str):
n = _find_xpath(xpath)
else:
for xp in xpath:
@@ -741,10 +736,10 @@ def sanitize_url(url):
def extract_basic_auth(url):
- parts = compat_urlparse.urlsplit(url)
+ parts = urllib.parse.urlsplit(url)
if parts.username is None:
return url, None
- url = compat_urlparse.urlunsplit(parts._replace(netloc=(
+ url = urllib.parse.urlunsplit(parts._replace(netloc=(
parts.hostname if parts.port is None
else '%s:%d' % (parts.hostname, parts.port))))
auth_payload = base64.b64encode(
@@ -889,7 +884,7 @@ def decodeFilename(b, for_subprocess=False):
def encodeArgument(s):
# Legacy code that uses byte strings
# Uncomment the following line after fixing all post processors
- # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, compat_str, type(s))
+ # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s))
return s if isinstance(s, str) else s.decode('ascii')
@@ -903,7 +898,7 @@ def decodeOption(optval):
if isinstance(optval, bytes):
optval = optval.decode(preferredencoding())
- assert isinstance(optval, compat_str)
+ assert isinstance(optval, str)
return optval
@@ -1395,7 +1390,7 @@ def make_socks_conn_class(base_class, socks_proxy):
assert issubclass(base_class, (
http.client.HTTPConnection, http.client.HTTPSConnection))
- url_components = compat_urlparse.urlparse(socks_proxy)
+ url_components = urllib.parse.urlparse(socks_proxy)
if url_components.scheme.lower() == 'socks5':
socks_type = ProxyType.SOCKS5
elif url_components.scheme.lower() in ('socks', 'socks4'):
@@ -1639,7 +1634,7 @@ class YoutubeDLRedirectHandler(urllib.request.HTTPRedirectHandler):
m = req.get_method()
if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD")
or code in (301, 302, 303) and m == "POST")):
- raise compat_HTTPError(req.full_url, code, msg, headers, fp)
+ raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
# Strictly (according to RFC 2616), 301 or 302 in response to
# a POST MUST NOT cause a redirection without confirmation
# from the user (of urllib.request, in this case). In practice,
@@ -1739,7 +1734,7 @@ def unified_strdate(date_str, day_first=True):
with contextlib.suppress(ValueError):
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
if upload_date is not None:
- return compat_str(upload_date)
+ return str(upload_date)
def unified_timestamp(date_str, day_first=True):
@@ -1913,12 +1908,12 @@ class DateRange:
def platform_name():
- """ Returns the platform name as a compat_str """
+ """ Returns the platform name as a str """
res = platform.platform()
if isinstance(res, bytes):
res = res.decode(preferredencoding())
- assert isinstance(res, compat_str)
+ assert isinstance(res, str)
return res
@@ -2144,7 +2139,7 @@ def smuggle_url(url, data):
url, idata = unsmuggle_url(url, {})
data.update(idata)
- sdata = compat_urllib_parse_urlencode(
+ sdata = urllib.parse.urlencode(
{'__youtubedl_smuggle': json.dumps(data)})
return url + '#' + sdata
@@ -2153,7 +2148,7 @@ def unsmuggle_url(smug_url, default=None):
if '#__youtubedl_smuggle' not in smug_url:
return smug_url, default
url, _, sdata = smug_url.rpartition('#')
- jsond = compat_parse_qs(sdata)['__youtubedl_smuggle'][0]
+ jsond = urllib.parse.parse_qs(sdata)['__youtubedl_smuggle'][0]
data = json.loads(jsond)
return url, data
@@ -2313,7 +2308,7 @@ def parse_resolution(s, *, lenient=False):
def parse_bitrate(s):
- if not isinstance(s, compat_str):
+ if not isinstance(s, str):
return
mobj = re.search(r'\b(\d+)\s*kbps', s)
if mobj:
@@ -2350,7 +2345,7 @@ def fix_xml_ampersands(xml_str):
def setproctitle(title):
- assert isinstance(title, compat_str)
+ assert isinstance(title, str)
# ctypes in Jython is not complete
# http://bugs.jython.org/issue2148
@@ -2398,7 +2393,7 @@ def get_domain(url):
def url_basename(url):
- path = compat_urlparse.urlparse(url).path
+ path = urllib.parse.urlparse(url).path
return path.strip('/').split('/')[-1]
@@ -2409,16 +2404,16 @@ def base_url(url):
def urljoin(base, path):
if isinstance(path, bytes):
path = path.decode()
- if not isinstance(path, compat_str) or not path:
+ if not isinstance(path, str) or not path:
return None
if re.match(r'^(?:[a-zA-Z][a-zA-Z0-9+-.]*:)?//', path):
return path
if isinstance(base, bytes):
base = base.decode()
- if not isinstance(base, compat_str) or not re.match(
+ if not isinstance(base, str) or not re.match(
r'^(?:https?:)?//', base):
return None
- return compat_urlparse.urljoin(base, path)
+ return urllib.parse.urljoin(base, path)
class HEADRequest(urllib.request.Request):
@@ -2441,14 +2436,14 @@ def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
def str_or_none(v, default=None):
- return default if v is None else compat_str(v)
+ return default if v is None else str(v)
def str_to_int(int_str):
""" A more relaxed version of int_or_none """
if isinstance(int_str, int):
return int_str
- elif isinstance(int_str, compat_str):
+ elif isinstance(int_str, str):
int_str = re.sub(r'[,\.\+]', '', int_str)
return int_or_none(int_str)
@@ -2467,11 +2462,11 @@ def bool_or_none(v, default=None):
def strip_or_none(v, default=None):
- return v.strip() if isinstance(v, compat_str) else default
+ return v.strip() if isinstance(v, str) else default
def url_or_none(url):
- if not url or not isinstance(url, compat_str):
+ if not url or not isinstance(url, str):
return None
url = url.strip()
return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
@@ -2489,7 +2484,7 @@ def strftime_or_none(timestamp, date_format, default=None):
try:
if isinstance(timestamp, (int, float)): # unix timestamp
datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
- elif isinstance(timestamp, compat_str): # assume YYYYMMDD
+ elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
return datetime_object.strftime(date_format)
except (ValueError, TypeError, AttributeError):
@@ -2592,7 +2587,7 @@ def _get_exe_version_output(exe, args, *, to_screen=None):
def detect_exe_version(output, version_re=None, unrecognized='present'):
- assert isinstance(output, compat_str)
+ assert isinstance(output, str)
if version_re is None:
version_re = r'version\s+([-0-9._a-zA-Z]+)'
m = re.search(version_re, output)
@@ -2973,7 +2968,7 @@ def escape_rfc3986(s):
def escape_url(url):
"""Escape URL as suggested by RFC 3986"""
- url_parsed = compat_urllib_parse_urlparse(url)
+ url_parsed = urllib.parse.urlparse(url)
return url_parsed._replace(
netloc=url_parsed.netloc.encode('idna').decode('ascii'),
path=escape_rfc3986(url_parsed.path),
@@ -2984,12 +2979,12 @@ def escape_url(url):
def parse_qs(url):
- return compat_parse_qs(compat_urllib_parse_urlparse(url).query)
+ return urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
def read_batch_urls(batch_fd):
def fixup(url):
- if not isinstance(url, compat_str):
+ if not isinstance(url, str):
url = url.decode('utf-8', 'replace')
BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff')
for bom in BOM_UTF8:
@@ -3007,17 +3002,17 @@ def read_batch_urls(batch_fd):
def urlencode_postdata(*args, **kargs):
- return compat_urllib_parse_urlencode(*args, **kargs).encode('ascii')
+ return urllib.parse.urlencode(*args, **kargs).encode('ascii')
def update_url_query(url, query):
if not query:
return url
- parsed_url = compat_urlparse.urlparse(url)
- qs = compat_parse_qs(parsed_url.query)
+ parsed_url = urllib.parse.urlparse(url)
+ qs = urllib.parse.parse_qs(parsed_url.query)
qs.update(query)
- return compat_urlparse.urlunparse(parsed_url._replace(
- query=compat_urllib_parse_urlencode(qs, True)))
+ return urllib.parse.urlunparse(parsed_url._replace(
+ query=urllib.parse.urlencode(qs, True)))
def update_Request(req, url=None, data=None, headers={}, query={}):
@@ -3046,9 +3041,9 @@ def _multipart_encode_impl(data, boundary):
out = b''
for k, v in data.items():
out += b'--' + boundary.encode('ascii') + b'\r\n'
- if isinstance(k, compat_str):
+ if isinstance(k, str):
k = k.encode()
- if isinstance(v, compat_str):
+ if isinstance(v, str):
v = v.encode()
# RFC 2047 requires non-ASCII field names to be encoded, while RFC 7578
# suggests sending UTF-8 directly. Firefox sends UTF-8, too
@@ -3129,7 +3124,7 @@ def merge_dicts(*dicts):
def encode_compat_str(string, encoding=preferredencoding(), errors='strict'):
- return string if isinstance(string, compat_str) else compat_str(string, encoding, errors)
+ return string if isinstance(string, str) else str(string, encoding, errors)
US_RATINGS = {
@@ -3509,7 +3504,7 @@ def determine_protocol(info_dict):
elif ext == 'f4m':
return 'f4m'
- return compat_urllib_parse_urlparse(url).scheme
+ return urllib.parse.urlparse(url).scheme
def render_table(header_row, data, delim=False, extra_gap=0, hide_empty=False):
@@ -4632,7 +4627,7 @@ class GeoUtils:
addr, preflen = block.split('/')
addr_min = struct.unpack('!L', socket.inet_aton(addr))[0]
addr_max = addr_min | (0xffffffff >> int(preflen))
- return compat_str(socket.inet_ntoa(
+ return str(socket.inet_ntoa(
struct.pack('!L', random.randint(addr_min, addr_max))))
@@ -4653,7 +4648,7 @@ class PerRequestProxyHandler(urllib.request.ProxyHandler):
if proxy == '__noproxy__':
return None # No Proxy
- if compat_urlparse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
+ if urllib.parse.urlparse(proxy).scheme.lower() in ('socks', 'socks4', 'socks4a', 'socks5'):
req.add_header('Ytdl-socks-proxy', proxy)
# yt-dlp's http/https handlers do wrapping the socket with socks
return None
@@ -5036,7 +5031,7 @@ def iri_to_uri(iri):
The function doesn't add an additional layer of escaping; e.g., it doesn't escape `%3C` as `%253C`. Instead, it percent-escapes characters with an underlying UTF-8 encoding *besides* those already escaped, leaving the URI intact.
"""
- iri_parts = compat_urllib_parse_urlparse(iri)
+ iri_parts = urllib.parse.urlparse(iri)
if '[' in iri_parts.netloc:
raise ValueError('IPv6 URIs are not, yet, supported.')