path: root/youtube/util.py
diff options
authorJames Taylor <user234683@users.noreply.github.com>2019-02-21 21:32:31 -0800
committerJames Taylor <user234683@users.noreply.github.com>2019-02-21 21:32:31 -0800
commitb32330be4f15dd044e6212f526e52375f0a0f6c2 (patch)
treefe2f7da84243d895e46967bd39d61d6cf17dab21 /youtube/util.py
parenta61ba6b8f45d94bf8e89a9f351c5c6cac2379387 (diff)
refactor common.py into 3 files
Diffstat (limited to 'youtube/util.py')
1 files changed, 153 insertions, 0 deletions
diff --git a/youtube/util.py b/youtube/util.py
new file mode 100644
index 0000000..2ebd8bc
--- /dev/null
+++ b/youtube/util.py
@@ -0,0 +1,153 @@
+import socks, sockshandler
+import gzip
+import brotli
+import urllib.parse
+import re
+import time
+import settings
+URL_ORIGIN = "/https://www.youtube.com"
+class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
+ '''Separate cookiejars for receiving and sending'''
+ def __init__(self, cookiejar_send=None, cookiejar_receive=None):
+ import http.cookiejar
+ self.cookiejar_send = cookiejar_send
+ self.cookiejar_receive = cookiejar_receive
+ def http_request(self, request):
+ if self.cookiejar_send is not None:
+ self.cookiejar_send.add_cookie_header(request)
+ return request
+ def http_response(self, request, response):
+ if self.cookiejar_receive is not None:
+ self.cookiejar_receive.extract_cookies(response, request)
+ return response
+ https_request = http_request
+ https_response = http_response
+def decode_content(content, encoding_header):
+ encodings = encoding_header.replace(' ', '').split(',')
+ for encoding in reversed(encodings):
+ if encoding == 'identity':
+ continue
+ if encoding == 'br':
+ content = brotli.decompress(content)
+ elif encoding == 'gzip':
+ content = gzip.decompress(content)
+ return content
+def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True):
+ '''
+ When cookiejar_send is set to a CookieJar object,
+ those cookies will be sent in the request (but cookies in response will not be merged into it)
+ When cookiejar_receive is set to a CookieJar object,
+ cookies received in the response will be merged into the object (nothing will be sent from it)
+ When both are set to the same object, cookies will be sent from the object,
+ and response cookies will be merged into it.
+ '''
+ headers = dict(headers) # Note: Calling dict() on a dict will make a copy
+ headers['Accept-Encoding'] = 'gzip, br'
+ # prevent python version being leaked by urllib if User-Agent isn't provided
+ # (urllib will use ex. Python-urllib/3.6 otherwise)
+ if 'User-Agent' not in headers and 'user-agent' not in headers and 'User-agent' not in headers:
+ headers['User-Agent'] = 'Python-urllib'
+ if data is not None:
+ if isinstance(data, str):
+ data = data.encode('ascii')
+ elif not isinstance(data, bytes):
+ data = urllib.parse.urlencode(data).encode('ascii')
+ start_time = time.time()
+ req = urllib.request.Request(url, data=data, headers=headers)
+ cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
+ if use_tor and settings.route_tor:
+ opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "", 9150), cookie_processor)
+ else:
+ opener = urllib.request.build_opener(cookie_processor)
+ response = opener.open(req, timeout=timeout)
+ response_time = time.time()
+ content = response.read()
+ read_finish = time.time()
+ if report_text:
+ print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
+ content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
+ return content
+mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
+mobile_ua = (('User-Agent', mobile_user_agent),)
+desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
+desktop_ua = (('User-Agent', desktop_user_agent),)
+def dict_add(*dicts):
+ for dictionary in dicts[1:]:
+ dicts[0].update(dictionary)
+ return dicts[0]
+def video_id(url):
+ url_parts = urllib.parse.urlparse(url)
+ return urllib.parse.parse_qs(url_parts.query)['v'][0]
+def default_multi_get(object, *keys, default):
+ ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
+ try:
+ for key in keys:
+ object = object[key]
+ return object
+ except (IndexError, KeyError):
+ return default
+# default, sddefault, mqdefault, hqdefault, hq720
+def get_thumbnail_url(video_id):
+ return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+def seconds_to_timestamp(seconds):
+ seconds = int(seconds)
+ hours, seconds = divmod(seconds,3600)
+ minutes, seconds = divmod(seconds,60)
+ if hours != 0:
+ timestamp = str(hours) + ":"
+ timestamp += str(minutes).zfill(2) # zfill pads with zeros
+ else:
+ timestamp = str(minutes)
+ timestamp += ":" + str(seconds).zfill(2)
+ return timestamp
+def update_query_string(query_string, items):
+ parameters = urllib.parse.parse_qs(query_string)
+ parameters.update(items)
+ return urllib.parse.urlencode(parameters, doseq=True)
+def uppercase_escape(s):
+ return re.sub(
+ r'\\U([0-9a-fA-F]{8})',
+ lambda m: chr(int(m.group(1), base=16)), s) \ No newline at end of file