from youtube.template import Template from youtube import local_playlist import settings import html import json import re import urllib.parse import gzip import brotli import time import socks, sockshandler URL_ORIGIN = "/https://www.youtube.com" # videos (all of type str): # id # title # url # author # author_url # thumbnail # description # published # duration # likes # dislikes # views # playlist_index # playlists: # id # title # url # author # author_url # thumbnail # description # updated # size # first_video_id with open('yt_basic_template.html', 'r', encoding='utf-8') as file: yt_basic_template = Template(file.read()) page_button_template = Template('''$page''') current_page_button_template = Template('''
$page''') medium_playlist_item_template = Template('''
''') medium_video_item_template = Template('''
$duration $title
$stats
$description $badges
''') small_video_item_template = Template('''
$duration $title
$author
$views
''') small_playlist_item_template = Template('''
$size
$title
$author
''') medium_channel_item_template = Template('''
$duration $title $subscriber_count $size $description
''') class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler): '''Separate cookiejars for receiving and sending''' def __init__(self, cookiejar_send=None, cookiejar_receive=None): import http.cookiejar self.cookiejar_send = cookiejar_send self.cookiejar_receive = cookiejar_receive def http_request(self, request): if self.cookiejar_send is not None: self.cookiejar_send.add_cookie_header(request) return request def http_response(self, request, response): if self.cookiejar_receive is not None: self.cookiejar_receive.extract_cookies(response, request) return response https_request = http_request https_response = http_response def decode_content(content, encoding_header): encodings = encoding_header.replace(' ', '').split(',') for encoding in reversed(encodings): if encoding == 'identity': continue if encoding == 'br': content = brotli.decompress(content) elif encoding == 'gzip': content = gzip.decompress(content) return content def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True): ''' When cookiejar_send is set to a CookieJar object, those cookies will be sent in the request (but cookies in response will not be merged into it) When cookiejar_receive is set to a CookieJar object, cookies received in the response will be merged into the object (nothing will be sent from it) When both are set to the same object, cookies will be sent from the object, and response cookies will be merged into it. ''' headers = dict(headers) # Note: Calling dict() on a dict will make a copy headers['Accept-Encoding'] = 'gzip, br' # prevent python version being leaked by urllib if User-Agent isn't provided # (urllib will use ex. Python-urllib/3.6 otherwise) if 'User-Agent' not in headers and 'user-agent' not in headers and 'User-agent' not in headers: headers['User-Agent'] = 'Python-urllib' if data is not None: if isinstance(data, str): data = data.encode('ascii') elif not isinstance(data, bytes): data = urllib.parse.urlencode(data).encode('ascii') start_time = time.time() req = urllib.request.Request(url, data=data, headers=headers) cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive) if use_tor and settings.route_tor: opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor) else: opener = urllib.request.build_opener(cookie_processor) response = opener.open(req, timeout=timeout) response_time = time.time() content = response.read() read_finish = time.time() if report_text: print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3)) content = decode_content(content, response.getheader('Content-Encoding', default='identity')) return content mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1' mobile_ua = (('User-Agent', mobile_user_agent),) desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0' desktop_ua = (('User-Agent', desktop_user_agent),) def dict_add(*dicts): for dictionary in dicts[1:]: dicts[0].update(dictionary) return dicts[0] def video_id(url): url_parts = urllib.parse.urlparse(url) return urllib.parse.parse_qs(url_parts.query)['v'][0] def uppercase_escape(s): return re.sub( r'\\U([0-9a-fA-F]{8})', lambda m: chr(int(m.group(1), base=16)), s) def default_multi_get(object, *keys, default): ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors ''' try: for key in keys: object = object[key] return object except (IndexError, KeyError): return default def get_plain_text(node): try: return html.escape(node['simpleText']) except KeyError: return unformmated_text_runs(node['runs']) def unformmated_text_runs(runs): result = '' for text_run in runs: result += html.escape(text_run["text"]) return result def format_text_runs(runs): if isinstance(runs, str): return runs result = '' for text_run in runs: if text_run.get("bold", False): result += "" + html.escape(text_run["text"]) + "" elif text_run.get('italics', False): result += "" + html.escape(text_run["text"]) + "" else: result += html.escape(text_run["text"]) return result # default, sddefault, mqdefault, hqdefault, hq720 def get_thumbnail_url(video_id): return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg" def seconds_to_timestamp(seconds): seconds = int(seconds) hours, seconds = divmod(seconds,3600) minutes, seconds = divmod(seconds,60) if hours != 0: timestamp = str(hours) + ":" timestamp += str(minutes).zfill(2) # zfill pads with zeros else: timestamp = str(minutes) timestamp += ":" + str(seconds).zfill(2) return timestamp # ----- # HTML # ----- def small_video_item_html(item): video_info = json.dumps({key: item[key] for key in ('id', 'title', 'author', 'duration')}) return small_video_item_template.substitute( title = html.escape(item["title"]), views = item["views"], author = html.escape(item["author"]), duration = item["duration"], url = URL_ORIGIN + "/watch?v=" + item["id"], thumbnail = get_thumbnail_url(item['id']), video_info = html.escape(video_info), ) def small_playlist_item_html(item): return small_playlist_item_template.substitute( title=html.escape(item["title"]), size = item['size'], author="", url = URL_ORIGIN + "/playlist?list=" + item["id"], thumbnail= get_thumbnail_url(item['first_video_id']), ) def medium_playlist_item_html(item): return medium_playlist_item_template.substitute( title=html.escape(item["title"]), size = item['size'], author=item['author'], author_url= URL_ORIGIN + item['author_url'], url = URL_ORIGIN + "/playlist?list=" + item["id"], thumbnail= item['thumbnail'], ) def medium_video_item_html(medium_video_info): info = medium_video_info return medium_video_item_template.substitute( title=html.escape(info["title"]), views=info["views"], published = info["published"], description = format_text_runs(info["description"]), author=html.escape(info["author"]), author_url=info["author_url"], duration=info["duration"], url = URL_ORIGIN + "/watch?v=" + info["id"], thumbnail=info['thumbnail'], datetime='', # TODO ) header_template = Template('''
$playlists
Local playlists
''') playlist_option_template = Template('''''') def get_header(search_box_value=""): playlists = '' for name in local_playlist.get_playlist_names(): playlists += playlist_option_template.substitute(name = name) return header_template.substitute(playlists = playlists, search_box_value = html.escape(search_box_value)) def get_url(node): try: return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] except KeyError: return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] def get_text(node): try: return node['simpleText'] except KeyError: pass try: return node['runs'][0]['text'] except IndexError: # empty text runs return '' def get_formatted_text(node): try: return node['runs'] except KeyError: return node['simpleText'] def get_badges(node): badges = [] for badge_node in node: badge = badge_node['metadataBadgeRenderer']['label'] if badge.lower() != 'new': badges.append(badge) return badges def get_thumbnail(node): try: return node['thumbnails'][0]['url'] # polymer format except KeyError: return node['url'] # ajax format dispatch = { # polymer format 'title': ('title', get_text), 'publishedTimeText': ('published', get_text), 'videoId': ('id', lambda node: node), 'descriptionSnippet': ('description', get_formatted_text), 'lengthText': ('duration', get_text), 'thumbnail': ('thumbnail', get_thumbnail), 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']), 'viewCountText': ('views', get_text), 'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos" 'videoCountText': ('size', get_text), 'playlistId': ('id', lambda node: node), 'descriptionText': ('description', get_formatted_text), 'subscriberCountText': ('subscriber_count', get_text), 'channelId': ('id', lambda node: node), 'badges': ('badges', get_badges), # ajax format 'view_count_text': ('views', get_text), 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]), 'owner_text': ('author', get_text), 'owner_endpoint': ('author_url', lambda node: node['url']), 'description': ('description', get_formatted_text), 'index': ('playlist_index', get_text), 'short_byline': ('author', get_text), 'length': ('duration', get_text), 'video_id': ('id', lambda node: node), } def renderer_info(renderer): try: info = {} if 'viewCountText' in renderer: # prefer this one as it contains all the digits info['views'] = get_text(renderer['viewCountText']) elif 'shortViewCountText' in renderer: info['views'] = get_text(renderer['shortViewCountText']) if 'ownerText' in renderer: info['author'] = renderer['ownerText']['runs'][0]['text'] info['author_url'] = renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] try: overlays = renderer['thumbnailOverlays'] except KeyError: pass else: for overlay in overlays: if 'thumbnailOverlayTimeStatusRenderer' in overlay: info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text']) # show renderers don't have videoCountText elif 'thumbnailOverlayBottomPanelRenderer' in overlay: info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text']) # show renderers don't have playlistId, have to dig into the url to get it try: info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId'] except KeyError: pass for key, node in renderer.items(): if key in ('longBylineText', 'shortBylineText'): info['author'] = get_text(node) try: info['author_url'] = get_url(node) except KeyError: pass # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node: info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url'] else: try: simple_key, function = dispatch[key] except KeyError: continue info[simple_key] = function(node) return info except KeyError: print(renderer) raise def ajax_info(item_json): try: info = {} for key, node in item_json.items(): try: simple_key, function = dispatch[key] except KeyError: continue info[simple_key] = function(node) return info except KeyError: print(item_json) raise def badges_html(badges): return ' | '.join(map(html.escape, badges)) html_transform_dispatch = { 'title': html.escape, 'published': html.escape, 'id': html.escape, 'description': format_text_runs, 'duration': html.escape, 'thumbnail': lambda url: html.escape('/' + url.lstrip('/')), 'size': html.escape, 'author': html.escape, 'author_url': lambda url: html.escape(URL_ORIGIN + url), 'views': html.escape, 'subscriber_count': html.escape, 'badges': badges_html, 'playlist_index': html.escape, } def get_html_ready(item): html_ready = {} for key, value in item.items(): try: function = html_transform_dispatch[key] except KeyError: continue html_ready[key] = function(value) return html_ready author_template_url = Template('''
By $author
''') author_template = Template('''
$author
''') stat_templates = ( Template('''$views'''), Template(''''''), ) def get_stats(html_ready): stats = [] if 'author' in html_ready: if 'author_url' in html_ready: stats.append(author_template_url.substitute(html_ready)) else: stats.append(author_template.substitute(html_ready)) for stat in stat_templates: try: stats.append(stat.strict_substitute(html_ready)) except KeyError: pass return ' | '.join(stats) def video_item_html(item, template, html_exclude=set()): video_info = {} for key in ('id', 'title', 'author'): try: video_info[key] = item[key] except KeyError: video_info[key] = '' try: video_info['duration'] = item['duration'] except KeyError: video_info['duration'] = 'Live' # livestreams don't have a duration html_ready = get_html_ready(item) html_ready['video_info'] = html.escape(json.dumps(video_info) ) html_ready['url'] = URL_ORIGIN + "/watch?v=" + html_ready['id'] html_ready['datetime'] = '' #TODO for key in html_exclude: del html_ready[key] html_ready['stats'] = get_stats(html_ready) return template.substitute(html_ready) def playlist_item_html(item, template, html_exclude=set()): html_ready = get_html_ready(item) html_ready['url'] = URL_ORIGIN + "/playlist?list=" + html_ready['id'] html_ready['datetime'] = '' #TODO for key in html_exclude: del html_ready[key] html_ready['stats'] = get_stats(html_ready) return template.substitute(html_ready) def update_query_string(query_string, items): parameters = urllib.parse.parse_qs(query_string) parameters.update(items) return urllib.parse.urlencode(parameters, doseq=True) page_button_template = Template('''$page''') current_page_button_template = Template('''
$page
''') def page_buttons_html(current_page, estimated_pages, url, current_query_string): if current_page <= 5: page_start = 1 page_end = min(9, estimated_pages) else: page_start = current_page - 4 page_end = min(current_page + 4, estimated_pages) result = "" for page in range(page_start, page_end+1): if page == current_page: template = current_page_button_template else: template = page_button_template result += template.substitute(page=page, href = url + "?" + update_query_string(current_query_string, {'page': [str(page)]}) ) return result showing_results_for = Template('''
Showing results for $corrected_query
Search instead for $original_query
''') did_you_mean = Template('''
Did you mean $corrected_query
''') def renderer_html(renderer, additional_info={}, current_query_string=''): type = list(renderer.keys())[0] renderer = renderer[type] if type == 'itemSectionRenderer': return renderer_html(renderer['contents'][0], additional_info, current_query_string) if type == 'channelRenderer': info = renderer_info(renderer) html_ready = get_html_ready(info) html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id'] return medium_channel_item_template.substitute(html_ready) if type in ('movieRenderer', 'clarificationRenderer'): return '' info = renderer_info(renderer) info.update(additional_info) html_exclude = set(additional_info.keys()) if type == 'compactVideoRenderer': return video_item_html(info, small_video_item_template, html_exclude=html_exclude) if type in ('compactPlaylistRenderer', 'compactRadioRenderer', 'compactShowRenderer'): return playlist_item_html(info, small_playlist_item_template, html_exclude=html_exclude) if type in ('videoRenderer', 'gridVideoRenderer'): return video_item_html(info, medium_video_item_template, html_exclude=html_exclude) if type in ('playlistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'gridRadioRenderer', 'gridShowRenderer', 'showRenderer'): return playlist_item_html(info, medium_playlist_item_template, html_exclude=html_exclude) #print(renderer) #raise NotImplementedError('Unknown renderer type: ' + type) return ''