Diffstat (limited to 'youtube/util.py')
 youtube/util.py | 66
 1 file changed, 40 insertions(+), 26 deletions(-)
diff --git a/youtube/util.py b/youtube/util.py
index 8945b9f..df4759e 100644
--- a/youtube/util.py
+++ b/youtube/util.py
@@ -1,6 +1,7 @@
import settings
from youtube import yt_data_extract
-import socks, sockshandler
+import socks
+import sockshandler
import gzip
try:
import brotli
@@ -55,14 +56,15 @@ import urllib3.contrib.socks
URL_ORIGIN = "/https://www.youtube.com"
-connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED')
+connection_pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED')
+
class TorManager:
def __init__(self):
self.old_tor_connection_pool = None
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
- cert_reqs = 'CERT_REQUIRED')
+ cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
self.new_identity_lock = gevent.lock.BoundedSemaphore(1)
@@ -77,7 +79,7 @@ class TorManager:
self.tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager(
'socks5h://127.0.0.1:' + str(settings.tor_port) + '/',
- cert_reqs = 'CERT_REQUIRED')
+ cert_reqs='CERT_REQUIRED')
self.tor_pool_refresh_time = time.monotonic()
def get_tor_connection_pool(self):
@@ -125,6 +127,7 @@ class TorManager:
finally:
self.new_identity_lock.release()
+
tor_manager = TorManager()
@@ -154,6 +157,7 @@ class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
https_request = http_request
https_response = http_response
+
class FetchError(Exception):
def __init__(self, code, reason='', ip=None, error_message=None):
Exception.__init__(self, 'HTTP error during request: ' + code + ' ' + reason)
@@ -162,6 +166,7 @@ class FetchError(Exception):
self.ip = ip
self.error_message = error_message
+
def decode_content(content, encoding_header):
encodings = encoding_header.replace(' ', '').split(',')
for encoding in reversed(encodings):
@@ -173,6 +178,7 @@ def decode_content(content, encoding_header):
content = gzip.decompress(content)
return content
+
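
For reference, a quick round trip through decode_content using only the gzip branch shown above (the brotli branch needs the optional brotli module):

    import gzip

    payload = b'example body'
    assert decode_content(gzip.compress(payload), 'gzip') == payload
    # Servers apply stacked encodings left to right, which is why the
    # loop walks the header in reverse to peel them back off:
    double = gzip.compress(gzip.compress(payload))
    assert decode_content(double, 'gzip, gzip') == payload
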
def fetch_url_response(url, headers=(), timeout=15, data=None,
cookiejar_send=None, cookiejar_receive=None,
use_tor=True, max_redirects=None):
@@ -234,6 +240,7 @@ def fetch_url_response(url, headers=(), timeout=15, data=None,
return response, cleanup_func
+
def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
cookiejar_send=None, cookiejar_receive=None, use_tor=True,
debug_name=None):
@@ -284,7 +291,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
break
if report_text:
- print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
+ print(report_text, ' Latency:', round(response_time - start_time, 3), ' Read time:', round(read_finish - response_time, 3))
if settings.debugging_save_responses and debug_name is not None:
save_dir = os.path.join(settings.data_dir, 'debug')
@@ -296,6 +303,7 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
return content
+
def head(url, use_tor=False, report_text=None, max_redirects=10):
pool = get_pool(use_tor and settings.route_tor)
start_time = time.monotonic()
@@ -305,7 +313,9 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
# According to the documentation for urlopen, a redirect counts as a retry
# So there are 3 redirects max by default. Let's change that
# to 10 since googlevideo redirects a lot.
- retries = urllib3.Retry(3+max_redirects, redirect=max_redirects,
+ retries = urllib3.Retry(
+ 3+max_redirects,
+ redirect=max_redirects,
raise_on_redirect=False)
headers = {'User-Agent': 'Python-urllib'}
response = pool.request('HEAD', url, headers=headers, retries=retries)
@@ -313,19 +323,16 @@ def head(url, use_tor=False, report_text=None, max_redirects=10):
print(
report_text,
' Latency:',
- round(time.monotonic() - start_time,3))
+ round(time.monotonic() - start_time, 3))
return response
+
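
The retry arithmetic in the comment above can be sketched in isolation: urllib3 counts every followed redirect against the total attempt budget, so the total is raised to 3 + max_redirects to preserve 3 genuine retries (example.com stands in for a real URL):

    import urllib3

    max_redirects = 10
    retries = urllib3.Retry(
        3 + max_redirects,        # total budget: 3 real retries plus one per redirect
        redirect=max_redirects,   # separate ceiling on redirects alone
        raise_on_redirect=False)  # hand back the final 3xx instead of raising
    pool = urllib3.PoolManager()
    response = pool.request('HEAD', 'https://example.com/', retries=retries)
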
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'
mobile_ua = (('User-Agent', mobile_user_agent),)
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
desktop_ua = (('User-Agent', desktop_user_agent),)
-
-
-
-
class RateLimitedQueue(gevent.queue.Queue):
''' Does initial_burst (def. 30) at first, then alternates between waiting waiting_period (def. 5) seconds and doing subsequent_bursts (def. 10) queries. After 5 seconds with nothing left in the queue, resets rate limiting. '''
@@ -342,7 +349,6 @@ class RateLimitedQueue(gevent.queue.Queue):
self.empty_start = 0
gevent.queue.Queue.__init__(self)
-
def get(self):
self.lock.acquire() # blocks if another greenlet currently has the lock
if self.count_since_last_wait >= self.subsequent_bursts and self.surpassed_initial:
@@ -374,7 +380,6 @@ class RateLimitedQueue(gevent.queue.Queue):
return item
-
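
A minimal sketch of driving RateLimitedQueue from greenlets, assuming the constructor takes the three parameters named in its docstring with the defaults given there:

    import gevent

    queue = RateLimitedQueue()  # initial_burst=30, waiting_period=5, subsequent_bursts=10

    def worker():
        while True:
            item = queue.get()       # get() enforces the burst/wait pattern
            if item is None:         # sentinel meaning no more work
                return
            print('fetching', item)  # stand-in for the real request

    workers = [gevent.spawn(worker) for _ in range(4)]
    for vid in ('id1', 'id2', 'id3'):
        queue.put(vid)
    for _ in workers:
        queue.put(None)
    gevent.joinall(workers)
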
def download_thumbnail(save_directory, video_id):
url = "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
save_location = os.path.join(save_directory, video_id + ".jpg")
@@ -386,12 +391,13 @@ def download_thumbnail(save_directory, video_id):
try:
f = open(save_location, 'wb')
except FileNotFoundError:
- os.makedirs(save_directory, exist_ok = True)
+ os.makedirs(save_directory, exist_ok=True)
f = open(save_location, 'wb')
f.write(thumbnail)
f.close()
return True
+
def download_thumbnails(save_directory, ids):
if not isinstance(ids, (list, tuple)):
ids = list(ids)
@@ -404,15 +410,12 @@ def download_thumbnails(save_directory, ids):
gevent.joinall([gevent.spawn(download_thumbnail, save_directory, ids[j]) for j in range(i*5 + 5, len(ids))])
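
The loop above works through ids five at a time; the same batching pattern in generic form (run_in_batches is a hypothetical name, and print stands in for download_thumbnail):

    import gevent

    def run_in_batches(items, batch_size=5):
        # One greenlet per item, but each batch must finish before the
        # next starts, capping how many downloads run concurrently.
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            gevent.joinall([gevent.spawn(print, item) for item in batch])

    run_in_batches(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
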
-
-
-
-
def dict_add(*dicts):
for dictionary in dicts[1:]:
dicts[0].update(dictionary)
return dicts[0]
+
def video_id(url):
url_parts = urllib.parse.urlparse(url)
return urllib.parse.parse_qs(url_parts.query)['v'][0]
@@ -422,10 +425,11 @@ def video_id(url):
def get_thumbnail_url(video_id):
return settings.img_prefix + "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+
def seconds_to_timestamp(seconds):
seconds = int(seconds)
- hours, seconds = divmod(seconds,3600)
- minutes, seconds = divmod(seconds,60)
+ hours, seconds = divmod(seconds, 3600)
+ minutes, seconds = divmod(seconds, 60)
if hours != 0:
timestamp = str(hours) + ":"
timestamp += str(minutes).zfill(2) # zfill pads with zeros
@@ -436,18 +440,17 @@ def seconds_to_timestamp(seconds):
return timestamp
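
Worked through by hand: divmod(3725, 3600) gives (1, 125) and divmod(125, 60) gives (2, 5), so 3725 seconds renders as '1:02:05'; the second assertion assumes the minutes-only branch elided between these hunks leaves minutes unpadded:

    assert seconds_to_timestamp(3725) == '1:02:05'  # 1 h, 2 min, 5 s
    assert seconds_to_timestamp(125) == '2:05'      # hours omitted when zero
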
-
def update_query_string(query_string, items):
parameters = urllib.parse.parse_qs(query_string)
parameters.update(items)
return urllib.parse.urlencode(parameters, doseq=True)
-
def uppercase_escape(s):
- return re.sub(
- r'\\U([0-9a-fA-F]{8})',
- lambda m: chr(int(m.group(1), base=16)), s)
+ return re.sub(
+ r'\\U([0-9a-fA-F]{8})',
+ lambda m: chr(int(m.group(1), base=16)), s)
+
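
uppercase_escape turns literal \UXXXXXXXX escape sequences, as they sometimes appear in scraped page data, into the characters they name:

    # The pattern matches a backslash, 'U', and 8 hex digits, replacing
    # the whole escape with the code point those digits name.
    assert uppercase_escape('\\U0001F600 ok') == '\U0001F600 ok'
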
def prefix_url(url):
if url is None:
@@ -455,12 +458,14 @@ def prefix_url(url):
url = url.lstrip('/') # some urls have // before them, which has a special meaning
return '/' + url
+
def left_remove(string, substring):
'''removes substring from the start of string, if present'''
if string.startswith(substring):
return string[len(substring):]
return string
+
def concat_or_none(*strings):
'''Concatenates strings. Returns None if any of the arguments are None'''
result = ''
@@ -483,6 +488,7 @@ def prefix_urls(item):
except KeyError:
pass
+
def add_extra_html_info(item):
if item['type'] == 'video':
item['url'] = (URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None
@@ -501,6 +507,7 @@ def add_extra_html_info(item):
elif item['type'] == 'channel':
item['url'] = (URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None
+
def parse_info_prepare_for_html(renderer, additional_info={}):
item = yt_data_extract.extract_item_info(renderer, additional_info)
prefix_urls(item)
@@ -508,6 +515,7 @@ def parse_info_prepare_for_html(renderer, additional_info={}):
return item
+
def check_gevent_exceptions(*tasks):
for task in tasks:
if task.exception:
@@ -528,7 +536,13 @@ replacement_map = collections.OrderedDict([
('*', '_'),
('\t', ' '),
])
-DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0', 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9'}
+
+DOS_names = {'con', 'prn', 'aux', 'nul', 'com0', 'com1', 'com2', 'com3',
+ 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt0',
+ 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7',
+ 'lpt8', 'lpt9'}
+
+
def to_valid_filename(name):
'''Changes the name so it's valid on Windows, Linux, and Mac'''
# See https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
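
The function continues past this excerpt; a rough sketch of how the two tables above get applied (sanitize is a hypothetical name, and the real to_valid_filename handles more cases, such as trailing dots and spaces):

    def sanitize(name):
        # Swap each forbidden character for its replacement.
        for bad, good in replacement_map.items():
            name = name.replace(bad, good)
        # DOS device names are reserved on Windows even with an extension,
        # so prefix them to sidestep the reservation.
        if name.split('.')[0].lower() in DOS_names:
            name = '_' + name
        return name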