Diffstat (limited to 'youtube/util.py')
-rw-r--r-- | youtube/util.py | 693
1 file changed, 611 insertions(+), 82 deletions(-)
diff --git a/youtube/util.py b/youtube/util.py
index a81ae83..c59fae8 100644
--- a/youtube/util.py
+++ b/youtube/util.py
@@ -1,14 +1,25 @@
+from datetime import datetime
 import settings
-import socks, sockshandler
+import socks
+import sockshandler
 import gzip
-import brotli
+try:
+    import brotli
+    have_brotli = True
+except ImportError:
+    have_brotli = False
 import urllib.parse
 import re
 import time
 import os
+import json
 import gevent
 import gevent.queue
 import gevent.lock
+import collections
+import stem
+import stem.control
+import traceback
 
 # The trouble with the requests library: It ships its own certificate bundle via certifi
 #  instead of using the system certificate store, meaning self-signed certificates
@@ -45,34 +56,119 @@ import urllib3.contrib.socks
 
 URL_ORIGIN = "/https://www.youtube.com"
 
-connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED')
+connection_pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED')
+
+
+class TorManager:
+    MAX_TRIES = 3
+    # Remember the 7-sec wait times, so make cooldown be two of those
+    # (otherwise it will retry forever if 429s never end)
+    COOLDOWN_TIME = 14
+
+    def __init__(self):
+        self.old_tor_connection_pool = None
+        self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
+            'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+            cert_reqs='CERT_REQUIRED')
+        self.tor_pool_refresh_time = time.monotonic()
+        settings.add_setting_changed_hook(
+            'tor_port',
+            lambda old_val, new_val: self.refresh_tor_connection_pool(),
+        )
+
+        self.new_identity_lock = gevent.lock.BoundedSemaphore(1)
+        self.last_new_identity_time = time.monotonic() - 20
+        self.try_num = 1
+
+    def refresh_tor_connection_pool(self):
+        self.tor_connection_pool.clear()
+
+        # Keep a reference for 5 min to avoid it getting garbage collected
+        # while sockets still in use
+        self.old_tor_connection_pool = self.tor_connection_pool
+
+        self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
+            'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+            cert_reqs='CERT_REQUIRED')
+        self.tor_pool_refresh_time = time.monotonic()
+
+    def get_tor_connection_pool(self):
+        # Tor changes circuits after 10 minutes:
+        # https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive
+        current_time = time.monotonic()
+
+        # close pool after 5 minutes
+        if current_time - self.tor_pool_refresh_time > 300:
+            self.refresh_tor_connection_pool()
+
+        return self.tor_connection_pool
+
+    def new_identity(self, time_failed_request_started):
+        '''return error, or None if no error and the identity is fresh'''
+
+        # The overall pattern at maximum (always returning 429) will be
+        # R N (0) R N (6) R N (6) R | (12) R N (0) R N (6) ...
+        # where R is a request, N is a new identity, (x) is a wait time of
+        # x sec, and | is where we give up and display an error to the user.
+
+        print('new_identity: new_identity called')
+        # blocks if another greenlet currently has the lock
+        self.new_identity_lock.acquire()
+        print('new_identity: New identity lock acquired')
+
+        try:
+            # This was caused by a request that failed within a previous,
+            # stale identity
+            if time_failed_request_started <= self.last_new_identity_time:
+                print('new_identity: Cancelling; request was from stale identity')
+                return None
+
+            delta = time.monotonic() - self.last_new_identity_time
+            if delta < self.COOLDOWN_TIME and self.try_num == 1:
+                err = ('Retried with new circuit %d times (max) within last '
+                       '%d seconds.' % (self.MAX_TRIES, self.COOLDOWN_TIME))
+                print('new_identity:', err)
+                return err
+            elif delta >= self.COOLDOWN_TIME:
+                self.try_num = 1
+
+            try:
+                port = settings.tor_control_port
+                with stem.control.Controller.from_port(port=port) as controller:
+                    controller.authenticate('')
+                    print('new_identity: Getting new identity')
+                    controller.signal(stem.Signal.NEWNYM)
+                    print('new_identity: NEWNYM signal sent')
+                    self.last_new_identity_time = time.monotonic()
+                self.refresh_tor_connection_pool()
+            except stem.SocketError:
+                traceback.print_exc()
+                return 'Failed to connect to Tor control port.'
+            finally:
+                original_try_num = self.try_num
+                self.try_num += 1
+                if self.try_num > self.MAX_TRIES:
+                    self.try_num = 1
+
+            # If we do the request right after second new identity it won't
+            # be a new IP, based on experiments.
+            # Not necessary after first new identity
+            if original_try_num > 1:
+                print('Sleeping for 7 seconds before retrying request')
+                time.sleep(7)  # experimentally determined minimum
+
+            return None
+        finally:
+            self.new_identity_lock.release()
+
+
+tor_manager = TorManager()
 
-old_tor_connection_pool = None
-tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:9150/', cert_reqs = 'CERT_REQUIRED')
-
-tor_pool_refresh_time = time.monotonic()  # prevent problems due to clock changes
 
 def get_pool(use_tor):
-    global old_tor_connection_pool
-    global tor_connection_pool
-    global tor_pool_refresh_time
-
     if not use_tor:
         return connection_pool
-
-    # Tor changes circuits after 10 minutes: https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive
-    current_time = time.monotonic()
-    if current_time - tor_pool_refresh_time > 300:  # close pool after 5 minutes
-        tor_connection_pool.clear()
-
-        # Keep a reference for 5 min to avoid it getting garbage collected while sockets still in use
-        old_tor_connection_pool = tor_connection_pool
-
-        tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:9150/', cert_reqs = 'CERT_REQUIRED')
-        tor_pool_refresh_time = current_time
-
-    return tor_connection_pool
-
+    return tor_manager.get_tor_connection_pool()
 
 
 class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
@@ -96,6 +192,19 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
     https_response = http_response
 
 
+class FetchError(Exception):
+    def __init__(self, code, reason='', ip=None, error_message=None):
+        if error_message:
+            string = code + ' ' + reason + ': ' + error_message
+        else:
+            string = 'HTTP error during request: ' + code + ' ' + reason
+        Exception.__init__(self, string)
+        self.code = code
+        self.reason = reason
+        self.ip = ip
+        self.error_message = error_message
+
+
 def decode_content(content, encoding_header):
     encodings = encoding_header.replace(' ', '').split(',')
     for encoding in reversed(encodings):
@@ -107,8 +216,12 @@ def decode_content(content, encoding_header):
             content = gzip.decompress(content)
     return content
 
-def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True, return_response=False, debug_name=None):
+
+def fetch_url_response(url, headers=(), timeout=15, data=None,
+                       cookiejar_send=None, cookiejar_receive=None,
+                       use_tor=True, max_redirects=None):
     '''
+    returns response, cleanup_function
     When cookiejar_send is set to a CookieJar object,
      those cookies will be sent in the request (but cookies in response will not be merged into it)
     When cookiejar_receive is set to a CookieJar object,
@@ -117,7 +230,10 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
     and response cookies will be merged into it.
     '''
     headers = dict(headers)    # Note: Calling dict() on a dict will make a copy
-    headers['Accept-Encoding'] = 'gzip, br'
+    if have_brotli:
+        headers['Accept-Encoding'] = 'gzip, br'
+    else:
+        headers['Accept-Encoding'] = 'gzip'
 
     # prevent python version being leaked by urllib if User-Agent isn't provided
     #  (urllib will use ex. Python-urllib/3.6 otherwise)
@@ -128,11 +244,10 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
     if data is not None:
         method = "POST"
         if isinstance(data, str):
-            data = data.encode('ascii')
+            data = data.encode('utf-8')
         elif not isinstance(data, bytes):
-            data = urllib.parse.urlencode(data).encode('ascii')
+            data = urllib.parse.urlencode(data).encode('utf-8')
 
-    start_time = time.time()
     if cookiejar_send is not None or cookiejar_receive is not None:     # Use urllib
         req = urllib.request.Request(url, data=data, headers=headers)
 
@@ -140,50 +255,163 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
         cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
 
         if use_tor and settings.route_tor:
-            opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor)
+            opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", settings.tor_port), cookie_processor)
         else:
             opener = urllib.request.build_opener(cookie_processor)
 
         response = opener.open(req, timeout=timeout)
-        response_time = time.time()
-
-
-        content = response.read()
+        cleanup_func = (lambda r: None)
 
     else:           # Use a urllib3 pool. Cookies can't be used since urllib3 doesn't have easy support for them.
+        # default: Retry.DEFAULT = Retry(3)
+        # (in connectionpool.py in urllib3)
+        # According to the documentation for urlopen, a redirect counts as a
+        # retry. So there are 3 redirects max by default.
+        if max_redirects:
+            retries = urllib3.Retry(3+max_redirects, redirect=max_redirects, raise_on_redirect=False)
+        else:
+            retries = urllib3.Retry(3, raise_on_redirect=False)
         pool = get_pool(use_tor and settings.route_tor)
-
-        response = pool.request(method, url, headers=headers, timeout=timeout, preload_content=False, decode_content=False)
-        response_time = time.time()
+        try:
+            response = pool.request(method, url, headers=headers, body=data,
+                                    timeout=timeout, preload_content=False,
+                                    decode_content=False, retries=retries)
+            response.retries = retries
+        except urllib3.exceptions.MaxRetryError as e:
+            exception_cause = e.__context__.__context__
+            if (isinstance(exception_cause, socks.ProxyConnectionError)
+                    and settings.route_tor):
+                msg = ('Failed to connect to Tor. Check that Tor is open and '
+                       'that your internet connection is working.\n\n'
+                       + str(e))
+                raise FetchError('502', reason='Bad Gateway',
+                                 error_message=msg)
+            elif isinstance(e.__context__,
+                            urllib3.exceptions.NewConnectionError):
+                msg = 'Failed to establish a connection.\n\n' + str(e)
+                raise FetchError(
+                    '502', reason='Bad Gateway',
+                    error_message=msg)
+            else:
+                raise
+        cleanup_func = (lambda r: r.release_conn())
+
+    return response, cleanup_func
+
+
+def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
+              cookiejar_send=None, cookiejar_receive=None, use_tor=True,
+              debug_name=None):
+    while True:
+        start_time = time.monotonic()
+
+        response, cleanup_func = fetch_url_response(
+            url, headers, timeout=timeout, data=data,
+            cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive,
+            use_tor=use_tor)
+        response_time = time.monotonic()
 
         content = response.read()
-        response.release_conn()
-    read_finish = time.time()
+        read_finish = time.monotonic()
+
+        cleanup_func(response)  # release_connection for urllib3
+        content = decode_content(
+            content,
+            response.headers.get('Content-Encoding', default='identity'))
+
+        if (settings.debugging_save_responses
+                and debug_name is not None
+                and content):
+            save_dir = os.path.join(settings.data_dir, 'debug')
+            if not os.path.exists(save_dir):
+                os.makedirs(save_dir)
+
+            with open(os.path.join(save_dir, debug_name), 'wb') as f:
+                f.write(content)
+
+        if response.status == 429 or (
+            response.status == 302 and (response.getheader('Location') == url
+                or response.getheader('Location').startswith(
+                    'https://www.google.com/sorry/index'
+                )
+            )
+        ):
+            print(response.status, response.reason, response.headers)
+            ip = re.search(
+                br'IP address: ((?:[\da-f]*:)+[\da-f]+|(?:\d+\.)+\d+)',
+                content)
+            ip = ip.group(1).decode('ascii') if ip else None
+            if not ip:
+                ip = re.search(r'IP=((?:\d+\.)+\d+)',
+                               response.getheader('Set-Cookie') or '')
+                ip = ip.group(1) if ip else None
+
+            # don't get new identity if we're not using Tor
+            if not use_tor:
+                raise FetchError('429', reason=response.reason, ip=ip)
+
+            print('Error: YouTube blocked the request because the Tor exit node is overutilized. Exit node IP address: %s' % ip)
+
+            # get new identity
+            error = tor_manager.new_identity(start_time)
+            if error:
+                raise FetchError(
+                    '429', reason=response.reason, ip=ip,
+                    error_message='Automatic circuit change: ' + error)
+            else:
+                continue  # retry now that we have new identity
+
+        elif response.status >= 400:
+            raise FetchError(str(response.status), reason=response.reason,
+                             ip=None)
+        break
+
     if report_text:
-        print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
-    content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
+        print(report_text, ' Latency:', round(response_time - start_time, 3), ' Read time:', round(read_finish - response_time,3))
 
-    if settings.debugging_save_responses and debug_name is not None:
-        save_dir = os.path.join(settings.data_dir, 'debug')
-        if not os.path.exists(save_dir):
-            os.makedirs(save_dir)
+    return content
 
-        with open(os.path.join(save_dir, debug_name), 'wb') as f:
-            f.write(content)
 
-    if return_response:
-        return content, response
-    return content
+def head(url, use_tor=False, report_text=None, max_redirects=10):
+    pool = get_pool(use_tor and settings.route_tor)
+    start_time = time.monotonic()
+
+    # default: Retry.DEFAULT = Retry(3)
+    # (in connectionpool.py in urllib3)
+    # According to the documentation for urlopen, a redirect counts as a retry
+    # So there are 3 redirects max by default. Let's change that
+    # to 10 since googlevideo redirects a lot.
+    retries = urllib3.Retry(
+        3+max_redirects,
+        redirect=max_redirects,
+        raise_on_redirect=False)
+    headers = {'User-Agent': 'Python-urllib'}
+    response = pool.request('HEAD', url, headers=headers, retries=retries)
+    if report_text:
+        print(
+            report_text,
+            ' Latency:',
+            round(time.monotonic() - start_time, 3))
+    return response
 
 
-mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
+mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
 mobile_ua = (('User-Agent', mobile_user_agent),)
 desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
 desktop_ua = (('User-Agent', desktop_user_agent),)
-
-
-
-
+json_header = (('Content-Type', 'application/json'),)
+desktop_xhr_headers = (
+    ('Accept', '*/*'),
+    ('Accept-Language', 'en-US,en;q=0.5'),
+    ('X-YouTube-Client-Name', '1'),
+    ('X-YouTube-Client-Version', '2.20240304.00.00'),
+) + desktop_ua
+mobile_xhr_headers = (
+    ('Accept', '*/*'),
+    ('Accept-Language', 'en-US,en;q=0.5'),
+    ('X-YouTube-Client-Name', '2'),
+    ('X-YouTube-Client-Version', '2.20240304.08.00'),
+) + mobile_ua
 
 
 class RateLimitedQueue(gevent.queue.Queue):
@@ -202,7 +430,6 @@ class RateLimitedQueue(gevent.queue.Queue):
         self.empty_start = 0
         gevent.queue.Queue.__init__(self)
 
-
     def get(self):
         self.lock.acquire()     # blocks if another greenlet currently has the lock
         if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial:
@@ -234,9 +461,8 @@ class RateLimitedQueue(gevent.queue.Queue):
 
         return item
 
-
 def download_thumbnail(save_directory, video_id):
-    url = "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+    url = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
     save_location = os.path.join(save_directory, video_id + ".jpg")
     try:
         thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
@@ -246,12 +472,13 @@ def download_thumbnail(save_directory, video_id):
     try:
         f = open(save_location, 'wb')
     except FileNotFoundError:
-        os.makedirs(save_directory, exist_ok = True)
+        os.makedirs(save_directory, exist_ok=True)
         f = open(save_location, 'wb')
 
     f.write(thumbnail)
     f.close()
     return True
 
+
 def download_thumbnails(save_directory, ids):
     if not isinstance(ids, (list, tuple)):
         ids = list(ids)
@@ -264,37 +491,26 @@ def download_thumbnails(save_directory, ids):
         gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i*5 + 5, len(ids))])
 
 
-
-
-
 def dict_add(*dicts):
     for dictionary in dicts[1:]:
         dicts[0].update(dictionary)
     return dicts[0]
 
+
 def video_id(url):
     url_parts = urllib.parse.urlparse(url)
     return urllib.parse.parse_qs(url_parts.query)['v'][0]
 
-def default_multi_get(object, *keys, default):
-    ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
-    try:
-        for key in keys:
-            object = object[key]
-        return object
-    except (IndexError, KeyError):
-        return default
-
 
 # default, sddefault, mqdefault, hqdefault, hq720
 def get_thumbnail_url(video_id):
-    return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
-
+    return f"{settings.img_prefix}https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
+
+
 def seconds_to_timestamp(seconds):
     seconds = int(seconds)
-    hours, seconds = divmod(seconds,3600)
-    minutes, seconds = divmod(seconds,60)
+    hours, seconds = divmod(seconds, 3600)
+    minutes, seconds = divmod(seconds, 60)
     if hours != 0:
         timestamp = str(hours) + ":"
         timestamp += str(minutes).zfill(2)  # zfill pads with zeros
@@ -305,19 +521,332 @@ def seconds_to_timestamp(seconds):
     return timestamp
 
-
 def update_query_string(query_string, items):
     parameters = urllib.parse.parse_qs(query_string)
     parameters.update(items)
     return urllib.parse.urlencode(parameters, doseq=True)
 
 
+YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
+YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:'
+YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS))
+YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?'
+YOUTUBE_URL_RE = re.compile(YOUTUBE_URL_RE_STR)
 
-def uppercase_escape(s):
-    return re.sub(
-        r'\\U([0-9a-fA-F]{8})',
-        lambda m: chr(int(m.group(1), base=16)), s)
 
 def prefix_url(url):
+    if url is None:
+        return None
     url = url.lstrip('/')     # some urls have // before them, which has a special meaning
     return '/' + url
+
+
+def left_remove(string, substring):
+    '''removes substring from the start of string, if present'''
+    if string.startswith(substring):
+        return string[len(substring):]
+    return string
+
+
+def concat_or_none(*strings):
+    '''Concatenates strings. Returns None if any of the arguments are None'''
+    result = ''
+    for string in strings:
+        if string is None:
+            return None
+        result += string
+    return result
+
+
+def prefix_urls(item):
+    if settings.proxy_images:
+        try:
+            item['thumbnail'] = prefix_url(item['thumbnail'])
+        except KeyError:
+            pass
+
+        try:
+            item['author_url'] = prefix_url(item['author_url'])
+        except KeyError:
+            pass
+
+
+def add_extra_html_info(item):
+    if item['type'] == 'video':
+        item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
+
+        video_info = {}
+        for key in ('id', 'title', 'author', 'duration', 'author_id'):
+            try:
+                video_info[key] = item[key]
+            except KeyError:
+                video_info[key] = None
+
+        item['video_info'] = json.dumps(video_info)
+
+    elif item['type'] == 'playlist' and item['playlist_type'] == 'radio':
+        item['url'] = concat_or_none(
+            URL_ORIGIN,
+            '/watch?v=', item['first_video_id'],
+            '&list=', item['id']
+        )
+    elif item['type'] == 'playlist':
+        item['url'] = concat_or_none(URL_ORIGIN, '/playlist?list=', item['id'])
+    elif item['type'] == 'channel':
+        item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
+
+    if item.get('author_id') and 'author_url' not in item:
+        item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id']
+
+
+def check_gevent_exceptions(*tasks):
+    for task in tasks:
+        if task.exception:
+            raise task.exception
+
+
+# https://stackoverflow.com/a/62888
+replacement_map = collections.OrderedDict([
+    ('<', '_'),
+    ('>', '_'),
+    (': ', ' - '),
+    (':', '-'),
+    ('"', "'"),
+    ('/', '_'),
+    ('\\', '_'),
+    ('|', '-'),
+    ('?', ''),
+    ('*', '_'),
+    ('\t', ' '),
+])
+
+DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
+             'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
+             'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
+             'lpt8', 'lpt9'}
+
+
+def to_valid_filename(name):
+    '''Changes the name so it's valid on Windows, Linux, and Mac'''
+    # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
+    # for Windows specs
+
+    # Additional recommendations for Linux:
+    # https://dwheeler.com/essays/fixing-unix-linux-filenames.html#standards
+
+    # remove control characters
+    name = re.sub(r'[\x00-\x1f]', '_', name)
+
+    # reserved characters
+    for reserved_char, replacement in replacement_map.items():
+        name = name.replace(reserved_char, replacement)
+
+    # check for all periods/spaces
+    if all(c == '.' or c == ' ' for c in name):
+        name = '_'*len(name)
+
+    # remove trailing periods and spaces
+    name = name.rstrip('. ')
+
+    # check for reserved DOS names, such as nul or nul.txt
+    base_ext_parts = name.rsplit('.', maxsplit=1)
+    if base_ext_parts[0].lower() in DOS_names:
+        base_ext_parts[0] += '_'
+        name = '.'.join(base_ext_parts)
+
+    # check for blank name
+    if name == '':
+        name = '_'
+
+    # check if name begins with a hyphen, period, or space
+    if name[0] in ('-', '.', ' '):
+        name = '_' + name
+
+    return name
+
+
+# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/youtube.py#L72
+INNERTUBE_CLIENTS = {
+    'android': {
+        'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
+        'INNERTUBE_CONTEXT': {
+            'client': {
+                'hl': 'en',
+                'gl': 'US',
+                'clientName': 'ANDROID',
+                'clientVersion': '19.09.36',
+                'osName': 'Android',
+                'osVersion': '12',
+                'androidSdkVersion': 31,
+                'platform': 'MOBILE',
+                'userAgent': 'com.google.android.youtube/19.09.36 (Linux; U; Android 12; US) gzip'
+            },
+            # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
+            #'thirdParty': {
+            #    'embedUrl': 'https://google.com',  # Can be any valid URL
+            #}
+        },
+        'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
+        'REQUIRE_JS_PLAYER': False,
+    },
+
+    'android-test-suite': {
+        'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
+        'INNERTUBE_CONTEXT': {
+            'client': {
+                'hl': 'en',
+                'gl': 'US',
+                'clientName': 'ANDROID_TESTSUITE',
+                'clientVersion': '1.9',
+                'osName': 'Android',
+                'osVersion': '12',
+                'androidSdkVersion': 31,
+                'platform': 'MOBILE',
+                'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 12; US) gzip'
+            },
+            # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
+            #'thirdParty': {
+            #    'embedUrl': 'https://google.com',  # Can be any valid URL
+            #}
+        },
+        'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
+        'REQUIRE_JS_PLAYER': False,
+    },
+
+    'ios': {
+        'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
+        'INNERTUBE_CONTEXT': {
+            'client': {
+                'hl': 'en',
+                'gl': 'US',
+                'clientName': 'IOS',
+                'clientVersion': '19.09.3',
+                'deviceModel': 'iPhone14,3',
+                'userAgent': 'com.google.ios.youtube/19.09.3 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)'
+            }
+        },
+        'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
+        'REQUIRE_JS_PLAYER': False
+    },
+
+    # This client can access age restricted videos (unless the uploader has disabled the 'allow embedding' option)
+    # See: https://github.com/zerodytrash/YouTube-Internal-Clients
+    'tv_embedded': {
+        'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
+        'INNERTUBE_CONTEXT': {
+            'client': {
+                'hl': 'en',
+                'gl': 'US',
+                'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
+                'clientVersion': '2.0',
+                'clientScreen': 'EMBED',
+            },
+            # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
+            'thirdParty': {
+                'embedUrl': 'https://google.com',  # Can be any valid URL
+            }
+
+        },
+        'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
+        'REQUIRE_JS_PLAYER': True,
+    },
+
+    'web': {
+        'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
+        'INNERTUBE_CONTEXT': {
+            'client': {
+                'clientName': 'WEB',
+                'clientVersion': '2.20220801.00.00',
+                'userAgent': desktop_user_agent,
+            }
+        },
+        'INNERTUBE_CONTEXT_CLIENT_NAME': 1
+    },
+    'android_vr': {
+        'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
+        'INNERTUBE_CONTEXT': {
+            'client': {
+                'clientName': 'ANDROID_VR',
+                'clientVersion': '1.60.19',
+                'deviceMake': 'Oculus',
+                'deviceModel': 'Quest 3',
+                'androidSdkVersion': 32,
+                'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
+                'osName': 'Android',
+                'osVersion': '12L',
+            },
+        },
+        'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
+        'REQUIRE_JS_PLAYER': False,
+    },
+}
+
+def get_visitor_data():
+    visitor_data = None
+    visitor_data_cache = os.path.join(settings.data_dir, 'visitorData.txt')
+    if not os.path.exists(settings.data_dir):
+        os.makedirs(settings.data_dir)
+    if os.path.isfile(visitor_data_cache):
+        with open(visitor_data_cache, 'r') as file:
+            print('Getting visitor_data from cache')
+            visitor_data = file.read()
+        max_age = 12*3600
+        file_age = time.time() - os.path.getmtime(visitor_data_cache)
+        if file_age > max_age:
+            print('visitor_data cache is too old. Removing file...')
+            os.remove(visitor_data_cache)
+        return visitor_data
+
+    print('Fetching youtube homepage to get visitor_data')
+    yt_homepage = 'https://www.youtube.com'
+    yt_resp = fetch_url(yt_homepage, headers={'User-Agent': mobile_user_agent}, report_text='Getting youtube homepage')
+    visitor_data_re = r'''"visitorData":\s*?"(.+?)"'''
+    visitor_data_match = re.search(visitor_data_re, yt_resp.decode())
+    if visitor_data_match:
+        visitor_data = visitor_data_match.group(1)
+        print(f'Got visitor_data: {len(visitor_data)}')
+        with open(visitor_data_cache, 'w') as file:
+            print('Saving visitor_data cache...')
+            file.write(visitor_data)
+        return visitor_data
+    else:
+        print('Unable to get visitor_data value')
+    return visitor_data
+
+def call_youtube_api(client, api, data):
+    client_params = INNERTUBE_CLIENTS[client]
+    context = client_params['INNERTUBE_CONTEXT']
+    key = client_params['INNERTUBE_API_KEY']
+    host = client_params.get('INNERTUBE_HOST') or 'www.youtube.com'
+    user_agent = context['client'].get('userAgent') or mobile_user_agent
+    visitor_data = get_visitor_data()
+
+    url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key
+    if visitor_data:
+        context['client'].update({'visitorData': visitor_data})
+    data['context'] = context
+
+    data = json.dumps(data)
+    headers = (('Content-Type', 'application/json'),('User-Agent', user_agent))
+    if visitor_data:
+        headers = ( *headers, ('X-Goog-Visitor-Id', visitor_data ))
+    response = fetch_url(
+        url, data=data, headers=headers,
+        debug_name='youtubei_' + api + '_' + client,
+        report_text='Fetched ' + client + ' youtubei ' + api
+    ).decode('utf-8')
+    return response
+
+
+def strip_non_ascii(string):
+    ''' Returns the string without non ASCII characters'''
+    if string is None:
+        return ""
+    stripped = (c for c in string if 0 < ord(c) < 127)
+    return ''.join(stripped)
+
+
+def time_utc_isoformat(string):
+    t = datetime.strptime(string, '%Y-%m-%d')
+    t = t.astimezone().isoformat()
+    return t
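
For context, here is a minimal usage sketch of how the helpers added in this commit fit together. It is not part of the commit: the import path, the example video ID, and the 'player' endpoint name are assumptions based on the signatures above, and it presumes the settings module (tor_port, route_tor, data_dir, ...) is already configured.

    # Hypothetical usage, assuming this module is importable as youtube.util
    import json
    from youtube import util

    # Plain fetch through the (optionally Tor-routed) urllib3 pool; returns bytes.
    # On a 429 from YouTube, fetch_url asks tor_manager for a new circuit and retries.
    page = util.fetch_url('https://www.youtube.com/watch?v=dQw4w9WgXcQ',
                          headers=util.desktop_ua,
                          report_text='Fetched watch page')

    # InnerTube call: call_youtube_api() fills in the client context, API key,
    # and visitor-data header, then returns the decoded JSON text.
    response_text = util.call_youtube_api('android_vr', 'player',
                                          {'videoId': 'dQw4w9WgXcQ'})
    player_response = json.loads(response_text)
    print(player_response.get('playabilityStatus', {}).get('status'))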