Diffstat (limited to 'youtube/util.py')
-rw-r--r--  youtube/util.py  693
1 file changed, 611 insertions(+), 82 deletions(-)
diff --git a/youtube/util.py b/youtube/util.py
index a81ae83..c59fae8 100644
--- a/youtube/util.py
+++ b/youtube/util.py
@@ -1,14 +1,25 @@
+from datetime import datetime
import settings
-import socks, sockshandler
+import socks
+import sockshandler
import gzip
-import brotli
+try:
+ import brotli
+ have_brotli = True
+except ImportError:
+ have_brotli = False
import urllib.parse
import re
import time
import os
+import json
import gevent
import gevent.queue
import gevent.lock
+import collections
+import stem
+import stem.control
+import traceback
# The trouble with the requests library: It ships its own certificate bundle via certifi
# instead of using the system certificate store, meaning self-signed certificates
@@ -45,34 +56,119 @@ import urllib3.contrib.socks
URL_ORIGIN = "/https://www.youtube.com"
-connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED')
+connection_pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED')
+
+
+class TorManager:
+ MAX_TRIES = 3
+ # Each retry sleeps for 7 sec (see time.sleep(7) below), so make the
+ # cooldown two of those
+ # (otherwise it will retry forever if the 429s never end)
+ COOLDOWN_TIME = 14
+
+ def __init__(self):
+ self.old_tor_connection_pool = None
+ self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
+ 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+ cert_reqs='CERT_REQUIRED')
+ self.tor_pool_refresh_time = time.monotonic()
+ settings.add_setting_changed_hook(
+ 'tor_port',
+ lambda old_val, new_val: self.refresh_tor_connection_pool(),
+ )
+
+ self.new_identity_lock = gevent.lock.BoundedSemaphore(1)
+ self.last_new_identity_time = time.monotonic() - 20
+ self.try_num = 1
+
+ def refresh_tor_connection_pool(self):
+ self.tor_connection_pool.clear()
+
+ # Keep a reference for 5 min to avoid it getting garbage collected
+ # while its sockets are still in use
+ self.old_tor_connection_pool = self.tor_connection_pool
+
+ self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
+ 'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
+ cert_reqs='CERT_REQUIRED')
+ self.tor_pool_refresh_time = time.monotonic()
+
+ def get_tor_connection_pool(self):
+ # Tor changes circuits after 10 minutes:
+ # https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive
+ current_time = time.monotonic()
+
+ # close pool after 5 minutes
+ if current_time - self.tor_pool_refresh_time > 300:
+ self.refresh_tor_connection_pool()
+
+ return self.tor_connection_pool
+
+ def new_identity(self, time_failed_request_started):
+ '''return error, or None if no error and the identity is fresh'''
+
+ # The overall pattern at maximum (always returning 429) will be
+ # R N (0) R N (7) R N (7) R | (14) R N (0) R N (7) ...
+ # where R is a request, N is a new identity, (x) is a wait time of
+ # x sec, and | is where we give up and display an error to the user.
+
+ print('new_identity: new_identity called')
+ # blocks if another greenlet currently has the lock
+ self.new_identity_lock.acquire()
+ print('new_identity: New identity lock acquired')
+
+ try:
+ # The failed request was started before the last identity change,
+ # so it was made under a stale identity; a fresh one is already in
+ # place and the request can simply be retried
+ if time_failed_request_started <= self.last_new_identity_time:
+ print('new_identity: Cancelling; request was from stale identity')
+ return None
+
+ delta = time.monotonic() - self.last_new_identity_time
+ if delta < self.COOLDOWN_TIME and self.try_num == 1:
+ err = ('Retried with new circuit %d times (max) within last '
+ '%d seconds.' % (self.MAX_TRIES, self.COOLDOWN_TIME))
+ print('new_identity:', err)
+ return err
+ elif delta >= self.COOLDOWN_TIME:
+ self.try_num = 1
+
+ try:
+ port = settings.tor_control_port
+ with stem.control.Controller.from_port(port=port) as controller:
+ controller.authenticate('')
+ print('new_identity: Getting new identity')
+ controller.signal(stem.Signal.NEWNYM)
+ print('new_identity: NEWNYM signal sent')
+ self.last_new_identity_time = time.monotonic()
+ self.refresh_tor_connection_pool()
+ except stem.SocketError:
+ traceback.print_exc()
+ return 'Failed to connect to Tor control port.'
+ finally:
+ original_try_num = self.try_num
+ self.try_num += 1
+ if self.try_num > self.MAX_TRIES:
+ self.try_num = 1
+
+ # Based on experiments, doing the request right after a second new
+ # identity still gives the old IP, so wait before retrying.
+ # The wait is not necessary after the first new identity.
+ if original_try_num > 1:
+ print('Sleeping for 7 seconds before retrying request')
+ time.sleep(7) # experimentally determined minimum
+
+ return None
+ finally:
+ self.new_identity_lock.release()
+
+
+tor_manager = TorManager()
-old_tor_connection_pool = None
-tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:9150/', cert_reqs = 'CERT_REQUIRED')
-
-tor_pool_refresh_time = time.monotonic() # prevent problems due to clock changes
def get_pool(use_tor):
- global old_tor_connection_pool
- global tor_connection_pool
- global tor_pool_refresh_time
-
if not use_tor:
return connection_pool
-
- # Tor changes circuits after 10 minutes: https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive
- current_time = time.monotonic()
- if current_time - tor_pool_refresh_time > 300: # close pool after 5 minutes
- tor_connection_pool.clear()
-
- # Keep a reference for 5 min to avoid it getting garbage collected while sockets still in use
- old_tor_connection_pool = tor_connection_pool
-
- tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:9150/', cert_reqs = 'CERT_REQUIRED')
- tor_pool_refresh_time = current_time
-
- return tor_connection_pool
-
+ return tor_manager.get_tor_connection_pool()
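
get_pool should be re-called for each request rather than cached by the caller, since the Tor pool object is swapped out every 5 minutes. A minimal caller sketch (the URL is illustrative):

    # Hypothetical caller; fetch the pool fresh each time.
    pool = get_pool(settings.route_tor)
    response = pool.request('GET', 'https://www.youtube.com/',
                            preload_content=False, decode_content=False)
    body = response.read()
    response.release_conn()
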
class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
@@ -96,6 +192,19 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
https_response = http_response
+class FetchError(Exception):
+ def __init__(self, code, reason='', ip=None, error_message=None):
+ if error_message:
+ string = code + ' ' + reason + ': ' + error_message
+ else:
+ string = 'HTTP error during request: ' + code + ' ' + reason
+ Exception.__init__(self, string)
+ self.code = code
+ self.reason = reason
+ self.ip = ip
+ self.error_message = error_message
+
+
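
FetchError carries the status code as a string, plus an optional reason, exit-node IP, and message. A quick sketch of how the message composes (values illustrative):

    err = FetchError('429', reason='Too Many Requests', ip='198.51.100.7')
    print(err)               # HTTP error during request: 429 Too Many Requests
    print(err.code, err.ip)  # 429 198.51.100.7
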
def decode_content(content, encoding_header):
encodings = encoding_header.replace(' ', '').split(',')
for encoding in reversed(encodings):
@@ -107,8 +216,12 @@ def decode_content(content, encoding_header):
content = gzip.decompress(content)
return content
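
decode_content undoes the Content-Encoding list right to left, since the server applies encodings left to right. A sketch exercising only the gzip branch (so it runs whether or not brotli is installed):

    payload = gzip.compress(b'hello')
    assert decode_content(payload, 'gzip') == b'hello'
    # Stacked encodings are undone in reverse application order.
    assert decode_content(gzip.compress(payload), 'gzip, gzip') == b'hello'
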
-def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True, return_response=False, debug_name=None):
+
+def fetch_url_response(url, headers=(), timeout=15, data=None,
+ cookiejar_send=None, cookiejar_receive=None,
+ use_tor=True, max_redirects=None):
'''
+ returns response, cleanup_function
When cookiejar_send is set to a CookieJar object,
those cookies will be sent in the request (but cookies in response will not be merged into it)
When cookiejar_receive is set to a CookieJar object,
@@ -117,7 +230,10 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
and response cookies will be merged into it.
'''
headers = dict(headers) # Note: Calling dict() on a dict will make a copy
- headers['Accept-Encoding'] = 'gzip, br'
+ if have_brotli:
+ headers['Accept-Encoding'] = 'gzip, br'
+ else:
+ headers['Accept-Encoding'] = 'gzip'
# prevent python version being leaked by urllib if User-Agent isn't provided
# (urllib will use ex. Python-urllib/3.6 otherwise)
@@ -128,11 +244,10 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
if data is not None:
method = "POST"
if isinstance(data, str):
- data = data.encode('ascii')
+ data = data.encode('utf-8')
elif not isinstance(data, bytes):
- data = urllib.parse.urlencode(data).encode('ascii')
+ data = urllib.parse.urlencode(data).encode('utf-8')
- start_time = time.time()
if cookiejar_send is not None or cookiejar_receive is not None: # Use urllib
req = urllib.request.Request(url, data=data, headers=headers)
@@ -140,50 +255,163 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookieja
cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
if use_tor and settings.route_tor:
- opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor)
+ opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", settings.tor_port), cookie_processor)
else:
opener = urllib.request.build_opener(cookie_processor)
response = opener.open(req, timeout=timeout)
- response_time = time.time()
-
-
- content = response.read()
+ cleanup_func = (lambda r: None)
else: # Use a urllib3 pool. Cookies can't be used since urllib3 doesn't have easy support for them.
+ # default: Retry.DEFAULT = Retry(3)
+ # (in connectionpool.py in urllib3)
+ # According to the documentation for urlopen, a redirect counts as a
+ # retry. So there are 3 redirects max by default.
+ if max_redirects:
+ retries = urllib3.Retry(3+max_redirects, redirect=max_redirects, raise_on_redirect=False)
+ else:
+ retries = urllib3.Retry(3, raise_on_redirect=False)
pool = get_pool(use_tor and settings.route_tor)
-
- response = pool.request(method, url, headers=headers, timeout=timeout, preload_content=False, decode_content=False)
- response_time = time.time()
+ try:
+ response = pool.request(method, url, headers=headers, body=data,
+ timeout=timeout, preload_content=False,
+ decode_content=False, retries=retries)
+ response.retries = retries
+ except urllib3.exceptions.MaxRetryError as e:
+ exception_cause = e.__context__.__context__
+ if (isinstance(exception_cause, socks.ProxyConnectionError)
+ and settings.route_tor):
+ msg = ('Failed to connect to Tor. Check that Tor is open and '
+ 'that your internet connection is working.\n\n'
+ + str(e))
+ raise FetchError('502', reason='Bad Gateway',
+ error_message=msg)
+ elif isinstance(e.__context__,
+ urllib3.exceptions.NewConnectionError):
+ msg = 'Failed to establish a connection.\n\n' + str(e)
+ raise FetchError(
+ '502', reason='Bad Gateway',
+ error_message=msg)
+ else:
+ raise
+ cleanup_func = (lambda r: r.release_conn())
+
+ return response, cleanup_func
+
+
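
With preload_content=False the body is not read eagerly, so a caller can stream it and release the connection afterwards. A sketch of calling fetch_url_response directly (URL illustrative):

    response, cleanup_func = fetch_url_response(
        'https://www.youtube.com/', max_redirects=5)
    try:
        while True:
            chunk = response.read(64*1024)
            if not chunk:
                break
            # ... process or write the chunk ...
    finally:
        cleanup_func(response)  # no-op for urllib, release_conn for urllib3
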
+def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
+ cookiejar_send=None, cookiejar_receive=None, use_tor=True,
+ debug_name=None):
+ while True:
+ start_time = time.monotonic()
+
+ response, cleanup_func = fetch_url_response(
+ url, headers, timeout=timeout, data=data,
+ cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive,
+ use_tor=use_tor)
+ response_time = time.monotonic()
content = response.read()
- response.release_conn()
- read_finish = time.time()
+ read_finish = time.monotonic()
+
+ cleanup_func(response) # release_connection for urllib3
+ content = decode_content(
+ content,
+ response.headers.get('Content-Encoding', default='identity'))
+
+ if (settings.debugging_save_responses
+ and debug_name is not None
+ and content):
+ save_dir = os.path.join(settings.data_dir, 'debug')
+ if not os.path.exists(save_dir):
+ os.makedirs(save_dir)
+
+ with open(os.path.join(save_dir, debug_name), 'wb') as f:
+ f.write(content)
+
+ if response.status == 429 or (
+ response.status == 302 and (response.getheader('Location') == url
+ or response.getheader('Location').startswith(
+ 'https://www.google.com/sorry/index'
+ )
+ )
+ ):
+ print(response.status, response.reason, response.headers)
+ ip = re.search(
+ br'IP address: ((?:[\da-f]*:)+[\da-f]+|(?:\d+\.)+\d+)',
+ content)
+ ip = ip.group(1).decode('ascii') if ip else None
+ if not ip:
+ ip = re.search(r'IP=((?:\d+\.)+\d+)',
+ response.getheader('Set-Cookie') or '')
+ ip = ip.group(1) if ip else None
+
+ # don't get new identity if we're not using Tor
+ if not use_tor:
+ raise FetchError('429', reason=response.reason, ip=ip)
+
+ print('Error: YouTube blocked the request because the Tor exit node is overutilized. Exit node IP address: %s' % ip)
+
+ # get new identity
+ error = tor_manager.new_identity(start_time)
+ if error:
+ raise FetchError(
+ '429', reason=response.reason, ip=ip,
+ error_message='Automatic circuit change: ' + error)
+ else:
+ continue # retry now that we have new identity
+
+ elif response.status >= 400:
+ raise FetchError(str(response.status), reason=response.reason,
+ ip=None)
+ break
+
if report_text:
- print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
- content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
+ print(report_text, ' Latency:', round(response_time - start_time, 3), ' Read time:', round(read_finish - response_time, 3))
- if settings.debugging_save_responses and debug_name is not None:
- save_dir = os.path.join(settings.data_dir, 'debug')
- if not os.path.exists(save_dir):
- os.makedirs(save_dir)
+ return content
- with open(os.path.join(save_dir, debug_name), 'wb') as f:
- f.write(content)
- if return_response:
- return content, response
- return content
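
A typical call, assuming a watch-page URL and debug name for illustration; on a persistent 429 the loop above gives up and raises FetchError instead of retrying forever:

    try:
        content = fetch_url(
            'https://www.youtube.com/watch?v=dQw4w9WgXcQ',
            headers=desktop_ua,
            report_text='Fetched watch page',
            debug_name='watch_page')
    except FetchError as e:
        print('Request failed:', e.code, e.reason, e.ip)
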
+def head(url, use_tor=False, report_text=None, max_redirects=10):
+ pool = get_pool(use_tor and settings.route_tor)
+ start_time = time.monotonic()
+
+ # default: Retry.DEFAULT = Retry(3)
+ # (in connectionpool.py in urllib3)
+ # According to the documentation for urlopen, a redirect counts as a retry
+ # So there are 3 redirects max by default. Let's change that
+ # to 10 since googlevideo redirects a lot.
+ retries = urllib3.Retry(
+ 3+max_redirects,
+ redirect=max_redirects,
+ raise_on_redirect=False)
+ headers = {'User-Agent': 'Python-urllib'}
+ response = pool.request('HEAD', url, headers=headers, retries=retries)
+ if report_text:
+ print(
+ report_text,
+ ' Latency:',
+ round(time.monotonic() - start_time, 3))
+ return response
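
Since raise_on_redirect=False, the last response in the redirect chain is returned rather than raising MaxRetryError when redirects run out. A sketch (URL illustrative):

    resp = head('https://www.youtube.com/watch?v=dQw4w9WgXcQ',
                report_text='HEAD watch page')
    print(resp.status)                  # status of the final hop
    print(resp.headers.get('Content-Type'))
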
-mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
+mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
mobile_ua = (('User-Agent', mobile_user_agent),)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
-
-
-
-
+json_header = (('Content-Type', 'application/json'),)
+desktop_xhr_headers = (
+ ('Accept', '*/*'),
+ ('Accept-Language', 'en-US,en;q=0.5'),
+ ('X-YouTube-Client-Name', '1'),
+ ('X-YouTube-Client-Version', '2.20240304.00.00'),
+) + desktop_ua
+mobile_xhr_headers = (
+ ('Accept', '*/*'),
+ ('Accept-Language', 'en-US,en;q=0.5'),
+ ('X-YouTube-Client-Name', '2'),
+ ('X-YouTube-Client-Version', '2.20240304.08.00'),
+) + mobile_ua
class RateLimitedQueue(gevent.queue.Queue):
@@ -202,7 +430,6 @@ class RateLimitedQueue(gevent.queue.Queue):
self.empty_start = 0
gevent.queue.Queue.__init__(self)
-
def get(self):
self.lock.acquire() # blocks if another greenlet currently has the lock
if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial:
@@ -234,9 +461,8 @@ class RateLimitedQueue(gevent.queue.Queue):
return item
-
def download_thumbnail(save_directory, video_id):
- url = "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+ url = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
save_location = os.path.join(save_directory, video_id + ".jpg")
try:
thumbnail = fetch_url(url, report_text="Saved thumbnail: " + video_id)
@@ -246,12 +472,13 @@ def download_thumbnail(save_directory, video_id):
try:
f = open(save_location, 'wb')
except FileNotFoundError:
- os.makedirs(save_directory, exist_ok = True)
+ os.makedirs(save_directory, exist_ok=True)
f = open(save_location, 'wb')
f.write(thumbnail)
f.close()
return True
+
def download_thumbnails(save_directory, ids):
if not isinstance(ids, (list, tuple)):
ids = list(ids)
@@ -264,37 +491,26 @@ def download_thumbnails(save_directory, ids):
gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i*5 + 5, len(ids))])
-
-
-
-
def dict_add(*dicts):
for dictionary in dicts[1:]:
dicts[0].update(dictionary)
return dicts[0]
+
def video_id(url):
url_parts = urllib.parse.urlparse(url)
return urllib.parse.parse_qs(url_parts.query)['v'][0]
-def default_multi_get(object, *keys, default):
- ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
- try:
- for key in keys:
- object = object[key]
- return object
- except (IndexError, KeyError):
- return default
-
# default, sddefault, mqdefault, hqdefault, hq720
def get_thumbnail_url(video_id):
- return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
-
+ return f"{settings.img_prefix}https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
+
+
def seconds_to_timestamp(seconds):
seconds = int(seconds)
- hours, seconds = divmod(seconds,3600)
- minutes, seconds = divmod(seconds,60)
+ hours, seconds = divmod(seconds, 3600)
+ minutes, seconds = divmod(seconds, 60)
if hours != 0:
timestamp = str(hours) + ":"
timestamp += str(minutes).zfill(2) # zfill pads with zeros
@@ -305,19 +521,332 @@ def seconds_to_timestamp(seconds):
return timestamp
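
Sample outputs, assuming the minutes-only branch outside this hunk renders unpadded minutes as before:

    assert seconds_to_timestamp(61) == '1:01'
    assert seconds_to_timestamp(3661) == '1:01:01'
    assert seconds_to_timestamp(45.9) == '0:45'  # int() truncates
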
-
def update_query_string(query_string, items):
parameters = urllib.parse.parse_qs(query_string)
parameters.update(items)
return urllib.parse.urlencode(parameters, doseq=True)
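
parse_qs returns each parameter as a list, so replacement values should be lists too. For example:

    # prints 'v=abc123&t=30s' (order follows the parsed dict)
    print(update_query_string('v=abc123&t=10s', {'t': ['30s']}))
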
+YOUTUBE_DOMAINS = ('youtube.com', 'youtu.be', 'youtube-nocookie.com')
+YOUTUBE_URL_RE_STR = r'https?://(?:[a-zA-Z0-9_-]*\.)?(?:'
+YOUTUBE_URL_RE_STR += r'|'.join(map(re.escape, YOUTUBE_DOMAINS))
+YOUTUBE_URL_RE_STR += r')(?:/[^"]*)?'
+YOUTUBE_URL_RE = re.compile(YOUTUBE_URL_RE_STR)
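
The pattern accepts any subdomain of the listed domains, and the [^"]* path component lets the regex lift URLs out of HTML attributes or JSON strings. For instance:

    m = YOUTUBE_URL_RE.search('href="https://m.youtube.com/watch?v=abc"')
    print(m.group(0))   # https://m.youtube.com/watch?v=abc
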
-def uppercase_escape(s):
- return re.sub(
- r'\\U([0-9a-fA-F]{8})',
- lambda m: chr(int(m.group(1), base=16)), s)
def prefix_url(url):
+ if url is None:
+ return None
url = url.lstrip('/') # some urls have // before them, which has a special meaning
return '/' + url
+
+
+def left_remove(string, substring):
+ '''removes substring from the start of string, if present'''
+ if string.startswith(substring):
+ return string[len(substring):]
+ return string
+
+
+def concat_or_none(*strings):
+ '''Concatenates strings. Returns None if any of the arguments are None'''
+ result = ''
+ for string in strings:
+ if string is None:
+ return None
+ result += string
+ return result
+
+
+def prefix_urls(item):
+ if settings.proxy_images:
+ try:
+ item['thumbnail'] = prefix_url(item['thumbnail'])
+ except KeyError:
+ pass
+
+ try:
+ item['author_url'] = prefix_url(item['author_url'])
+ except KeyError:
+ pass
+
+
+def add_extra_html_info(item):
+ if item['type'] == 'video':
+ item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
+
+ video_info = {}
+ for key in ('id', 'title', 'author', 'duration', 'author_id'):
+ try:
+ video_info[key] = item[key]
+ except KeyError:
+ video_info[key] = None
+
+ item['video_info'] = json.dumps(video_info)
+
+ elif item['type'] == 'playlist' and item['playlist_type'] == 'radio':
+ item['url'] = concat_or_none(
+ URL_ORIGIN,
+ '/watch?v=', item['first_video_id'],
+ '&list=', item['id']
+ )
+ elif item['type'] == 'playlist':
+ item['url'] = concat_or_none(URL_ORIGIN, '/playlist?list=', item['id'])
+ elif item['type'] == 'channel':
+ item['url'] = concat_or_none(URL_ORIGIN, "/channel/", item['id'])
+
+ if item.get('author_id') and 'author_url' not in item:
+ item['author_url'] = URL_ORIGIN + '/channel/' + item['author_id']
+
+
+def check_gevent_exceptions(*tasks):
+ for task in tasks:
+ if task.exception:
+ raise task.exception
+
+
+# https://stackoverflow.com/a/62888
+replacement_map = collections.OrderedDict([
+ ('<', '_'),
+ ('>', '_'),
+ (': ', ' - '),
+ (':', '-'),
+ ('"', "'"),
+ ('/', '_'),
+ ('\\', '_'),
+ ('|', '-'),
+ ('?', ''),
+ ('*', '_'),
+ ('\t', ' '),
+])
+
+DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
+ 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
+ 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
+ 'lpt8', 'lpt9'}
+
+
+def to_valid_filename(name):
+ '''Changes the name so it's valid on Windows, Linux, and Mac'''
+ # See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
+ # for Windows specs
+
+ # Additional recommendations for Linux:
+ # https://dwheeler.com/essays/fixing-unix-linux-filenames.html#standards
+
+ # remove control characters
+ name = re.sub(r'[\x00-\x1f]', '_', name)
+
+ # reserved characters
+ for reserved_char, replacement in replacement_map.items():
+ name = name.replace(reserved_char, replacement)
+
+ # check for all periods/spaces
+ if all(c == '.' or c == ' ' for c in name):
+ name = '_'*len(name)
+
+ # remove trailing periods and spaces
+ name = name.rstrip('. ')
+
+ # check for reserved DOS names, such as nul or nul.txt
+ base_ext_parts = name.rsplit('.', maxsplit=1)
+ if base_ext_parts[0].lower() in DOS_names:
+ base_ext_parts[0] += '_'
+ name = '.'.join(base_ext_parts)
+
+ # check for blank name
+ if name == '':
+ name = '_'
+
+ # check if name begins with a hyphen, period, or space
+ if name[0] in ('-', '.', ' '):
+ name = '_' + name
+
+ return name
+
+
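
A few illustrative transformations:

    assert to_valid_filename('A: B/C?') == 'A - B_C'
    assert to_valid_filename('nul.txt') == 'nul_.txt'  # reserved DOS name
    assert to_valid_filename('-rf *') == '_-rf _'
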
+# https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/youtube.py#L72
+INNERTUBE_CLIENTS = {
+ 'android': {
+ 'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
+ 'INNERTUBE_CONTEXT': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'ANDROID',
+ 'clientVersion': '19.09.36',
+ 'osName': 'Android',
+ 'osVersion': '12',
+ 'androidSdkVersion': 31,
+ 'platform': 'MOBILE',
+ 'userAgent': 'com.google.android.youtube/19.09.36 (Linux; U; Android 12; US) gzip'
+ },
+ # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
+ #'thirdParty': {
+ # 'embedUrl': 'https://google.com', # Can be any valid URL
+ #}
+ },
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
+ 'REQUIRE_JS_PLAYER': False,
+ },
+
+ 'android-test-suite': {
+ 'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
+ 'INNERTUBE_CONTEXT': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'ANDROID_TESTSUITE',
+ 'clientVersion': '1.9',
+ 'osName': 'Android',
+ 'osVersion': '12',
+ 'androidSdkVersion': 31,
+ 'platform': 'MOBILE',
+ 'userAgent': 'com.google.android.youtube/1.9 (Linux; U; Android 12; US) gzip'
+ },
+ # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
+ #'thirdParty': {
+ # 'embedUrl': 'https://google.com', # Can be any valid URL
+ #}
+ },
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 3,
+ 'REQUIRE_JS_PLAYER': False,
+ },
+
+ 'ios': {
+ 'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc',
+ 'INNERTUBE_CONTEXT': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'IOS',
+ 'clientVersion': '19.09.3',
+ 'deviceModel': 'iPhone14,3',
+ 'userAgent': 'com.google.ios.youtube/19.09.3 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)'
+ }
+ },
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 5,
+ 'REQUIRE_JS_PLAYER': False
+ },
+
+ # This client can access age restricted videos (unless the uploader has disabled the 'allow embedding' option)
+ # See: https://github.com/zerodytrash/YouTube-Internal-Clients
+ 'tv_embedded': {
+ 'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
+ 'INNERTUBE_CONTEXT': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
+ 'clientVersion': '2.0',
+ 'clientScreen': 'EMBED',
+ },
+ # https://github.com/yt-dlp/yt-dlp/pull/575#issuecomment-887739287
+ 'thirdParty': {
+ 'embedUrl': 'https://google.com', # Can be any valid URL
+ }
+
+ },
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 85,
+ 'REQUIRE_JS_PLAYER': True,
+ },
+
+ 'web': {
+ 'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
+ 'INNERTUBE_CONTEXT': {
+ 'client': {
+ 'clientName': 'WEB',
+ 'clientVersion': '2.20220801.00.00',
+ 'userAgent': desktop_user_agent,
+ }
+ },
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 1
+ },
+ 'android_vr': {
+ 'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
+ 'INNERTUBE_CONTEXT': {
+ 'client': {
+ 'clientName': 'ANDROID_VR',
+ 'clientVersion': '1.60.19',
+ 'deviceMake': 'Oculus',
+ 'deviceModel': 'Quest 3',
+ 'androidSdkVersion': 32,
+ 'userAgent': 'com.google.android.apps.youtube.vr.oculus/1.60.19 (Linux; U; Android 12L; eureka-user Build/SQ3A.220605.009.A1) gzip',
+ 'osName': 'Android',
+ 'osVersion': '12L',
+ },
+ },
+ 'INNERTUBE_CONTEXT_CLIENT_NAME': 28,
+ 'REQUIRE_JS_PLAYER': False,
+ },
+}
+
+def get_visitor_data():
+ visitor_data = None
+ visitor_data_cache = os.path.join(settings.data_dir, 'visitorData.txt')
+ if not os.path.exists(settings.data_dir):
+ os.makedirs(settings.data_dir)
+ if os.path.isfile(visitor_data_cache):
+ with open(visitor_data_cache, 'r') as file:
+ print('Getting visitor_data from cache')
+ visitor_data = file.read()
+ max_age = 12*3600
+ file_age = time.time() - os.path.getmtime(visitor_data_cache)
+ if file_age > max_age:
+ print('visitor_data cache is too old. Removing file...')
+ os.remove(visitor_data_cache)
+ return visitor_data
+
+ print('Fetching youtube homepage to get visitor_data')
+ yt_homepage = 'https://www.youtube.com'
+ yt_resp = fetch_url(yt_homepage, headers={'User-Agent': mobile_user_agent}, report_text='Getting youtube homepage')
+ visitor_data_re = r'''"visitorData":\s*?"(.+?)"'''
+ visitor_data_match = re.search(visitor_data_re, yt_resp.decode())
+ if visitor_data_match:
+ visitor_data = visitor_data_match.group(1)
+ print(f'Got visitor_data: {len(visitor_data)} characters')
+ with open(visitor_data_cache, 'w') as file:
+ print('Saving visitor_data cache...')
+ file.write(visitor_data)
+ return visitor_data
+ else:
+ print('Unable to get visitor_data value')
+ return visitor_data
+
+def call_youtube_api(client, api, data):
+ client_params = INNERTUBE_CLIENTS[client]
+ context = client_params['INNERTUBE_CONTEXT']
+ key = client_params['INNERTUBE_API_KEY']
+ host = client_params.get('INNERTUBE_HOST') or 'www.youtube.com'
+ user_agent = context['client'].get('userAgent') or mobile_user_agent
+ visitor_data = get_visitor_data()
+
+ url = 'https://' + host + '/youtubei/v1/' + api + '?key=' + key
+ if visitor_data:
+ context['client'].update({'visitorData': visitor_data})
+ data['context'] = context
+
+ data = json.dumps(data)
+ headers = (('Content-Type', 'application/json'), ('User-Agent', user_agent))
+ if visitor_data:
+ headers = (*headers, ('X-Goog-Visitor-Id', visitor_data))
+ response = fetch_url(
+ url, data=data, headers=headers,
+ debug_name='youtubei_' + api + '_' + client,
+ report_text='Fetched ' + client + ' youtubei ' + api
+ ).decode('utf-8')
+ return response
+
+
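
A sketch of a call; the 'player' endpoint name and payload shape here are assumptions for illustration, not taken from this patch:

    # Hypothetical: fetch player data with the android_vr client.
    response_text = call_youtube_api(
        'android_vr', 'player', {'videoId': 'dQw4w9WgXcQ'})
    player_response = json.loads(response_text)
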
+def strip_non_ascii(string):
+ '''Returns the string without non-ASCII characters'''
+ if string is None:
+ return ""
+ stripped = (c for c in string if 0 < ord(c) < 127)
+ return ''.join(stripped)
+
+
+def time_utc_isoformat(string):
+ t = datetime.strptime(string, '%Y-%m-%d')
+ t = t.astimezone().isoformat()
+ return t
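
Note that strptime produces a naive datetime and astimezone() then assumes the system timezone, so the output carries the local offset rather than UTC. For example, with the system timezone set to UTC:

    # time_utc_isoformat('2024-03-04') -> '2024-03-04T00:00:00+00:00'
    print(time_utc_isoformat('2024-03-04'))
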