aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/util.py')
-rw-r--r--youtube/util.py153
1 files changed, 153 insertions, 0 deletions
diff --git a/youtube/util.py b/youtube/util.py
new file mode 100644
index 0000000..2ebd8bc
--- /dev/null
+++ b/youtube/util.py
@@ -0,0 +1,153 @@
+import socks, sockshandler
+import gzip
+import brotli
+import urllib.parse
+import re
+import time
+import settings
+
+
+URL_ORIGIN = "/https://www.youtube.com"
+
+
+class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
+ '''Separate cookiejars for receiving and sending'''
+ def __init__(self, cookiejar_send=None, cookiejar_receive=None):
+ import http.cookiejar
+ self.cookiejar_send = cookiejar_send
+ self.cookiejar_receive = cookiejar_receive
+
+ def http_request(self, request):
+ if self.cookiejar_send is not None:
+ self.cookiejar_send.add_cookie_header(request)
+ return request
+
+ def http_response(self, request, response):
+ if self.cookiejar_receive is not None:
+ self.cookiejar_receive.extract_cookies(response, request)
+ return response
+
+ https_request = http_request
+ https_response = http_response
+
+
+def decode_content(content, encoding_header):
+ encodings = encoding_header.replace(' ', '').split(',')
+ for encoding in reversed(encodings):
+ if encoding == 'identity':
+ continue
+ if encoding == 'br':
+ content = brotli.decompress(content)
+ elif encoding == 'gzip':
+ content = gzip.decompress(content)
+ return content
+
+def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True):
+ '''
+ When cookiejar_send is set to a CookieJar object,
+ those cookies will be sent in the request (but cookies in response will not be merged into it)
+ When cookiejar_receive is set to a CookieJar object,
+ cookies received in the response will be merged into the object (nothing will be sent from it)
+ When both are set to the same object, cookies will be sent from the object,
+ and response cookies will be merged into it.
+ '''
+ headers = dict(headers) # Note: Calling dict() on a dict will make a copy
+ headers['Accept-Encoding'] = 'gzip, br'
+
+ # prevent python version being leaked by urllib if User-Agent isn't provided
+ # (urllib will use ex. Python-urllib/3.6 otherwise)
+ if 'User-Agent' not in headers and 'user-agent' not in headers and 'User-agent' not in headers:
+ headers['User-Agent'] = 'Python-urllib'
+
+ if data is not None:
+ if isinstance(data, str):
+ data = data.encode('ascii')
+ elif not isinstance(data, bytes):
+ data = urllib.parse.urlencode(data).encode('ascii')
+
+ start_time = time.time()
+
+
+ req = urllib.request.Request(url, data=data, headers=headers)
+
+ cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
+
+ if use_tor and settings.route_tor:
+ opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor)
+ else:
+ opener = urllib.request.build_opener(cookie_processor)
+
+ response = opener.open(req, timeout=timeout)
+ response_time = time.time()
+
+
+ content = response.read()
+ read_finish = time.time()
+ if report_text:
+ print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
+ content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
+ return content
+
+mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
+mobile_ua = (('User-Agent', mobile_user_agent),)
+desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
+desktop_ua = (('User-Agent', desktop_user_agent),)
+
+
+
+
+
+
+
+
+
+
+def dict_add(*dicts):
+ for dictionary in dicts[1:]:
+ dicts[0].update(dictionary)
+ return dicts[0]
+
+def video_id(url):
+ url_parts = urllib.parse.urlparse(url)
+ return urllib.parse.parse_qs(url_parts.query)['v'][0]
+
+def default_multi_get(object, *keys, default):
+ ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
+ try:
+ for key in keys:
+ object = object[key]
+ return object
+ except (IndexError, KeyError):
+ return default
+
+
+# default, sddefault, mqdefault, hqdefault, hq720
+def get_thumbnail_url(video_id):
+ return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+
+def seconds_to_timestamp(seconds):
+ seconds = int(seconds)
+ hours, seconds = divmod(seconds,3600)
+ minutes, seconds = divmod(seconds,60)
+ if hours != 0:
+ timestamp = str(hours) + ":"
+ timestamp += str(minutes).zfill(2) # zfill pads with zeros
+ else:
+ timestamp = str(minutes)
+
+ timestamp += ":" + str(seconds).zfill(2)
+ return timestamp
+
+
+
+def update_query_string(query_string, items):
+ parameters = urllib.parse.parse_qs(query_string)
+ parameters.update(items)
+ return urllib.parse.urlencode(parameters, doseq=True)
+
+
+
+def uppercase_escape(s):
+ return re.sub(
+ r'\\U([0-9a-fA-F]{8})',
+ lambda m: chr(int(m.group(1), base=16)), s) \ No newline at end of file