diff options
Diffstat (limited to 'youtube/common.py')
-rw-r--r-- | youtube/common.py | 639 |
1 files changed, 639 insertions, 0 deletions
diff --git a/youtube/common.py b/youtube/common.py new file mode 100644 index 0000000..67bd81f --- /dev/null +++ b/youtube/common.py @@ -0,0 +1,639 @@ +from youtube.template import Template
+import html
+import json
+import re
+import urllib.parse
+import gzip
+import brotli
+import time
+
+
+URL_ORIGIN = "/https://www.youtube.com"
+
+
+# videos (all of type str):
+
+# id
+# title
+# url
+# author
+# author_url
+# thumbnail
+# description
+# published
+# duration
+# likes
+# dislikes
+# views
+# playlist_index
+
+# playlists:
+
+# id
+# title
+# url
+# author
+# author_url
+# thumbnail
+# description
+# updated
+# size
+# first_video_id
+
+
+
+
+
+
+
+page_button_template = Template('''<a class="page-button" href="$href">$page</a>''')
+current_page_button_template = Template('''<div class="current-page-button">$page</a>''')
+
+medium_playlist_item_template = Template('''
+ <div class="medium-item">
+ <a class="playlist-thumbnail-box" href="$url" title="$title">
+ <img class="playlist-thumbnail-img" src="$thumbnail">
+ <div class="playlist-thumbnail-info">
+ <span>$size</span>
+ </div>
+ </a>
+
+ <a class="title" href="$url" title=$title>$title</a>
+
+ <address><a href="$author_url">$author</a></address>
+ </div>
+''')
+medium_video_item_template = Template('''
+ <div class="medium-item">
+ <a class="video-thumbnail-box" href="$url" title="$title">
+ <img class="video-thumbnail-img" src="$thumbnail">
+ <span class="video-duration">$duration</span>
+ </a>
+
+ <a class="title" href="$url">$title</a>
+
+ <div class="stats">$stats</div>
+ <!--
+ <address><a href="$author_url">$author</a></address>
+ <span class="views">$views</span>
+ <time datetime="$datetime">Uploaded $published</time>-->
+
+ <span class="description">$description</span>
+ <span class="badges">$badges</span>
+ </div>
+''')
+
+small_video_item_template = Template('''
+ <div class="small-item-box">
+ <div class="small-item">
+ <a class="video-thumbnail-box" href="$url" title="$title">
+ <img class="video-thumbnail-img" src="$thumbnail">
+ <span class="video-duration">$duration</span>
+ </a>
+ <a class="title" href="$url" title="$title">$title</a>
+
+ <address>$author</address>
+ <span class="views">$views</span>
+
+ </div>
+ <input class="item-checkbox" type="checkbox" name="video_info_list" value="$video_info" form="playlist-add">
+ </div>
+''')
+
+small_playlist_item_template = Template('''
+ <div class="small-item-box">
+ <div class="small-item">
+ <a class="playlist-thumbnail-box" href="$url" title="$title">
+ <img class="playlist-thumbnail-img" src="$thumbnail">
+ <div class="playlist-thumbnail-info">
+ <span>$size</span>
+ </div>
+ </a>
+ <a class="title" href="$url" title="$title">$title</a>
+
+ <address>$author</address>
+ </div>
+ </div>
+''')
+
+medium_channel_item_template = Template('''
+ <div class="medium-item">
+ <a class="video-thumbnail-box" href="$url" title="$title">
+ <img class="video-thumbnail-img" src="$thumbnail">
+ <span class="video-duration">$duration</span>
+ </a>
+
+ <a class="title" href="$url">$title</a>
+
+ <span>$subscriber_count</span>
+ <span>$size</span>
+
+ <span class="description">$description</span>
+ </div>
+''')
+
+
+def fetch_url(url, headers=(), timeout=5, report_text=None):
+ if isinstance(headers, list):
+ headers += [('Accept-Encoding', 'gzip, br')]
+ headers = dict(headers)
+ elif isinstance(headers, tuple):
+ headers += (('Accept-Encoding', 'gzip, br'),)
+ headers = dict(headers)
+ else:
+ headers = headers.copy()
+ headers['Accept-Encoding'] = 'gzip, br'
+
+ start_time = time.time()
+
+ req = urllib.request.Request(url, headers=headers)
+ response = urllib.request.urlopen(req, timeout=timeout)
+ response_time = time.time()
+
+ content = response.read()
+ read_finish = time.time()
+ if report_text:
+ print(report_text, 'Latency:', response_time - start_time, ' Read time:', read_finish - response_time)
+ encodings = response.getheader('Content-Encoding', default='identity').replace(' ', '').split(',')
+ for encoding in reversed(encodings):
+ if encoding == 'identity':
+ continue
+ if encoding == 'br':
+ content = brotli.decompress(content)
+ elif encoding == 'gzip':
+ content = gzip.decompress(content)
+ return content
+
+mobile_ua = (('User-Agent', 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'),)
+
+def dict_add(*dicts):
+ for dictionary in dicts[1:]:
+ dicts[0].update(dictionary)
+ return dicts[0]
+
+def video_id(url):
+ url_parts = urllib.parse.urlparse(url)
+ return urllib.parse.parse_qs(url_parts.query)['v'][0]
+
+def uppercase_escape(s):
+ return re.sub(
+ r'\\U([0-9a-fA-F]{8})',
+ lambda m: chr(int(m.group(1), base=16)), s)
+
+def default_multi_get(object, *keys, default):
+ ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
+ try:
+ for key in keys:
+ object = object[key]
+ return object
+ except (IndexError, KeyError):
+ return default
+
+def get_plain_text(node):
+ try:
+ return html.escape(node['simpleText'])
+ except KeyError:
+ return unformmated_text_runs(node['runs'])
+
+def unformmated_text_runs(runs):
+ result = ''
+ for text_run in runs:
+ result += html.escape(text_run["text"])
+ return result
+
+def format_text_runs(runs):
+ if isinstance(runs, str):
+ return runs
+ result = ''
+ for text_run in runs:
+ if text_run.get("bold", False):
+ result += "<b>" + html.escape(text_run["text"]) + "</b>"
+ elif text_run.get('italics', False):
+ result += "<i>" + html.escape(text_run["text"]) + "</i>"
+ else:
+ result += html.escape(text_run["text"])
+ return result
+
+# default, sddefault, mqdefault, hqdefault, hq720
+def get_thumbnail_url(video_id):
+ return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+
+def seconds_to_timestamp(seconds):
+ seconds = int(seconds)
+ hours, seconds = divmod(seconds,3600)
+ minutes, seconds = divmod(seconds,60)
+ if hours != 0:
+ timestamp = str(hours) + ":"
+ timestamp += str(minutes).zfill(2) # zfill pads with zeros
+ else:
+ timestamp = str(minutes)
+
+ timestamp += ":" + str(seconds).zfill(2)
+ return timestamp
+
+# playlists:
+
+# id
+# title
+# url
+# author
+# author_url
+# thumbnail
+# description
+# updated
+# size
+# first_video_id
+def medium_playlist_item_info(playlist_renderer):
+ renderer = playlist_renderer
+ try:
+ author_url = URL_ORIGIN + renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+ except KeyError: # radioRenderer
+ author_url = ''
+ try:
+ thumbnail = renderer['thumbnails'][0]['thumbnails'][0]['url']
+ except KeyError:
+ thumbnail = renderer['thumbnail']['thumbnails'][0]['url']
+ return {
+ "title": renderer["title"]["simpleText"],
+ 'id': renderer["playlistId"],
+ 'size': renderer.get('videoCount', '50+'),
+ "author": default_multi_get(renderer,'longBylineText','runs',0,'text', default='Youtube'),
+ "author_url": author_url,
+ 'thumbnail': thumbnail,
+ }
+
+def medium_video_item_info(video_renderer):
+ renderer = video_renderer
+ try:
+ return {
+ "title": renderer["title"]["simpleText"],
+ "id": renderer["videoId"],
+ "description": renderer.get("descriptionSnippet",dict()).get('runs',[]), # a list of text runs (formmated), rather than plain text
+ "thumbnail": get_thumbnail_url(renderer["videoId"]),
+ "views": renderer['viewCountText'].get('simpleText', None) or renderer['viewCountText']['runs'][0]['text'],
+ "duration": default_multi_get(renderer, 'lengthText', 'simpleText', default=''), # livestreams dont have a length
+ "author": renderer['longBylineText']['runs'][0]['text'],
+ "author_url": URL_ORIGIN + renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
+ "published": default_multi_get(renderer, 'publishedTimeText', 'simpleText', default=''),
+ }
+ except KeyError:
+ print(renderer)
+ raise
+
+def small_video_item_info(compact_video_renderer):
+ renderer = compact_video_renderer
+ return {
+ "title": renderer['title']['simpleText'],
+ "id": renderer['videoId'],
+ "views": renderer['viewCountText'].get('simpleText', None) or renderer['viewCountText']['runs'][0]['text'],
+ "duration": default_multi_get(renderer, 'lengthText', 'simpleText', default=''), # livestreams dont have a length
+ "author": renderer['longBylineText']['runs'][0]['text'],
+ "author_url": renderer['longBylineText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'],
+ }
+
+
+# -----
+# HTML
+# -----
+
+def small_video_item_html(item):
+ video_info = json.dumps({key: item[key] for key in ('id', 'title', 'author', 'duration')})
+ return small_video_item_template.substitute(
+ title = html.escape(item["title"]),
+ views = item["views"],
+ author = html.escape(item["author"]),
+ duration = item["duration"],
+ url = URL_ORIGIN + "/watch?v=" + item["id"],
+ thumbnail = get_thumbnail_url(item['id']),
+ video_info = html.escape(json.dumps(video_info)),
+ )
+
+def small_playlist_item_html(item):
+ return small_playlist_item_template.substitute(
+ title=html.escape(item["title"]),
+ size = item['size'],
+ author="",
+ url = URL_ORIGIN + "/playlist?list=" + item["id"],
+ thumbnail= get_thumbnail_url(item['first_video_id']),
+ )
+
+def medium_playlist_item_html(item):
+ return medium_playlist_item_template.substitute(
+ title=html.escape(item["title"]),
+ size = item['size'],
+ author=item['author'],
+ author_url= URL_ORIGIN + item['author_url'],
+ url = URL_ORIGIN + "/playlist?list=" + item["id"],
+ thumbnail= item['thumbnail'],
+ )
+
+def medium_video_item_html(medium_video_info):
+ info = medium_video_info
+
+ return medium_video_item_template.substitute(
+ title=html.escape(info["title"]),
+ views=info["views"],
+ published = info["published"],
+ description = format_text_runs(info["description"]),
+ author=html.escape(info["author"]),
+ author_url=info["author_url"],
+ duration=info["duration"],
+ url = URL_ORIGIN + "/watch?v=" + info["id"],
+ thumbnail=info['thumbnail'],
+ datetime='', # TODO
+ )
+
+html_functions = {
+ 'compactVideoRenderer': lambda x: small_video_item_html(small_video_item_info(x)),
+ 'videoRenderer': lambda x: medium_video_item_html(medium_video_item_info(x)),
+ 'compactPlaylistRenderer': lambda x: small_playlist_item_html(small_playlist_item_info(x)),
+ 'playlistRenderer': lambda x: medium_playlist_item_html(medium_playlist_item_info(x)),
+ 'channelRenderer': lambda x: '',
+ 'radioRenderer': lambda x: medium_playlist_item_html(medium_playlist_item_info(x)),
+ 'compactRadioRenderer': lambda x: small_playlist_item_html(small_playlist_item_info(x)),
+ 'didYouMeanRenderer': lambda x: '',
+}
+
+
+
+
+
+
+
+def get_url(node):
+ try:
+ return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+ except KeyError:
+ return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+
+
+def get_text(node):
+ try:
+ return node['simpleText']
+ except KeyError:
+ return node['runs'][0]['text']
+
+def get_formatted_text(node):
+ try:
+ return node['runs']
+ except KeyError:
+ return node['simpleText']
+
+def get_badges(node):
+ badges = []
+ for badge_node in node:
+ badge = badge_node['metadataBadgeRenderer']['label']
+ if badge.lower() != 'new':
+ badges.append(badge)
+ return badges
+
+def get_thumbnail(node):
+ try:
+ return node['thumbnails'][0]['url'] # polymer format
+ except KeyError:
+ return node['url'] # ajax format
+
+dispatch = {
+
+# polymer format
+ 'title': ('title', get_text),
+ 'publishedTimeText': ('published', get_text),
+ 'videoId': ('id', lambda node: node),
+ 'descriptionSnippet': ('description', get_formatted_text),
+ 'lengthText': ('duration', get_text),
+ 'thumbnail': ('thumbnail', get_thumbnail),
+ 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']),
+
+ 'videoCountText': ('size', get_text),
+ 'playlistId': ('id', lambda node: node),
+
+ 'subscriberCountText': ('subscriber_count', get_text),
+ 'channelId': ('id', lambda node: node),
+ 'badges': ('badges', get_badges),
+
+# ajax format
+ 'view_count_text': ('views', get_text),
+ 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]),
+ 'owner_text': ('author', get_text),
+ 'owner_endpoint': ('author_url', lambda node: node['url']),
+ 'description': ('description', get_formatted_text),
+ 'index': ('playlist_index', get_text),
+ 'short_byline': ('author', get_text),
+ 'length': ('duration', get_text),
+ 'video_id': ('id', lambda node: node),
+
+}
+
+def renderer_info(renderer):
+ try:
+ info = {}
+ if 'viewCountText' in renderer: # prefer this one as it contains all the digits
+ info['views'] = get_text(renderer['viewCountText'])
+ elif 'shortViewCountText' in renderer:
+ info['views'] = get_text(renderer['shortViewCountText'])
+
+ for key, node in renderer.items():
+ if key in ('longBylineText', 'shortBylineText'):
+ info['author'] = get_text(node)
+ try:
+ info['author_url'] = get_url(node)
+ except KeyError:
+ pass
+
+ continue
+
+ try:
+ simple_key, function = dispatch[key]
+ except KeyError:
+ continue
+ info[simple_key] = function(node)
+ return info
+ except KeyError:
+ print(renderer)
+ raise
+
+def ajax_info(item_json):
+ try:
+ info = {}
+ for key, node in item_json.items():
+ try:
+ simple_key, function = dispatch[key]
+ except KeyError:
+ continue
+ info[simple_key] = function(node)
+ return info
+ except KeyError:
+ print(item_json)
+ raise
+
+def badges_html(badges):
+ return ' | '.join(map(html.escape, badges))
+
+
+
+
+
+html_transform_dispatch = {
+ 'title': html.escape,
+ 'published': html.escape,
+ 'id': html.escape,
+ 'description': format_text_runs,
+ 'duration': html.escape,
+ 'thumbnail': lambda url: html.escape('/' + url.lstrip('/')),
+ 'size': html.escape,
+ 'author': html.escape,
+ 'author_url': lambda url: html.escape(URL_ORIGIN + url),
+ 'views': html.escape,
+ 'subscriber_count': html.escape,
+ 'badges': badges_html,
+ 'playlist_index': html.escape,
+}
+
+def get_html_ready(item):
+ html_ready = {}
+ for key, value in item.items():
+ try:
+ function = html_transform_dispatch[key]
+ except KeyError:
+ continue
+ html_ready[key] = function(value)
+ return html_ready
+
+
+author_template_url = Template('''<address>By <a href="$author_url">$author</a></address>''')
+author_template = Template('''<address>By $author</address>''')
+stat_templates = (
+ Template('''<span class="views">$views</span>'''),
+ Template('''<time datetime="$datetime">$published</time>'''),
+)
+def get_video_stats(html_ready):
+ stats = []
+ if 'author' in html_ready:
+ if 'author_url' in html_ready:
+ stats.append(author_template_url.substitute(html_ready))
+ else:
+ stats.append(author_template.substitute(html_ready))
+ for stat in stat_templates:
+ try:
+ stats.append(stat.strict_substitute(html_ready))
+ except KeyError:
+ pass
+ return ' | '.join(stats)
+
+def video_item_html(item, template):
+ html_ready = get_html_ready(item)
+ video_info = {}
+ for key in ('id', 'title', 'author'):
+ try:
+ video_info[key] = html_ready[key]
+ except KeyError:
+ video_info[key] = ''
+ try:
+ video_info['duration'] = html_ready['duration']
+ except KeyError:
+ video_info['duration'] = 'Live' # livestreams don't have a duration
+
+ html_ready['video_info'] = html.escape(json.dumps(video_info) )
+ html_ready['url'] = URL_ORIGIN + "/watch?v=" + html_ready['id']
+ html_ready['datetime'] = '' #TODO
+
+ html_ready['stats'] = get_video_stats(html_ready)
+
+ return template.substitute(html_ready)
+
+
+def playlist_item_html(item, template):
+ html_ready = get_html_ready(item)
+
+ html_ready['url'] = URL_ORIGIN + "/playlist?list=" + html_ready['id']
+ html_ready['datetime'] = '' #TODO
+ return template.substitute(html_ready)
+
+
+
+
+
+
+def make_query_string(query_string):
+ return '&'.join(key + '=' + ','.join(values) for key,values in query_string.items())
+
+def update_query_string(query_string, items):
+ parameters = urllib.parse.parse_qs(query_string)
+ parameters.update(items)
+ return make_query_string(parameters)
+
+page_button_template = Template('''<a class="page-button" href="$href">$page</a>''')
+current_page_button_template = Template('''<div class="page-button">$page</div>''')
+
+def page_buttons_html(current_page, estimated_pages, url, current_query_string):
+ if current_page <= 5:
+ page_start = 1
+ page_end = min(9, estimated_pages)
+ else:
+ page_start = current_page - 4
+ page_end = min(current_page + 4, estimated_pages)
+
+ result = ""
+ for page in range(page_start, page_end+1):
+ if page == current_page:
+ template = current_page_button_template
+ else:
+ template = page_button_template
+ result += template.substitute(page=page, href = url + "?" + update_query_string(current_query_string, {'page': [str(page)]}) )
+ return result
+
+
+
+
+
+
+
+showing_results_for = Template('''
+ <div class="showing-results-for">
+ <div>Showing results for <a>$corrected_query</a></div>
+ <div>Search instead for <a href="$original_query_url">$original_query</a></div>
+ </div>
+''')
+
+did_you_mean = Template('''
+ <div class="did-you-mean">
+ <div>Did you mean <a href="$corrected_query_url">$corrected_query</a></div>
+ </div>
+''')
+
+def renderer_html(renderer, additional_info={}, current_query_string=''):
+ type = list(renderer.keys())[0]
+ renderer = renderer[type]
+ if type in ('videoRenderer', 'playlistRenderer', 'radioRenderer', 'compactVideoRenderer', 'compactPlaylistRenderer', 'compactRadioRenderer', 'gridVideoRenderer', 'gridPlaylistRenderer', 'gridRadioRenderer'):
+ info = renderer_info(renderer)
+ info.update(additional_info)
+ if type == 'compactVideoRenderer':
+ return video_item_html(info, small_video_item_template)
+ if type in ('compactPlaylistRenderer', 'compactRadioRenderer'):
+ return playlist_item_html(info, small_playlist_item_template)
+ if type in ('videoRenderer', 'gridVideoRenderer'):
+ return video_item_html(info, medium_video_item_template)
+ if type in ('playlistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'gridRadioRenderer'):
+ return playlist_item_html(info, medium_playlist_item_template)
+
+ if type == 'channelRenderer':
+ info = renderer_info(renderer)
+ html_ready = get_html_ready(info)
+ html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id']
+ return medium_channel_item_template.substitute(html_ready)
+
+ if type == 'movieRenderer':
+ return ''
+ print(renderer)
+ raise NotImplementedError('Unknown renderer type: ' + type)
+
+
+'videoRenderer'
+'playlistRenderer'
+'channelRenderer'
+'radioRenderer'
+'gridVideoRenderer'
+'gridPlaylistRenderer'
+
+'didYouMeanRenderer'
+'showingResultsForRenderer'
|