From 79937c1c823f998a1d6bb324901fd13b483b3607 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Mon, 2 Jul 2018 17:45:25 -0700 Subject: fix line endings --- youtube/common.py | 1278 ++++++++++++++++++++++++++--------------------------- 1 file changed, 639 insertions(+), 639 deletions(-) (limited to 'youtube/common.py') diff --git a/youtube/common.py b/youtube/common.py index 67bd81f..3133fed 100644 --- a/youtube/common.py +++ b/youtube/common.py @@ -1,639 +1,639 @@ -from youtube.template import Template -import html -import json -import re -import urllib.parse -import gzip -import brotli -import time - - -URL_ORIGIN = "/https://www.youtube.com" - - -# videos (all of type str): - -# id -# title -# url -# author -# author_url -# thumbnail -# description -# published -# duration -# likes -# dislikes -# views -# playlist_index - -# playlists: - -# id -# title -# url -# author -# author_url -# thumbnail -# description -# updated -# size -# first_video_id - - - - - - - -page_button_template = Template('''$page''') -current_page_button_template = Template('''
$page''') - -medium_playlist_item_template = Template(''' - -''') -medium_video_item_template = Template(''' -
- - - $duration - - - $title - -
$stats
- - - $description - $badges -
-''') - -small_video_item_template = Template(''' -
-
- - - $duration - - $title - -
$author
- $views - -
- -
-''') - -small_playlist_item_template = Template(''' -
-
- - -
- $size -
-
- $title - -
$author
-
-
-''') - -medium_channel_item_template = Template(''' -
- - - $duration - - - $title - - $subscriber_count - $size - - $description -
-''') - - -def fetch_url(url, headers=(), timeout=5, report_text=None): - if isinstance(headers, list): - headers += [('Accept-Encoding', 'gzip, br')] - headers = dict(headers) - elif isinstance(headers, tuple): - headers += (('Accept-Encoding', 'gzip, br'),) - headers = dict(headers) - else: - headers = headers.copy() - headers['Accept-Encoding'] = 'gzip, br' - - start_time = time.time() - - req = urllib.request.Request(url, headers=headers) - response = urllib.request.urlopen(req, timeout=timeout) - response_time = time.time() - - content = response.read() - read_finish = time.time() - if report_text: - print(report_text, 'Latency:', response_time - start_time, ' Read time:', read_finish - response_time) - encodings = response.getheader('Content-Encoding', default='identity').replace(' ', '').split(',') - for encoding in reversed(encodings): - if encoding == 'identity': - continue - if encoding == 'br': - content = brotli.decompress(content) - elif encoding == 'gzip': - content = gzip.decompress(content) - return content - -mobile_ua = (('User-Agent', 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'),) - -def dict_add(*dicts): - for dictionary in dicts[1:]: - dicts[0].update(dictionary) - return dicts[0] - -def video_id(url): - url_parts = urllib.parse.urlparse(url) - return urllib.parse.parse_qs(url_parts.query)['v'][0] - -def uppercase_escape(s): - return re.sub( - r'\\U([0-9a-fA-F]{8})', - lambda m: chr(int(m.group(1), base=16)), s) - -def default_multi_get(object, *keys, default): - ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors ''' - try: - for key in keys: - object = object[key] - return object - except (IndexError, KeyError): - return default - -def get_plain_text(node): - try: - return html.escape(node['simpleText']) - except KeyError: - return unformmated_text_runs(node['runs']) - -def unformmated_text_runs(runs): - result = '' - for text_run in runs: - result += html.escape(text_run["text"]) - return result - -def format_text_runs(runs): - if isinstance(runs, str): - return runs - result = '' - for text_run in runs: - if text_run.get("bold", False): - result += "" + html.escape(text_run["text"]) + "" - elif text_run.get('italics', False): - result += "" + html.escape(text_run["text"]) + "" - else: - result += html.escape(text_run["text"]) - return result - -# default, sddefault, mqdefault, hqdefault, hq720 -def get_thumbnail_url(video_id): - return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg" - -def seconds_to_timestamp(seconds): - seconds = int(seconds) - hours, seconds = divmod(seconds,3600) - minutes, seconds = divmod(seconds,60) - if hours != 0: - timestamp = str(hours) + ":" - timestamp += str(minutes).zfill(2) # zfill pads with zeros - else: - timestamp = str(minutes) - - timestamp += ":" + str(seconds).zfill(2) - return timestamp - -# playlists: - -# id -# title -# url -# author -# author_url -# thumbnail -# description -# updated -# size -# first_video_id -def medium_playlist_item_info(playlist_renderer): - renderer = playlist_renderer - try: - author_url = URL_ORIGIN + renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] - except KeyError: # radioRenderer - author_url = '' - try: - thumbnail = renderer['thumbnails'][0]['thumbnails'][0]['url'] - except KeyError: - thumbnail = renderer['thumbnail']['thumbnails'][0]['url'] - return { - "title": renderer["title"]["simpleText"], - 'id': renderer["playlistId"], - 'size': renderer.get('videoCount', '50+'), - "author": default_multi_get(renderer,'longBylineText','runs',0,'text', default='Youtube'), - "author_url": author_url, - 'thumbnail': thumbnail, - } - -def medium_video_item_info(video_renderer): - renderer = video_renderer - try: - return { - "title": renderer["title"]["simpleText"], - "id": renderer["videoId"], - "description": renderer.get("descriptionSnippet",dict()).get('runs',[]), # a list of text runs (formmated), rather than plain text - "thumbnail": get_thumbnail_url(renderer["videoId"]), - "views": renderer['viewCountText'].get('simpleText', None) or renderer['viewCountText']['runs'][0]['text'], - "duration": default_multi_get(renderer, 'lengthText', 'simpleText', default=''), # livestreams dont have a length - "author": renderer['longBylineText']['runs'][0]['text'], - "author_url": URL_ORIGIN + renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'], - "published": default_multi_get(renderer, 'publishedTimeText', 'simpleText', default=''), - } - except KeyError: - print(renderer) - raise - -def small_video_item_info(compact_video_renderer): - renderer = compact_video_renderer - return { - "title": renderer['title']['simpleText'], - "id": renderer['videoId'], - "views": renderer['viewCountText'].get('simpleText', None) or renderer['viewCountText']['runs'][0]['text'], - "duration": default_multi_get(renderer, 'lengthText', 'simpleText', default=''), # livestreams dont have a length - "author": renderer['longBylineText']['runs'][0]['text'], - "author_url": renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'], - } - - -# ----- -# HTML -# ----- - -def small_video_item_html(item): - video_info = json.dumps({key: item[key] for key in ('id', 'title', 'author', 'duration')}) - return small_video_item_template.substitute( - title = html.escape(item["title"]), - views = item["views"], - author = html.escape(item["author"]), - duration = item["duration"], - url = URL_ORIGIN + "/watch?v=" + item["id"], - thumbnail = get_thumbnail_url(item['id']), - video_info = html.escape(json.dumps(video_info)), - ) - -def small_playlist_item_html(item): - return small_playlist_item_template.substitute( - title=html.escape(item["title"]), - size = item['size'], - author="", - url = URL_ORIGIN + "/playlist?list=" + item["id"], - thumbnail= get_thumbnail_url(item['first_video_id']), - ) - -def medium_playlist_item_html(item): - return medium_playlist_item_template.substitute( - title=html.escape(item["title"]), - size = item['size'], - author=item['author'], - author_url= URL_ORIGIN + item['author_url'], - url = URL_ORIGIN + "/playlist?list=" + item["id"], - thumbnail= item['thumbnail'], - ) - -def medium_video_item_html(medium_video_info): - info = medium_video_info - - return medium_video_item_template.substitute( - title=html.escape(info["title"]), - views=info["views"], - published = info["published"], - description = format_text_runs(info["description"]), - author=html.escape(info["author"]), - author_url=info["author_url"], - duration=info["duration"], - url = URL_ORIGIN + "/watch?v=" + info["id"], - thumbnail=info['thumbnail'], - datetime='', # TODO - ) - -html_functions = { - 'compactVideoRenderer': lambda x: small_video_item_html(small_video_item_info(x)), - 'videoRenderer': lambda x: medium_video_item_html(medium_video_item_info(x)), - 'compactPlaylistRenderer': lambda x: small_playlist_item_html(small_playlist_item_info(x)), - 'playlistRenderer': lambda x: medium_playlist_item_html(medium_playlist_item_info(x)), - 'channelRenderer': lambda x: '', - 'radioRenderer': lambda x: medium_playlist_item_html(medium_playlist_item_info(x)), - 'compactRadioRenderer': lambda x: small_playlist_item_html(small_playlist_item_info(x)), - 'didYouMeanRenderer': lambda x: '', -} - - - - - - - -def get_url(node): - try: - return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] - except KeyError: - return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] - - -def get_text(node): - try: - return node['simpleText'] - except KeyError: - return node['runs'][0]['text'] - -def get_formatted_text(node): - try: - return node['runs'] - except KeyError: - return node['simpleText'] - -def get_badges(node): - badges = [] - for badge_node in node: - badge = badge_node['metadataBadgeRenderer']['label'] - if badge.lower() != 'new': - badges.append(badge) - return badges - -def get_thumbnail(node): - try: - return node['thumbnails'][0]['url'] # polymer format - except KeyError: - return node['url'] # ajax format - -dispatch = { - -# polymer format - 'title': ('title', get_text), - 'publishedTimeText': ('published', get_text), - 'videoId': ('id', lambda node: node), - 'descriptionSnippet': ('description', get_formatted_text), - 'lengthText': ('duration', get_text), - 'thumbnail': ('thumbnail', get_thumbnail), - 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']), - - 'videoCountText': ('size', get_text), - 'playlistId': ('id', lambda node: node), - - 'subscriberCountText': ('subscriber_count', get_text), - 'channelId': ('id', lambda node: node), - 'badges': ('badges', get_badges), - -# ajax format - 'view_count_text': ('views', get_text), - 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]), - 'owner_text': ('author', get_text), - 'owner_endpoint': ('author_url', lambda node: node['url']), - 'description': ('description', get_formatted_text), - 'index': ('playlist_index', get_text), - 'short_byline': ('author', get_text), - 'length': ('duration', get_text), - 'video_id': ('id', lambda node: node), - -} - -def renderer_info(renderer): - try: - info = {} - if 'viewCountText' in renderer: # prefer this one as it contains all the digits - info['views'] = get_text(renderer['viewCountText']) - elif 'shortViewCountText' in renderer: - info['views'] = get_text(renderer['shortViewCountText']) - - for key, node in renderer.items(): - if key in ('longBylineText', 'shortBylineText'): - info['author'] = get_text(node) - try: - info['author_url'] = get_url(node) - except KeyError: - pass - - continue - - try: - simple_key, function = dispatch[key] - except KeyError: - continue - info[simple_key] = function(node) - return info - except KeyError: - print(renderer) - raise - -def ajax_info(item_json): - try: - info = {} - for key, node in item_json.items(): - try: - simple_key, function = dispatch[key] - except KeyError: - continue - info[simple_key] = function(node) - return info - except KeyError: - print(item_json) - raise - -def badges_html(badges): - return ' | '.join(map(html.escape, badges)) - - - - - -html_transform_dispatch = { - 'title': html.escape, - 'published': html.escape, - 'id': html.escape, - 'description': format_text_runs, - 'duration': html.escape, - 'thumbnail': lambda url: html.escape('/' + url.lstrip('/')), - 'size': html.escape, - 'author': html.escape, - 'author_url': lambda url: html.escape(URL_ORIGIN + url), - 'views': html.escape, - 'subscriber_count': html.escape, - 'badges': badges_html, - 'playlist_index': html.escape, -} - -def get_html_ready(item): - html_ready = {} - for key, value in item.items(): - try: - function = html_transform_dispatch[key] - except KeyError: - continue - html_ready[key] = function(value) - return html_ready - - -author_template_url = Template('''
By $author
''') -author_template = Template('''
By $author
''') -stat_templates = ( - Template('''$views'''), - Template(''''''), -) -def get_video_stats(html_ready): - stats = [] - if 'author' in html_ready: - if 'author_url' in html_ready: - stats.append(author_template_url.substitute(html_ready)) - else: - stats.append(author_template.substitute(html_ready)) - for stat in stat_templates: - try: - stats.append(stat.strict_substitute(html_ready)) - except KeyError: - pass - return ' | '.join(stats) - -def video_item_html(item, template): - html_ready = get_html_ready(item) - video_info = {} - for key in ('id', 'title', 'author'): - try: - video_info[key] = html_ready[key] - except KeyError: - video_info[key] = '' - try: - video_info['duration'] = html_ready['duration'] - except KeyError: - video_info['duration'] = 'Live' # livestreams don't have a duration - - html_ready['video_info'] = html.escape(json.dumps(video_info) ) - html_ready['url'] = URL_ORIGIN + "/watch?v=" + html_ready['id'] - html_ready['datetime'] = '' #TODO - - html_ready['stats'] = get_video_stats(html_ready) - - return template.substitute(html_ready) - - -def playlist_item_html(item, template): - html_ready = get_html_ready(item) - - html_ready['url'] = URL_ORIGIN + "/playlist?list=" + html_ready['id'] - html_ready['datetime'] = '' #TODO - return template.substitute(html_ready) - - - - - - -def make_query_string(query_string): - return '&'.join(key + '=' + ','.join(values) for key,values in query_string.items()) - -def update_query_string(query_string, items): - parameters = urllib.parse.parse_qs(query_string) - parameters.update(items) - return make_query_string(parameters) - -page_button_template = Template('''$page''') -current_page_button_template = Template('''
$page
''') - -def page_buttons_html(current_page, estimated_pages, url, current_query_string): - if current_page <= 5: - page_start = 1 - page_end = min(9, estimated_pages) - else: - page_start = current_page - 4 - page_end = min(current_page + 4, estimated_pages) - - result = "" - for page in range(page_start, page_end+1): - if page == current_page: - template = current_page_button_template - else: - template = page_button_template - result += template.substitute(page=page, href = url + "?" + update_query_string(current_query_string, {'page': [str(page)]}) ) - return result - - - - - - - -showing_results_for = Template(''' -
-
Showing results for $corrected_query
-
Search instead for $original_query
-
-''') - -did_you_mean = Template(''' -
-
Did you mean $corrected_query
-
-''') - -def renderer_html(renderer, additional_info={}, current_query_string=''): - type = list(renderer.keys())[0] - renderer = renderer[type] - if type in ('videoRenderer', 'playlistRenderer', 'radioRenderer', 'compactVideoRenderer', 'compactPlaylistRenderer', 'compactRadioRenderer', 'gridVideoRenderer', 'gridPlaylistRenderer', 'gridRadioRenderer'): - info = renderer_info(renderer) - info.update(additional_info) - if type == 'compactVideoRenderer': - return video_item_html(info, small_video_item_template) - if type in ('compactPlaylistRenderer', 'compactRadioRenderer'): - return playlist_item_html(info, small_playlist_item_template) - if type in ('videoRenderer', 'gridVideoRenderer'): - return video_item_html(info, medium_video_item_template) - if type in ('playlistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'gridRadioRenderer'): - return playlist_item_html(info, medium_playlist_item_template) - - if type == 'channelRenderer': - info = renderer_info(renderer) - html_ready = get_html_ready(info) - html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id'] - return medium_channel_item_template.substitute(html_ready) - - if type == 'movieRenderer': - return '' - print(renderer) - raise NotImplementedError('Unknown renderer type: ' + type) - - -'videoRenderer' -'playlistRenderer' -'channelRenderer' -'radioRenderer' -'gridVideoRenderer' -'gridPlaylistRenderer' - -'didYouMeanRenderer' -'showingResultsForRenderer' +from youtube.template import Template +import html +import json +import re +import urllib.parse +import gzip +import brotli +import time + + +URL_ORIGIN = "/https://www.youtube.com" + + +# videos (all of type str): + +# id +# title +# url +# author +# author_url +# thumbnail +# description +# published +# duration +# likes +# dislikes +# views +# playlist_index + +# playlists: + +# id +# title +# url +# author +# author_url +# thumbnail +# description +# updated +# size +# first_video_id + + + + + + + +page_button_template = Template('''$page''') +current_page_button_template = Template('''
$page''') + +medium_playlist_item_template = Template(''' + +''') +medium_video_item_template = Template(''' +
+ + + $duration + + + $title + +
$stats
+ + + $description + $badges +
+''') + +small_video_item_template = Template(''' +
+
+ + + $duration + + $title + +
$author
+ $views + +
+ +
+''') + +small_playlist_item_template = Template(''' +
+
+ + +
+ $size +
+
+ $title + +
$author
+
+
+''') + +medium_channel_item_template = Template(''' +
+ + + $duration + + + $title + + $subscriber_count + $size + + $description +
+''') + + +def fetch_url(url, headers=(), timeout=5, report_text=None): + if isinstance(headers, list): + headers += [('Accept-Encoding', 'gzip, br')] + headers = dict(headers) + elif isinstance(headers, tuple): + headers += (('Accept-Encoding', 'gzip, br'),) + headers = dict(headers) + else: + headers = headers.copy() + headers['Accept-Encoding'] = 'gzip, br' + + start_time = time.time() + + req = urllib.request.Request(url, headers=headers) + response = urllib.request.urlopen(req, timeout=timeout) + response_time = time.time() + + content = response.read() + read_finish = time.time() + if report_text: + print(report_text, 'Latency:', response_time - start_time, ' Read time:', read_finish - response_time) + encodings = response.getheader('Content-Encoding', default='identity').replace(' ', '').split(',') + for encoding in reversed(encodings): + if encoding == 'identity': + continue + if encoding == 'br': + content = brotli.decompress(content) + elif encoding == 'gzip': + content = gzip.decompress(content) + return content + +mobile_ua = (('User-Agent', 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'),) + +def dict_add(*dicts): + for dictionary in dicts[1:]: + dicts[0].update(dictionary) + return dicts[0] + +def video_id(url): + url_parts = urllib.parse.urlparse(url) + return urllib.parse.parse_qs(url_parts.query)['v'][0] + +def uppercase_escape(s): + return re.sub( + r'\\U([0-9a-fA-F]{8})', + lambda m: chr(int(m.group(1), base=16)), s) + +def default_multi_get(object, *keys, default): + ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors ''' + try: + for key in keys: + object = object[key] + return object + except (IndexError, KeyError): + return default + +def get_plain_text(node): + try: + return html.escape(node['simpleText']) + except KeyError: + return unformmated_text_runs(node['runs']) + +def unformmated_text_runs(runs): + result = '' + for text_run in runs: + result += html.escape(text_run["text"]) + return result + +def format_text_runs(runs): + if isinstance(runs, str): + return runs + result = '' + for text_run in runs: + if text_run.get("bold", False): + result += "" + html.escape(text_run["text"]) + "" + elif text_run.get('italics', False): + result += "" + html.escape(text_run["text"]) + "" + else: + result += html.escape(text_run["text"]) + return result + +# default, sddefault, mqdefault, hqdefault, hq720 +def get_thumbnail_url(video_id): + return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg" + +def seconds_to_timestamp(seconds): + seconds = int(seconds) + hours, seconds = divmod(seconds,3600) + minutes, seconds = divmod(seconds,60) + if hours != 0: + timestamp = str(hours) + ":" + timestamp += str(minutes).zfill(2) # zfill pads with zeros + else: + timestamp = str(minutes) + + timestamp += ":" + str(seconds).zfill(2) + return timestamp + +# playlists: + +# id +# title +# url +# author +# author_url +# thumbnail +# description +# updated +# size +# first_video_id +def medium_playlist_item_info(playlist_renderer): + renderer = playlist_renderer + try: + author_url = URL_ORIGIN + renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] + except KeyError: # radioRenderer + author_url = '' + try: + thumbnail = renderer['thumbnails'][0]['thumbnails'][0]['url'] + except KeyError: + thumbnail = renderer['thumbnail']['thumbnails'][0]['url'] + return { + "title": renderer["title"]["simpleText"], + 'id': renderer["playlistId"], + 'size': renderer.get('videoCount', '50+'), + "author": default_multi_get(renderer,'longBylineText','runs',0,'text', default='Youtube'), + "author_url": author_url, + 'thumbnail': thumbnail, + } + +def medium_video_item_info(video_renderer): + renderer = video_renderer + try: + return { + "title": renderer["title"]["simpleText"], + "id": renderer["videoId"], + "description": renderer.get("descriptionSnippet",dict()).get('runs',[]), # a list of text runs (formmated), rather than plain text + "thumbnail": get_thumbnail_url(renderer["videoId"]), + "views": renderer['viewCountText'].get('simpleText', None) or renderer['viewCountText']['runs'][0]['text'], + "duration": default_multi_get(renderer, 'lengthText', 'simpleText', default=''), # livestreams dont have a length + "author": renderer['longBylineText']['runs'][0]['text'], + "author_url": URL_ORIGIN + renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'], + "published": default_multi_get(renderer, 'publishedTimeText', 'simpleText', default=''), + } + except KeyError: + print(renderer) + raise + +def small_video_item_info(compact_video_renderer): + renderer = compact_video_renderer + return { + "title": renderer['title']['simpleText'], + "id": renderer['videoId'], + "views": renderer['viewCountText'].get('simpleText', None) or renderer['viewCountText']['runs'][0]['text'], + "duration": default_multi_get(renderer, 'lengthText', 'simpleText', default=''), # livestreams dont have a length + "author": renderer['longBylineText']['runs'][0]['text'], + "author_url": renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'], + } + + +# ----- +# HTML +# ----- + +def small_video_item_html(item): + video_info = json.dumps({key: item[key] for key in ('id', 'title', 'author', 'duration')}) + return small_video_item_template.substitute( + title = html.escape(item["title"]), + views = item["views"], + author = html.escape(item["author"]), + duration = item["duration"], + url = URL_ORIGIN + "/watch?v=" + item["id"], + thumbnail = get_thumbnail_url(item['id']), + video_info = html.escape(json.dumps(video_info)), + ) + +def small_playlist_item_html(item): + return small_playlist_item_template.substitute( + title=html.escape(item["title"]), + size = item['size'], + author="", + url = URL_ORIGIN + "/playlist?list=" + item["id"], + thumbnail= get_thumbnail_url(item['first_video_id']), + ) + +def medium_playlist_item_html(item): + return medium_playlist_item_template.substitute( + title=html.escape(item["title"]), + size = item['size'], + author=item['author'], + author_url= URL_ORIGIN + item['author_url'], + url = URL_ORIGIN + "/playlist?list=" + item["id"], + thumbnail= item['thumbnail'], + ) + +def medium_video_item_html(medium_video_info): + info = medium_video_info + + return medium_video_item_template.substitute( + title=html.escape(info["title"]), + views=info["views"], + published = info["published"], + description = format_text_runs(info["description"]), + author=html.escape(info["author"]), + author_url=info["author_url"], + duration=info["duration"], + url = URL_ORIGIN + "/watch?v=" + info["id"], + thumbnail=info['thumbnail'], + datetime='', # TODO + ) + +html_functions = { + 'compactVideoRenderer': lambda x: small_video_item_html(small_video_item_info(x)), + 'videoRenderer': lambda x: medium_video_item_html(medium_video_item_info(x)), + 'compactPlaylistRenderer': lambda x: small_playlist_item_html(small_playlist_item_info(x)), + 'playlistRenderer': lambda x: medium_playlist_item_html(medium_playlist_item_info(x)), + 'channelRenderer': lambda x: '', + 'radioRenderer': lambda x: medium_playlist_item_html(medium_playlist_item_info(x)), + 'compactRadioRenderer': lambda x: small_playlist_item_html(small_playlist_item_info(x)), + 'didYouMeanRenderer': lambda x: '', +} + + + + + + + +def get_url(node): + try: + return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] + except KeyError: + return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] + + +def get_text(node): + try: + return node['simpleText'] + except KeyError: + return node['runs'][0]['text'] + +def get_formatted_text(node): + try: + return node['runs'] + except KeyError: + return node['simpleText'] + +def get_badges(node): + badges = [] + for badge_node in node: + badge = badge_node['metadataBadgeRenderer']['label'] + if badge.lower() != 'new': + badges.append(badge) + return badges + +def get_thumbnail(node): + try: + return node['thumbnails'][0]['url'] # polymer format + except KeyError: + return node['url'] # ajax format + +dispatch = { + +# polymer format + 'title': ('title', get_text), + 'publishedTimeText': ('published', get_text), + 'videoId': ('id', lambda node: node), + 'descriptionSnippet': ('description', get_formatted_text), + 'lengthText': ('duration', get_text), + 'thumbnail': ('thumbnail', get_thumbnail), + 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']), + + 'videoCountText': ('size', get_text), + 'playlistId': ('id', lambda node: node), + + 'subscriberCountText': ('subscriber_count', get_text), + 'channelId': ('id', lambda node: node), + 'badges': ('badges', get_badges), + +# ajax format + 'view_count_text': ('views', get_text), + 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]), + 'owner_text': ('author', get_text), + 'owner_endpoint': ('author_url', lambda node: node['url']), + 'description': ('description', get_formatted_text), + 'index': ('playlist_index', get_text), + 'short_byline': ('author', get_text), + 'length': ('duration', get_text), + 'video_id': ('id', lambda node: node), + +} + +def renderer_info(renderer): + try: + info = {} + if 'viewCountText' in renderer: # prefer this one as it contains all the digits + info['views'] = get_text(renderer['viewCountText']) + elif 'shortViewCountText' in renderer: + info['views'] = get_text(renderer['shortViewCountText']) + + for key, node in renderer.items(): + if key in ('longBylineText', 'shortBylineText'): + info['author'] = get_text(node) + try: + info['author_url'] = get_url(node) + except KeyError: + pass + + continue + + try: + simple_key, function = dispatch[key] + except KeyError: + continue + info[simple_key] = function(node) + return info + except KeyError: + print(renderer) + raise + +def ajax_info(item_json): + try: + info = {} + for key, node in item_json.items(): + try: + simple_key, function = dispatch[key] + except KeyError: + continue + info[simple_key] = function(node) + return info + except KeyError: + print(item_json) + raise + +def badges_html(badges): + return ' | '.join(map(html.escape, badges)) + + + + + +html_transform_dispatch = { + 'title': html.escape, + 'published': html.escape, + 'id': html.escape, + 'description': format_text_runs, + 'duration': html.escape, + 'thumbnail': lambda url: html.escape('/' + url.lstrip('/')), + 'size': html.escape, + 'author': html.escape, + 'author_url': lambda url: html.escape(URL_ORIGIN + url), + 'views': html.escape, + 'subscriber_count': html.escape, + 'badges': badges_html, + 'playlist_index': html.escape, +} + +def get_html_ready(item): + html_ready = {} + for key, value in item.items(): + try: + function = html_transform_dispatch[key] + except KeyError: + continue + html_ready[key] = function(value) + return html_ready + + +author_template_url = Template('''
By $author
''') +author_template = Template('''
By $author
''') +stat_templates = ( + Template('''$views'''), + Template(''''''), +) +def get_video_stats(html_ready): + stats = [] + if 'author' in html_ready: + if 'author_url' in html_ready: + stats.append(author_template_url.substitute(html_ready)) + else: + stats.append(author_template.substitute(html_ready)) + for stat in stat_templates: + try: + stats.append(stat.strict_substitute(html_ready)) + except KeyError: + pass + return ' | '.join(stats) + +def video_item_html(item, template): + html_ready = get_html_ready(item) + video_info = {} + for key in ('id', 'title', 'author'): + try: + video_info[key] = html_ready[key] + except KeyError: + video_info[key] = '' + try: + video_info['duration'] = html_ready['duration'] + except KeyError: + video_info['duration'] = 'Live' # livestreams don't have a duration + + html_ready['video_info'] = html.escape(json.dumps(video_info) ) + html_ready['url'] = URL_ORIGIN + "/watch?v=" + html_ready['id'] + html_ready['datetime'] = '' #TODO + + html_ready['stats'] = get_video_stats(html_ready) + + return template.substitute(html_ready) + + +def playlist_item_html(item, template): + html_ready = get_html_ready(item) + + html_ready['url'] = URL_ORIGIN + "/playlist?list=" + html_ready['id'] + html_ready['datetime'] = '' #TODO + return template.substitute(html_ready) + + + + + + +def make_query_string(query_string): + return '&'.join(key + '=' + ','.join(values) for key,values in query_string.items()) + +def update_query_string(query_string, items): + parameters = urllib.parse.parse_qs(query_string) + parameters.update(items) + return make_query_string(parameters) + +page_button_template = Template('''$page''') +current_page_button_template = Template('''
$page
''') + +def page_buttons_html(current_page, estimated_pages, url, current_query_string): + if current_page <= 5: + page_start = 1 + page_end = min(9, estimated_pages) + else: + page_start = current_page - 4 + page_end = min(current_page + 4, estimated_pages) + + result = "" + for page in range(page_start, page_end+1): + if page == current_page: + template = current_page_button_template + else: + template = page_button_template + result += template.substitute(page=page, href = url + "?" + update_query_string(current_query_string, {'page': [str(page)]}) ) + return result + + + + + + + +showing_results_for = Template(''' +
+
Showing results for $corrected_query
+
Search instead for $original_query
+
+''') + +did_you_mean = Template(''' +
+
Did you mean $corrected_query
+
+''') + +def renderer_html(renderer, additional_info={}, current_query_string=''): + type = list(renderer.keys())[0] + renderer = renderer[type] + if type in ('videoRenderer', 'playlistRenderer', 'radioRenderer', 'compactVideoRenderer', 'compactPlaylistRenderer', 'compactRadioRenderer', 'gridVideoRenderer', 'gridPlaylistRenderer', 'gridRadioRenderer'): + info = renderer_info(renderer) + info.update(additional_info) + if type == 'compactVideoRenderer': + return video_item_html(info, small_video_item_template) + if type in ('compactPlaylistRenderer', 'compactRadioRenderer'): + return playlist_item_html(info, small_playlist_item_template) + if type in ('videoRenderer', 'gridVideoRenderer'): + return video_item_html(info, medium_video_item_template) + if type in ('playlistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'gridRadioRenderer'): + return playlist_item_html(info, medium_playlist_item_template) + + if type == 'channelRenderer': + info = renderer_info(renderer) + html_ready = get_html_ready(info) + html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id'] + return medium_channel_item_template.substitute(html_ready) + + if type == 'movieRenderer': + return '' + print(renderer) + raise NotImplementedError('Unknown renderer type: ' + type) + + +'videoRenderer' +'playlistRenderer' +'channelRenderer' +'radioRenderer' +'gridVideoRenderer' +'gridPlaylistRenderer' + +'didYouMeanRenderer' +'showingResultsForRenderer' -- cgit v1.2.3