From b3013540b41d1eb77c4803c5fca46f8d75b40fc1 Mon Sep 17 00:00:00 2001 From: Jesus Date: Mon, 4 Sep 2023 01:59:36 +0800 Subject: update from upstream --- hypervideo_dl/utils/_utils.py | 5484 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 5484 insertions(+) create mode 100644 hypervideo_dl/utils/_utils.py (limited to 'hypervideo_dl/utils/_utils.py') diff --git a/hypervideo_dl/utils/_utils.py b/hypervideo_dl/utils/_utils.py new file mode 100644 index 0000000..5a85462 --- /dev/null +++ b/hypervideo_dl/utils/_utils.py @@ -0,0 +1,5484 @@ +import asyncio +import atexit +import base64 +import binascii +import calendar +import codecs +import collections +import collections.abc +import contextlib +import datetime +import email.header +import email.utils +import errno +import hashlib +import hmac +import html.entities +import html.parser +import inspect +import io +import itertools +import json +import locale +import math +import mimetypes +import netrc +import operator +import os +import platform +import random +import re +import shlex +import socket +import ssl +import struct +import subprocess +import sys +import tempfile +import time +import traceback +import types +import unicodedata +import urllib.error +import urllib.parse +import urllib.request +import xml.etree.ElementTree + +from . import traversal + +from ..compat import functools # isort: split +from ..compat import ( + compat_etree_fromstring, + compat_expanduser, + compat_HTMLParseError, + compat_os_name, + compat_shlex_quote, +) +from ..dependencies import websockets, xattr + +__name__ = __name__.rsplit('.', 1)[0] # Pretend to be the parent module + +# This is not clearly defined otherwise +compiled_regex_type = type(re.compile('')) + + +class NO_DEFAULT: + pass + + +def IDENTITY(x): + return x + + +ENGLISH_MONTH_NAMES = [ + 'January', 'February', 'March', 'April', 'May', 'June', + 'July', 'August', 'September', 'October', 'November', 'December'] + +MONTH_NAMES = { + 'en': ENGLISH_MONTH_NAMES, + 'fr': [ + 'janvier', 'février', 'mars', 'avril', 'mai', 'juin', + 'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'], + # these follow the genitive grammatical case (dopełniacz) + # some websites might be using nominative, which will require another month list + # https://en.wikibooks.org/wiki/Polish/Noun_cases + 'pl': ['stycznia', 'lutego', 'marca', 'kwietnia', 'maja', 'czerwca', + 'lipca', 'sierpnia', 'września', 'października', 'listopada', 'grudnia'], +} + +# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42 +TIMEZONE_NAMES = { + 'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0, + 'AST': -4, 'ADT': -3, # Atlantic (used in Canada) + 'EST': -5, 'EDT': -4, # Eastern + 'CST': -6, 'CDT': -5, # Central + 'MST': -7, 'MDT': -6, # Mountain + 'PST': -8, 'PDT': -7 # Pacific +} + +# needed for sanitizing filenames in restricted mode +ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ', + itertools.chain('AAAAAA', ['AE'], 'CEEEEIIIIDNOOOOOOO', ['OE'], 'UUUUUY', ['TH', 'ss'], + 'aaaaaa', ['ae'], 'ceeeeiiiionooooooo', ['oe'], 'uuuuuy', ['th'], 'y'))) + +DATE_FORMATS = ( + '%d %B %Y', + '%d %b %Y', + '%B %d %Y', + '%B %dst %Y', + '%B %dnd %Y', + '%B %drd %Y', + '%B %dth %Y', + '%b %d %Y', + '%b %dst %Y', + '%b %dnd %Y', + '%b %drd %Y', + '%b %dth %Y', + '%b %dst %Y %I:%M', + '%b %dnd %Y %I:%M', + '%b %drd %Y %I:%M', + '%b %dth %Y %I:%M', + '%Y %m %d', + '%Y-%m-%d', + '%Y.%m.%d.', + '%Y/%m/%d', + '%Y/%m/%d %H:%M', + '%Y/%m/%d %H:%M:%S', + '%Y%m%d%H%M', + '%Y%m%d%H%M%S', 
+    '%Y%m%d',
+    '%Y-%m-%d %H:%M',
+    '%Y-%m-%d %H:%M:%S',
+    '%Y-%m-%d %H:%M:%S.%f',
+    '%Y-%m-%d %H:%M:%S:%f',
+    '%d.%m.%Y %H:%M',
+    '%d.%m.%Y %H.%M',
+    '%Y-%m-%dT%H:%M:%SZ',
+    '%Y-%m-%dT%H:%M:%S.%fZ',
+    '%Y-%m-%dT%H:%M:%S.%f0Z',
+    '%Y-%m-%dT%H:%M:%S',
+    '%Y-%m-%dT%H:%M:%S.%f',
+    '%Y-%m-%dT%H:%M',
+    '%b %d %Y at %H:%M',
+    '%b %d %Y at %H:%M:%S',
+    '%B %d %Y at %H:%M',
+    '%B %d %Y at %H:%M:%S',
+    '%H:%M %d-%b-%Y',
+)
+
+DATE_FORMATS_DAY_FIRST = list(DATE_FORMATS)
+DATE_FORMATS_DAY_FIRST.extend([
+    '%d-%m-%Y',
+    '%d.%m.%Y',
+    '%d.%m.%y',
+    '%d/%m/%Y',
+    '%d/%m/%y',
+    '%d/%m/%Y %H:%M:%S',
+    '%d-%m-%Y %H:%M',
+    '%H:%M %d/%m/%Y',
+])
+
+DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
+DATE_FORMATS_MONTH_FIRST.extend([
+    '%m-%d-%Y',
+    '%m.%d.%Y',
+    '%m/%d/%Y',
+    '%m/%d/%y',
+    '%m/%d/%Y %H:%M:%S',
+])
+
+PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'
+
+NUMBER_RE = r'\d+(?:\.\d+)?'
+
+
+@functools.cache
+def preferredencoding():
+    """Get preferred encoding.
+
+    Returns the best encoding scheme for the system, based on
+    locale.getpreferredencoding() and some further tweaks.
+    """
+    try:
+        pref = locale.getpreferredencoding()
+        'TEST'.encode(pref)
+    except Exception:
+        pref = 'UTF-8'
+
+    return pref
+
+
+def write_json_file(obj, fn):
+    """ Encode obj as JSON and write it to fn, atomically if possible """
+
+    tf = tempfile.NamedTemporaryFile(
+        prefix=f'{os.path.basename(fn)}.', dir=os.path.dirname(fn),
+        suffix='.tmp', delete=False, mode='w', encoding='utf-8')
+
+    try:
+        with tf:
+            json.dump(obj, tf, ensure_ascii=False)
+        if sys.platform == 'win32':
+            # Need to remove existing file on Windows, else os.rename raises
+            # WindowsError or FileExistsError.
+            with contextlib.suppress(OSError):
+                os.unlink(fn)
+        with contextlib.suppress(OSError):
+            mask = os.umask(0)
+            os.umask(mask)
+            os.chmod(tf.name, 0o666 & ~mask)
+        os.rename(tf.name, fn)
+    except Exception:
+        with contextlib.suppress(OSError):
+            os.remove(tf.name)
+        raise
+
+
+def find_xpath_attr(node, xpath, key, val=None):
+    """ Find the xpath xpath[@key=val] """
+    assert re.match(r'^[a-zA-Z_-]+$', key)
+    expr = xpath + ('[@%s]' % key if val is None else f"[@{key}='{val}']")
+    return node.find(expr)
+
+# On python2.6 the xml.etree.ElementTree.Element methods don't support
+# the namespace parameter
+
+
+def xpath_with_ns(path, ns_map):
+    components = [c.split(':') for c in path.split('/')]
+    replaced = []
+    for c in components:
+        if len(c) == 1:
+            replaced.append(c[0])
+        else:
+            ns, tag = c
+            replaced.append('{%s}%s' % (ns_map[ns], tag))
+    return '/'.join(replaced)
+
+
+def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
+    def _find_xpath(xpath):
+        return node.find(xpath)
+
+    if isinstance(xpath, str):
+        n = _find_xpath(xpath)
+    else:
+        for xp in xpath:
+            n = _find_xpath(xp)
+            if n is not None:
+                break
+
+    if n is None:
+        if default is not NO_DEFAULT:
+            return default
+        elif fatal:
+            name = xpath if name is None else name
+            raise ExtractorError('Could not find XML element %s' % name)
+        else:
+            return None
+    return n
+
+
+def xpath_text(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
+    n = xpath_element(node, xpath, name, fatal=fatal, default=default)
+    if n is None or n == default:
+        return n
+    if n.text is None:
+        if default is not NO_DEFAULT:
+            return default
+        elif fatal:
+            name = xpath if name is None else name
+            raise ExtractorError('Could not find XML element\'s text %s' % name)
+        else:
+            return None
+    return n.text
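+
+# NOTE (editor's sketch, not part of upstream): typical combined use of the
+# xpath helpers above; the XML snippet and namespace URI are invented for
+# illustration:
+#
+#   doc = xml.etree.ElementTree.fromstring(
+#       '<root xmlns="urn:example"><media><title>hi</title></media></root>')
+#   ns = {'x': 'urn:example'}
+#   xpath_text(doc, xpath_with_ns('./x:media/x:title', ns))                # -> 'hi'
+#   xpath_text(doc, xpath_with_ns('./x:media/x:year', ns), default=None)   # -> None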
+
+
+def xpath_attr(node, xpath, key, name=None, fatal=False, default=NO_DEFAULT):
+    n = find_xpath_attr(node, xpath, key)
+    if n is None:
+        if default is not NO_DEFAULT:
+            return default
+        elif fatal:
+            name = f'{xpath}[@{key}]' if name is None else name
+            raise ExtractorError('Could not find XML attribute %s' % name)
+        else:
+            return None
+    return n.attrib[key]
+
+
+def get_element_by_id(id, html, **kwargs):
+    """Return the content of the tag with the specified ID in the passed HTML document"""
+    return get_element_by_attribute('id', id, html, **kwargs)
+
+
+def get_element_html_by_id(id, html, **kwargs):
+    """Return the html of the tag with the specified ID in the passed HTML document"""
+    return get_element_html_by_attribute('id', id, html, **kwargs)
+
+
+def get_element_by_class(class_name, html):
+    """Return the content of the first tag with the specified class in the passed HTML document"""
+    retval = get_elements_by_class(class_name, html)
+    return retval[0] if retval else None
+
+
+def get_element_html_by_class(class_name, html):
+    """Return the html of the first tag with the specified class in the passed HTML document"""
+    retval = get_elements_html_by_class(class_name, html)
+    return retval[0] if retval else None
+
+
+def get_element_by_attribute(attribute, value, html, **kwargs):
+    retval = get_elements_by_attribute(attribute, value, html, **kwargs)
+    return retval[0] if retval else None
+
+
+def get_element_html_by_attribute(attribute, value, html, **kwargs):
+    retval = get_elements_html_by_attribute(attribute, value, html, **kwargs)
+    return retval[0] if retval else None
+
+
+def get_elements_by_class(class_name, html, **kwargs):
+    """Return the content of all tags with the specified class in the passed HTML document as a list"""
+    return get_elements_by_attribute(
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
+        html, escape_value=False)
+
+
+def get_elements_html_by_class(class_name, html):
+    """Return the html of all tags with the specified class in the passed HTML document as a list"""
+    return get_elements_html_by_attribute(
+        'class', r'[^\'"]*(?<=[\'"\s])%s(?=[\'"\s])[^\'"]*' % re.escape(class_name),
+        html, escape_value=False)
+
+
+def get_elements_by_attribute(*args, **kwargs):
+    """Return the content of the tag with the specified attribute in the passed HTML document"""
+    return [content for content, _ in get_elements_text_and_html_by_attribute(*args, **kwargs)]
+
+
+def get_elements_html_by_attribute(*args, **kwargs):
+    """Return the html of the tag with the specified attribute in the passed HTML document"""
+    return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
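+
+# NOTE (editor's sketch, not part of upstream): illustrative behaviour of the
+# class/attribute helpers above on a hypothetical snippet. Note that class
+# matching is token-based while plain attribute matching is exact:
+#
+#   html = '<div class="a b">one</div> <div class="b">two</div>'
+#   get_elements_by_class('b', html)               # -> ['one', 'two']
+#   get_element_by_class('a', html)                # -> 'one'
+#   get_element_by_attribute('class', 'b', html)   # -> 'two' (exact match only)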
+
+
+def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
+    """
+    Return the text (content) and the html (whole) of the tag with the specified
+    attribute in the passed HTML document
+    """
+    if not value:
+        return
+
+    quote = '' if re.match(r'''[\s"'`=<>]''', value) else '?'
+
+    value = re.escape(value) if escape_value else value
+
+    partial_element_re = rf'''(?x)
+        <(?P<tag>{tag})
+        (?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
+         \s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
+        '''
+
+    for m in re.finditer(partial_element_re, html):
+        content, whole = get_element_text_and_html_by_tag(m.group('tag'), html[m.start():])
+
+        yield (
+            unescapeHTML(re.sub(r'^(?P<q>["\'])(?P<content>.*)(?P=q)$', r'\g<content>', content, flags=re.DOTALL)),
+            whole
+        )
+
+
+class HTMLBreakOnClosingTagParser(html.parser.HTMLParser):
+    """
+    HTML parser which raises HTMLBreakOnClosingTagException upon reaching the
+    closing tag for the first opening tag it has encountered, and can be used
+    as a context manager
+    """
+
+    class HTMLBreakOnClosingTagException(Exception):
+        pass
+
+    def __init__(self):
+        self.tagstack = collections.deque()
+        html.parser.HTMLParser.__init__(self)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *_):
+        self.close()
+
+    def close(self):
+        # handle_endtag does not return upon raising HTMLBreakOnClosingTagException,
+        # so data remains buffered; we no longer have any interest in it, thus
+        # override this method to discard it
+        pass
+
+    def handle_starttag(self, tag, _):
+        self.tagstack.append(tag)
+
+    def handle_endtag(self, tag):
+        if not self.tagstack:
+            raise compat_HTMLParseError('no tags in the stack')
+        while self.tagstack:
+            inner_tag = self.tagstack.pop()
+            if inner_tag == tag:
+                break
+        else:
+            raise compat_HTMLParseError(f'matching opening tag for closing {tag} tag not found')
+        if not self.tagstack:
+            raise self.HTMLBreakOnClosingTagException()
+
+
+# XXX: This should be far less strict
+def get_element_text_and_html_by_tag(tag, html):
+    """
+    For the first element with the specified tag in the passed HTML document
+    return its content (text) and the whole element (html)
+    """
+    def find_or_raise(haystack, needle, exc):
+        try:
+            return haystack.index(needle)
+        except ValueError:
+            raise exc
+    closing_tag = f'</{tag}>'
+    whole_start = find_or_raise(
+        html, f'<{tag}', compat_HTMLParseError(f'opening {tag} tag not found'))
+    content_start = find_or_raise(
+        html[whole_start:], '>', compat_HTMLParseError(f'malformed opening {tag} tag'))
+    content_start += whole_start + 1
+    with HTMLBreakOnClosingTagParser() as parser:
+        parser.feed(html[whole_start:content_start])
+        if not parser.tagstack or parser.tagstack[0] != tag:
+            raise compat_HTMLParseError(f'parser did not match opening {tag} tag')
+        offset = content_start
+        while offset < len(html):
+            next_closing_tag_start = find_or_raise(
+                html[offset:], closing_tag,
+                compat_HTMLParseError(f'closing {tag} tag not found'))
+            next_closing_tag_end = next_closing_tag_start + len(closing_tag)
+            try:
+                parser.feed(html[offset:offset + next_closing_tag_end])
+                offset += next_closing_tag_end
+            except HTMLBreakOnClosingTagParser.HTMLBreakOnClosingTagException:
+                return html[content_start:offset + next_closing_tag_start], \
+                    html[whole_start:offset + next_closing_tag_end]
+        raise compat_HTMLParseError('unexpected end of html')
+
+
+class HTMLAttributeParser(html.parser.HTMLParser):
+    """Trivial HTML parser to gather the attributes for a single element"""
+
+    def __init__(self):
+        self.attrs = {}
+        html.parser.HTMLParser.__init__(self)
+
+    def handle_starttag(self, tag, attrs):
+        self.attrs = dict(attrs)
+        raise compat_HTMLParseError('done')
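+
+# NOTE (editor's sketch, not part of upstream): get_element_text_and_html_by_tag()
+# tracks nesting through HTMLBreakOnClosingTagParser, so a same-named inner tag
+# does not end the match early:
+#
+#   get_element_text_and_html_by_tag('span', '<p><span>a <span>b</span> c</span></p>')
+#   # -> ('a <span>b</span> c', '<span>a <span>b</span> c</span>')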
+
+
+class HTMLListAttrsParser(html.parser.HTMLParser):
+    """HTML parser to gather the attributes for the elements of a list"""
+
+    def __init__(self):
+        html.parser.HTMLParser.__init__(self)
+        self.items = []
+        self._level = 0
+
+    def handle_starttag(self, tag, attrs):
+        if tag == 'li' and self._level == 0:
+            self.items.append(dict(attrs))
+        self._level += 1
+
+    def handle_endtag(self, tag):
+        self._level -= 1
+
+
+def extract_attributes(html_element):
+    """Given a string for an HTML element such as
+    <el
+         a="foo" B="bar" c="&98;az" d=boz
+         empty= noval entity="&amp;"
+         sq='"' dq="'"
+    >
+    Decode and return a dictionary of attributes.
+    {
+        'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz',
+        'empty': '', 'noval': None, 'entity': '&',
+        'sq': '"', 'dq': '\''
+    }.
+    """
+    parser = HTMLAttributeParser()
+    with contextlib.suppress(compat_HTMLParseError):
+        parser.feed(html_element)
+        parser.close()
+    return parser.attrs
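+
+# NOTE (editor's sketch, not part of upstream): extract_attributes() on a more
+# typical tag; valueless attributes map to None:
+#
+#   extract_attributes('<video src="/v.mp4" controls data-id=42>')
+#   # -> {'src': '/v.mp4', 'controls': None, 'data-id': '42'}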
+
+
+def parse_list(webpage):
+    """Given a string for a series of HTML <li> elements,
+    return a list of dictionaries with their attributes"""
+    parser = HTMLListAttrsParser()
+    parser.feed(webpage)
+    parser.close()
+    return parser.items
+
+
+def clean_html(html):
+    """Clean an HTML snippet into a readable string"""
+
+    if html is None:  # Convenience for sanitizing descriptions etc.
+        return html
+
+    html = re.sub(r'\s+', ' ', html)
+    html = re.sub(r'(?u)\s?<\s?br\s?/?\s?>\s?', '\n', html)
+    html = re.sub(r'(?u)<\s?/\s?p\s?>\s?<\s?p[^>]*>', '\n', html)
+    # Strip html tags
+    html = re.sub('<.*?>', '', html)
+    # Replace html entities
+    html = unescapeHTML(html)
+    return html.strip()
+
+
+class LenientJSONDecoder(json.JSONDecoder):
+    # TODO: Write tests
+    def __init__(self, *args, transform_source=None, ignore_extra=False, close_objects=0, **kwargs):
+        self.transform_source, self.ignore_extra = transform_source, ignore_extra
+        self._close_attempts = 2 * close_objects
+        super().__init__(*args, **kwargs)
+
+    @staticmethod
+    def _close_object(err):
+        doc = err.doc[:err.pos]
+        # We need to add comma first to get the correct error message
+        if err.msg.startswith('Expecting \',\''):
+            return doc + ','
+        elif not doc.endswith(','):
+            return
+
+        if err.msg.startswith('Expecting property name'):
+            return doc[:-1] + '}'
+        elif err.msg.startswith('Expecting value'):
+            return doc[:-1] + ']'
+
+    def decode(self, s):
+        if self.transform_source:
+            s = self.transform_source(s)
+        for attempt in range(self._close_attempts + 1):
+            try:
+                if self.ignore_extra:
+                    return self.raw_decode(s.lstrip())[0]
+                return super().decode(s)
+            except json.JSONDecodeError as e:
+                if e.pos is None:
+                    raise
+                elif attempt < self._close_attempts:
+                    s = self._close_object(e)
+                    if s is not None:
+                        continue
+                raise type(e)(f'{e.msg} in {s[e.pos-10:e.pos+10]!r}', s, e.pos)
+        assert False, 'Too many attempts to decode JSON'
+
+
+def sanitize_open(filename, open_mode):
+    """Try to open the given filename, and slightly tweak it if this fails.
+
+    Attempts to open the given filename. If this fails, it tries to change
+    the filename slightly, step by step, until it's either able to open it
+    or it fails and raises a final exception, like the standard open()
+    function.
+
+    It returns the tuple (stream, definitive_file_name).
+    """
+    if filename == '-':
+        if sys.platform == 'win32':
+            import msvcrt
+
+            # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
+            with contextlib.suppress(io.UnsupportedOperation):
+                msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+            return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
+
+    for attempt in range(2):
+        try:
+            try:
+                if sys.platform == 'win32':
+                    # FIXME: An exclusive lock also locks the file from being read.
+                    # Since windows locks are mandatory, don't lock the file on windows (for now).
+ # Ref: https://github.com/hypervideo/hypervideo/issues/3124 + raise LockingUnsupportedError() + stream = locked_file(filename, open_mode, block=False).__enter__() + except OSError: + stream = open(filename, open_mode) + return stream, filename + except OSError as err: + if attempt or err.errno in (errno.EACCES,): + raise + old_filename, filename = filename, sanitize_path(filename) + if old_filename == filename: + raise + + +def timeconvert(timestr): + """Convert RFC 2822 defined time string into system timestamp""" + timestamp = None + timetuple = email.utils.parsedate_tz(timestr) + if timetuple is not None: + timestamp = email.utils.mktime_tz(timetuple) + return timestamp + + +def sanitize_filename(s, restricted=False, is_id=NO_DEFAULT): + """Sanitizes a string so it could be used as part of a filename. + @param restricted Use a stricter subset of allowed characters + @param is_id Whether this is an ID that should be kept unchanged if possible. + If unset, hypervideo's new sanitization rules are in effect + """ + if s == '': + return '' + + def replace_insane(char): + if restricted and char in ACCENT_CHARS: + return ACCENT_CHARS[char] + elif not restricted and char == '\n': + return '\0 ' + elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\': + # Replace with their full-width unicode counterparts + return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0)) + elif char == '?' or ord(char) < 32 or ord(char) == 127: + return '' + elif char == '"': + return '' if restricted else '\'' + elif char == ':': + return '\0_\0-' if restricted else '\0 \0-' + elif char in '\\/|*<>': + return '\0_' + if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127): + return '\0_' + return char + + # Replace look-alike Unicode glyphs + if restricted and (is_id is NO_DEFAULT or not is_id): + s = unicodedata.normalize('NFKC', s) + s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps + result = ''.join(map(replace_insane, s)) + if is_id is NO_DEFAULT: + result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result) # Remove repeated substitute chars + STRIP_RE = r'(?:\0.|[ _-])*' + result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result) # Remove substitute chars from start/end + result = result.replace('\0', '') or '_' + + if not is_id: + while '__' in result: + result = result.replace('__', '_') + result = result.strip('_') + # Common case of "Foreign band name - English song title" + if restricted and result.startswith('-_'): + result = result[2:] + if result.startswith('-'): + result = '_' + result[len('-'):] + result = result.lstrip('.') + if not result: + result = '_' + return result + + +def sanitize_path(s, force=False): + """Sanitizes and normalizes path on Windows""" + if sys.platform == 'win32': + force = False + drive_or_unc, _ = os.path.splitdrive(s) + elif force: + drive_or_unc = '' + else: + return s + + norm_path = os.path.normpath(remove_start(s, drive_or_unc)).split(os.path.sep) + if drive_or_unc: + norm_path.pop(0) + sanitized_path = [ + path_part if path_part in ['.', '..'] else re.sub(r'(?:[/<>:"\|\\?\*]|[\s.]$)', '#', path_part) + for path_part in norm_path] + if drive_or_unc: + sanitized_path.insert(0, drive_or_unc + os.path.sep) + elif force and s and s[0] == os.path.sep: + sanitized_path.insert(0, os.path.sep) + return os.path.join(*sanitized_path) + + +def sanitize_url(url, *, scheme='http'): + # Prepend protocol-less URLs with `http:` scheme in order to mitigate + # the number of unwanted 
failures due to missing protocol
+    if url is None:
+        return
+    elif url.startswith('//'):
+        return f'{scheme}:{url}'
+    # Fix some common typos seen so far
+    COMMON_TYPOS = (
+        # https://github.com/ytdl-org/youtube-dl/issues/15649
+        (r'^httpss://', r'https://'),
+        # https://bx1.be/lives/direct-tv/
+        (r'^rmtp([es]?)://', r'rtmp\1://'),
+    )
+    for mistake, fixup in COMMON_TYPOS:
+        if re.match(mistake, url):
+            return re.sub(mistake, fixup, url)
+    return url
+
+
+def extract_basic_auth(url):
+    parts = urllib.parse.urlsplit(url)
+    if parts.username is None:
+        return url, None
+    url = urllib.parse.urlunsplit(parts._replace(netloc=(
+        parts.hostname if parts.port is None
+        else '%s:%d' % (parts.hostname, parts.port))))
+    auth_payload = base64.b64encode(
+        ('%s:%s' % (parts.username, parts.password or '')).encode())
+    return url, f'Basic {auth_payload.decode()}'
+
+
+def expand_path(s):
+    """Expand shell variables and ~"""
+    return os.path.expandvars(compat_expanduser(s))
+
+
+def orderedSet(iterable, *, lazy=False):
+    """Remove all duplicates from the input iterable"""
+    def _iter():
+        seen = []  # Do not use set since the items can be unhashable
+        for x in iterable:
+            if x not in seen:
+                seen.append(x)
+                yield x
+
+    return _iter() if lazy else list(_iter())
+
+
+def _htmlentity_transform(entity_with_semicolon):
+    """Transforms an HTML entity to a character."""
+    entity = entity_with_semicolon[:-1]
+
+    # Known non-numeric HTML entity
+    if entity in html.entities.name2codepoint:
+        return chr(html.entities.name2codepoint[entity])
+
+    # TODO: HTML5 allows entities without a semicolon.
+    # E.g. '&Eacuteric' should be decoded as 'Éric'.
+    if entity_with_semicolon in html.entities.html5:
+        return html.entities.html5[entity_with_semicolon]
+
+    mobj = re.match(r'#(x[0-9a-fA-F]+|[0-9]+)', entity)
+    if mobj is not None:
+        numstr = mobj.group(1)
+        if numstr.startswith('x'):
+            base = 16
+            numstr = '0%s' % numstr
+        else:
+            base = 10
+        # See https://github.com/ytdl-org/youtube-dl/issues/7518
+        with contextlib.suppress(ValueError):
+            return chr(int(numstr, base))
+
+    # Unknown entity in name, return its literal representation
+    return '&%s;' % entity
+
+
+def unescapeHTML(s):
+    if s is None:
+        return None
+    assert isinstance(s, str)
+
+    return re.sub(
+        r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
+
+
+def escapeHTML(text):
+    return (
+        text
+        .replace('&', '&amp;')
+        .replace('<', '&lt;')
+        .replace('>', '&gt;')
+        .replace('"', '&quot;')
+        .replace("'", '&#39;')
+    )
+
+
+class netrc_from_content(netrc.netrc):
+    def __init__(self, content):
+        self.hosts, self.macros = {}, {}
+        with io.StringIO(content) as stream:
+            self._parse('-', stream, False)
+
+
+class Popen(subprocess.Popen):
+    if sys.platform == 'win32':
+        _startupinfo = subprocess.STARTUPINFO()
+        _startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+    else:
+        _startupinfo = None
+
+    @staticmethod
+    def _fix_pyinstaller_ld_path(env):
+        """Restore LD_LIBRARY_PATH when using PyInstaller
+            Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
+                 https://github.com/hypervideo/hypervideo/issues/4573
+        """
+        if not hasattr(sys, '_MEIPASS'):
+            return
+
+        def _fix(key):
+            orig = env.get(f'{key}_ORIG')
+            if orig is None:
+                env.pop(key, None)
+            else:
+                env[key] = orig
+
+        _fix('LD_LIBRARY_PATH')  # Linux
+        _fix('DYLD_LIBRARY_PATH')  # macOS
+
+    def __init__(self, *args, env=None, text=False, **kwargs):
+        if env is None:
+            env = os.environ.copy()
+        self._fix_pyinstaller_ld_path(env)
+
+        self.__text_mode
= kwargs.get('encoding') or kwargs.get('errors') or text or kwargs.get('universal_newlines') + if text is True: + kwargs['universal_newlines'] = True # For 3.6 compatibility + kwargs.setdefault('encoding', 'utf-8') + kwargs.setdefault('errors', 'replace') + super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo) + + def communicate_or_kill(self, *args, **kwargs): + try: + return self.communicate(*args, **kwargs) + except BaseException: # Including KeyboardInterrupt + self.kill(timeout=None) + raise + + def kill(self, *, timeout=0): + super().kill() + if timeout != 0: + self.wait(timeout=timeout) + + @classmethod + def run(cls, *args, timeout=None, **kwargs): + with cls(*args, **kwargs) as proc: + default = '' if proc.__text_mode else b'' + stdout, stderr = proc.communicate_or_kill(timeout=timeout) + return stdout or default, stderr or default, proc.returncode + + +def encodeArgument(s): + # Legacy code that uses byte strings + # Uncomment the following line after fixing all post processors + # assert isinstance(s, str), 'Internal error: %r should be of type %r, is %r' % (s, str, type(s)) + return s if isinstance(s, str) else s.decode('ascii') + + +_timetuple = collections.namedtuple('Time', ('hours', 'minutes', 'seconds', 'milliseconds')) + + +def timetuple_from_msec(msec): + secs, msec = divmod(msec, 1000) + mins, secs = divmod(secs, 60) + hrs, mins = divmod(mins, 60) + return _timetuple(hrs, mins, secs, msec) + + +def formatSeconds(secs, delim=':', msec=False): + time = timetuple_from_msec(secs * 1000) + if time.hours: + ret = '%d%s%02d%s%02d' % (time.hours, delim, time.minutes, delim, time.seconds) + elif time.minutes: + ret = '%d%s%02d' % (time.minutes, delim, time.seconds) + else: + ret = '%d' % time.seconds + return '%s.%03d' % (ret, time.milliseconds) if msec else ret + + +def bug_reports_message(before=';'): + msg = ('please report this issue on https://issues.hyperbola.info/ , ' + 'filling out the appropriate issue template. ' + 'Confirm you are on the latest version using pacman -Su') + + before = before.rstrip() + if not before or before.endswith(('.', '!', '?')): + msg = msg[0].title() + msg[1:] + + return (before + ' ' if before else '') + msg + + +class YoutubeDLError(Exception): + """Base exception for YoutubeDL errors.""" + msg = None + + def __init__(self, msg=None): + if msg is not None: + self.msg = msg + elif self.msg is None: + self.msg = type(self).__name__ + super().__init__(self.msg) + + +class ExtractorError(YoutubeDLError): + """Error during info extraction.""" + + def __init__(self, msg, tb=None, expected=False, cause=None, video_id=None, ie=None): + """ tb, if given, is the original traceback (so that it can be printed out). + If expected is set, this is a normal error message and most likely not a bug in hypervideo. 
+ """ + from ..networking.exceptions import network_exceptions + if sys.exc_info()[0] in network_exceptions: + expected = True + + self.orig_msg = str(msg) + self.traceback = tb + self.expected = expected + self.cause = cause + self.video_id = video_id + self.ie = ie + self.exc_info = sys.exc_info() # preserve original exception + if isinstance(self.exc_info[1], ExtractorError): + self.exc_info = self.exc_info[1].exc_info + super().__init__(self.__msg) + + @property + def __msg(self): + return ''.join(( + format_field(self.ie, None, '[%s] '), + format_field(self.video_id, None, '%s: '), + self.orig_msg, + format_field(self.cause, None, ' (caused by %r)'), + '' if self.expected else bug_reports_message())) + + def format_traceback(self): + return join_nonempty( + self.traceback and ''.join(traceback.format_tb(self.traceback)), + self.cause and ''.join(traceback.format_exception(None, self.cause, self.cause.__traceback__)[1:]), + delim='\n') or None + + def __setattr__(self, name, value): + super().__setattr__(name, value) + if getattr(self, 'msg', None) and name not in ('msg', 'args'): + self.msg = self.__msg or type(self).__name__ + self.args = (self.msg, ) # Cannot be property + + +class UnsupportedError(ExtractorError): + def __init__(self, url): + super().__init__( + 'Unsupported URL: %s' % url, expected=True) + self.url = url + + +class RegexNotFoundError(ExtractorError): + """Error when a regex didn't match""" + pass + + +class GeoRestrictedError(ExtractorError): + """Geographic restriction Error exception. + + This exception may be thrown when a video is not available from your + geographic location due to geographic restrictions imposed by a website. + """ + + def __init__(self, msg, countries=None, **kwargs): + kwargs['expected'] = True + super().__init__(msg, **kwargs) + self.countries = countries + + +class UserNotLive(ExtractorError): + """Error when a channel/user is not live""" + + def __init__(self, msg=None, **kwargs): + kwargs['expected'] = True + super().__init__(msg or 'The channel is not currently live', **kwargs) + + +class DownloadError(YoutubeDLError): + """Download Error exception. + + This exception may be thrown by FileDownloader objects if they are not + configured to continue on errors. They will contain the appropriate + error message. + """ + + def __init__(self, msg, exc_info=None): + """ exc_info, if given, is the original exception that caused the trouble (as returned by sys.exc_info()). """ + super().__init__(msg) + self.exc_info = exc_info + + +class EntryNotInPlaylist(YoutubeDLError): + """Entry not in playlist exception. + + This exception will be thrown by YoutubeDL when a requested entry + is not found in the playlist info_dict + """ + msg = 'Entry not found in info' + + +class SameFileError(YoutubeDLError): + """Same File exception. + + This exception will be thrown by FileDownloader objects if they detect + multiple files would have to be downloaded to the same file on disk. + """ + msg = 'Fixed output name but more than one file to download' + + def __init__(self, filename=None): + if filename is not None: + self.msg += f': {filename}' + super().__init__(self.msg) + + +class PostProcessingError(YoutubeDLError): + """Post Processing exception. + + This exception may be raised by PostProcessor's .run() method to + indicate an error in the postprocessing task. 
+ """ + + +class DownloadCancelled(YoutubeDLError): + """ Exception raised when the download queue should be interrupted """ + msg = 'The download was cancelled' + + +class ExistingVideoReached(DownloadCancelled): + """ --break-on-existing triggered """ + msg = 'Encountered a video that is already in the archive, stopping due to --break-on-existing' + + +class RejectedVideoReached(DownloadCancelled): + """ --break-match-filter triggered """ + msg = 'Encountered a video that did not match filter, stopping due to --break-match-filter' + + +class MaxDownloadsReached(DownloadCancelled): + """ --max-downloads limit has been reached. """ + msg = 'Maximum number of downloads reached, stopping due to --max-downloads' + + +class ReExtractInfo(YoutubeDLError): + """ Video info needs to be re-extracted. """ + + def __init__(self, msg, expected=False): + super().__init__(msg) + self.expected = expected + + +class ThrottledDownload(ReExtractInfo): + """ Download speed below --throttled-rate. """ + msg = 'The download speed is below throttle limit' + + def __init__(self): + super().__init__(self.msg, expected=False) + + +class UnavailableVideoError(YoutubeDLError): + """Unavailable Format exception. + + This exception will be thrown when a video is requested + in a format that is not available for that video. + """ + msg = 'Unable to download video' + + def __init__(self, err=None): + if err is not None: + self.msg += f': {err}' + super().__init__(self.msg) + + +class ContentTooShortError(YoutubeDLError): + """Content Too Short exception. + + This exception may be raised by FileDownloader objects when a file they + download is too small for what the server announced first, indicating + the connection was probably interrupted. + """ + + def __init__(self, downloaded, expected): + super().__init__(f'Downloaded {downloaded} bytes, expected {expected} bytes') + # Both in bytes + self.downloaded = downloaded + self.expected = expected + + +class XAttrMetadataError(YoutubeDLError): + def __init__(self, code=None, msg='Unknown error'): + super().__init__(msg) + self.code = code + self.msg = msg + + # Parsing code and msg + if (self.code in (errno.ENOSPC, errno.EDQUOT) + or 'No space left' in self.msg or 'Disk quota exceeded' in self.msg): + self.reason = 'NO_SPACE' + elif self.code == errno.E2BIG or 'Argument list too long' in self.msg: + self.reason = 'VALUE_TOO_LONG' + else: + self.reason = 'NOT_SUPPORTED' + + +class XAttrUnavailableError(YoutubeDLError): + pass + + +def is_path_like(f): + return isinstance(f, (str, bytes, os.PathLike)) + + +def extract_timezone(date_str): + m = re.search( + r'''(?x) + ^.{8,}? # >=8 char non-TZ prefix, if present + (?PZ| # just the UTC Z, or + (?:(?<=.\b\d{4}|\b\d{2}:\d\d)| # preceded by 4 digits or hh:mm or + (?= 4 alpha or 2 digits + [ ]? 
+
+
+def extract_timezone(date_str):
+    m = re.search(
+        r'''(?x)
+            ^.{8,}?                                              # >=8 char non-TZ prefix, if present
+            (?P<tz>Z|                                            # just the UTC Z, or
+                (?:(?<=.\b\d{4}|\b\d{2}:\d\d)|                   # preceded by 4 digits or hh:mm or
+                   (?<!.\b[a-zA-Z]{3}|[a-zA-Z]{4}|..\b\d\d))     # not preceded by 3 alpha word or >= 4 alpha or 2 digits
+                   [ ]?                                          # optional space
+                (?P<sign>\+|-)                                   # +/-
+                (?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})       # hh[:]mm
+            $)
+        ''', date_str)
+    if not m:
+        m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
+        timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
+        if timezone is not None:
+            date_str = date_str[:-len(m.group('tz'))]
+        timezone = datetime.timedelta(hours=timezone or 0)
+    else:
+        date_str = date_str[:-len(m.group('tz'))]
+        if not m.group('sign'):
+            timezone = datetime.timedelta()
+        else:
+            sign = 1 if m.group('sign') == '+' else -1
+            timezone = datetime.timedelta(
+                hours=sign * int(m.group('hours')),
+                minutes=sign * int(m.group('minutes')))
+    return timezone, date_str
+
+
+def parse_iso8601(date_str, delimiter='T', timezone=None):
+    """ Return a UNIX timestamp from the given date """
+
+    if date_str is None:
+        return None
+
+    date_str = re.sub(r'\.[0-9]+', '', date_str)
+
+    if timezone is None:
+        timezone, date_str = extract_timezone(date_str)
+
+    with contextlib.suppress(ValueError):
+        date_format = f'%Y-%m-%d{delimiter}%H:%M:%S'
+        dt = datetime.datetime.strptime(date_str, date_format) - timezone
+        return calendar.timegm(dt.timetuple())
+
+
+def date_formats(day_first=True):
+    return DATE_FORMATS_DAY_FIRST if day_first else DATE_FORMATS_MONTH_FIRST
+
+
+def unified_strdate(date_str, day_first=True):
+    """Return a string with the date in the format YYYYMMDD"""
+
+    if date_str is None:
+        return None
+    upload_date = None
+    # Replace commas
+    date_str = date_str.replace(',', ' ')
+    # Remove AM/PM + timezone
+    date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)
+    _, date_str = extract_timezone(date_str)
+
+    for expression in date_formats(day_first):
+        with contextlib.suppress(ValueError):
+            upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
+    if upload_date is None:
+        timetuple = email.utils.parsedate_tz(date_str)
+        if timetuple:
+            with contextlib.suppress(ValueError):
+                upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
+    if upload_date is not None:
+        return str(upload_date)
+
+
+def unified_timestamp(date_str, day_first=True):
+    if not isinstance(date_str, str):
+        return None
+
+    date_str = re.sub(r'\s+', ' ', re.sub(
+        r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?)(day)?', '', date_str))
+
+    pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
+    timezone, date_str = extract_timezone(date_str)
+
+    # Remove AM/PM + timezone
+    date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)
+
+    # Remove unrecognized timezones from ISO 8601 alike timestamps
+    m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
+    if m:
+        date_str = date_str[:-len(m.group('tz'))]
+
+    # Python only supports microseconds, so remove nanoseconds
+    m = re.search(r'^([0-9]{4,}-[0-9]{1,2}-[0-9]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}\.[0-9]{6})[0-9]+$', date_str)
+    if m:
+        date_str = m.group(1)
+
+    for expression in date_formats(day_first):
+        with contextlib.suppress(ValueError):
+            dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
+            return calendar.timegm(dt.timetuple())
+
+    timetuple = email.utils.parsedate_tz(date_str)
+    if timetuple:
+        return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
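+
+# NOTE (editor's sketch, not part of upstream): expected results of the date
+# helpers above, checked by hand against the Unix epoch:
+#
+#   unified_strdate('25/12/2021')                    # -> '20211225' (day-first default)
+#   unified_strdate('12/25/2021', day_first=False)   # -> '20211225'
+#   unified_timestamp('2021-12-25T12:00:00Z')        # -> 1640433600
+#   parse_iso8601('2021-12-25T12:00:00+01:00')       # -> 1640430000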
+
+
+def determine_ext(url, default_ext='unknown_video'):
+    if url is None or '.' not in url:
+        return default_ext
+    guess = url.partition('?')[0].rpartition('.')[2]
+    if re.match(r'^[A-Za-z0-9]+$', guess):
+        return guess
+    # Try extract ext from URLs like http://example.com/foo/bar.mp4/?download
+    elif guess.rstrip('/') in KNOWN_EXTENSIONS:
+        return guess.rstrip('/')
+    else:
+        return default_ext
+
+
+def subtitles_filename(filename, sub_lang, sub_format, expected_real_ext=None):
+    return replace_extension(filename, sub_lang + '.' + sub_format, expected_real_ext)
+
+
+def datetime_from_str(date_str, precision='auto', format='%Y%m%d'):
+    R"""
+    Return a datetime object from a string.
+    Supported format:
+        (now|today|yesterday|DATE)([+-]\d+(microsecond|second|minute|hour|day|week|month|year)s?)?
+
+    @param format       strftime format of DATE
+    @param precision    Round the datetime object: auto|microsecond|second|minute|hour|day
+                        auto: round to the unit provided in date_str (if applicable).
+    """
+    auto_precision = False
+    if precision == 'auto':
+        auto_precision = True
+        precision = 'microsecond'
+    today = datetime_round(datetime.datetime.utcnow(), precision)
+    if date_str in ('now', 'today'):
+        return today
+    if date_str == 'yesterday':
+        return today - datetime.timedelta(days=1)
+    match = re.match(
+        r'(?P<start>.+)(?P<sign>[+-])(?P