from youtube import util, proto
import html
import json
import re
import urllib
from math import ceil
# videos (all of type str):
# id
# title
# url
# author
# author_url
# thumbnail
# description
# published
# duration
# likes
# dislikes
# views
# playlist_index
# playlists:
# id
# title
# url
# author
# author_url
# thumbnail
# description
# updated
# size
# first_video_id
def get_plain_text(node):
    '''Return the plain text of a polymer text node.

    Handles both the 'simpleText' form and the 'runs' form (whose run
    texts are concatenated). Raises KeyError if neither key is present.'''
    if 'simpleText' in node:
        return node['simpleText']
    return ''.join(run['text'] for run in node['runs'])
def format_text_runs(runs):
    '''Render a polymer "runs" list as an HTML string.

    `runs` may also already be a plain string, in which case it is returned
    unchanged (no escaping). Run text is HTML-escaped; bold and italic runs
    are wrapped in <b>/<i> tags.

    Bug fix: the tag strings were empty (""), so bold/italic markup was
    silently dropped from the output.'''
    if isinstance(runs, str):
        return runs
    result = ''
    for text_run in runs:
        if text_run.get("bold", False):
            result += "<b>" + html.escape(text_run["text"]) + "</b>"
        elif text_run.get('italics', False):
            result += "<i>" + html.escape(text_run["text"]) + "</i>"
        else:
            result += html.escape(text_run["text"])
    return result
def default_multi_get(obj, *keys, default):
    '''Like dict.get(), but for nested dictionaries/sequences.

    Each element of *keys is applied in turn as a key or index into `obj`.
    Returns the final value, or `default` (keyword-only) if any step raises
    IndexError or KeyError. Other exceptions (e.g. TypeError on a
    non-subscriptable value) propagate, as before.

    The first parameter was renamed from `object` (shadowed the builtin);
    it could only ever be passed positionally, so callers are unaffected.'''
    try:
        for key in keys:
            obj = obj[key]
        return obj
    except (IndexError, KeyError):
        return default
def get_url(node):
    '''Return the navigation URL attached to a polymer text node.

    Tries the 'runs' form first; any KeyError along that path falls back to
    the node-level navigationEndpoint (matching the original behavior).'''
    path = ('commandMetadata', 'webCommandMetadata', 'url')
    try:
        value = node['runs'][0]['navigationEndpoint']
        for step in path:
            value = value[step]
        return value
    except KeyError:
        value = node['navigationEndpoint']
        for step in path:
            value = value[step]
        return value
def get_text(node):
    '''Return the plain text of a polymer text node, '' for empty nodes.

    Unlike get_plain_text, only the FIRST run's text is returned for the
    'runs' form, and an empty runs list yields ''.'''
    if node == {}:
        return ''
    if 'simpleText' in node:
        return node['simpleText']
    try:
        first_run = node['runs'][0]
        return first_run['text']
    except IndexError:
        # empty text runs
        return ''
    except KeyError:
        # unexpected node shape; dump it for debugging before propagating
        print(node)
        raise
def get_formatted_text(node):
    '''Return the rich "runs" list when present, else the plain simpleText.

    The runs form keeps formatting information for later rendering by
    format_text_runs.'''
    if 'runs' in node:
        return node['runs']
    return node['simpleText']
def get_badges(node):
    '''Collect the label of every metadata badge in the given badge list.'''
    return [badge_node['metadataBadgeRenderer']['label'] for badge_node in node]
def get_thumbnail(node):
    '''Return the first thumbnail URL from a thumbnail node.

    Polymer format nests a 'thumbnails' list; the older ajax format stores
    the url directly on the node.'''
    try:
        thumbnails = node['thumbnails']
        return thumbnails[0]['url']  # polymer format
    except KeyError:
        return node['url']  # ajax format
# Maps a raw renderer key to (our_simplified_key, extractor_function).
# Used by renderer_info and ajax_info to flatten youtube's nested node
# structures into the plain dicts described in the header comment above.
dispatch = {
    # polymer format
    'title': ('title', get_text),
    'publishedTimeText': ('published', get_text),
    'videoId': ('id', lambda node: node),
    'descriptionSnippet': ('description', get_formatted_text),
    'lengthText': ('duration', get_text),
    'thumbnail': ('thumbnail', get_thumbnail),
    'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']),
    'viewCountText': ('views', get_text),
    'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos"
    'videoCountText': ('size', get_text),
    'playlistId': ('id', lambda node: node),
    'descriptionText': ('description', get_formatted_text),
    'subscriberCountText': ('subscriber_count', get_text),
    'channelId': ('id', lambda node: node),
    'badges': ('badges', get_badges),
    # ajax format
    'view_count_text': ('views', get_text),
    'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]),
    'owner_text': ('author', get_text),
    'owner_endpoint': ('author_url', lambda node: node['url']),
    'description': ('description', get_formatted_text),
    'index': ('playlist_index', get_text),
    'short_byline': ('author', get_text),
    'length': ('duration', get_text),
    'video_id': ('id', lambda node: node),
}
def ajax_info(item_json):
    '''Flatten an ajax-format item dict into our simplified info dict.

    Keys without a `dispatch` entry are skipped. If an extractor itself
    raises KeyError on an unexpected node shape, the item is printed for
    debugging and the error propagates.'''
    try:
        info = {}
        for key, node in item_json.items():
            entry = dispatch.get(key)
            if entry is None:
                continue
            simple_key, function = entry
            info[simple_key] = function(node)
        return info
    except KeyError:
        print(item_json)
        raise
# Matches absolute, protocol-relative, bare-domain, and path-only forms of a
# youtube.com url; group 1 captures the path (including leading slash).
youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$')
def normalize_url(url):
    '''Normalize any youtube.com url form to canonical https://www.youtube.com/...

    Raises ValueError (previously a bare, message-less Exception — ValueError
    is a subclass, so existing `except Exception` handlers still work) when
    the url is not a youtube.com url.'''
    match = youtube_url_re.fullmatch(url)
    if match is None:
        raise ValueError('Invalid youtube url: ' + str(url))
    return 'https://www.youtube.com' + match.group(1)
def prefix_urls(item):
    '''Rewrite 'thumbnail' and 'author_url' in place through util.prefix_url.

    Fields that are absent from the item are left alone.'''
    for field in ('thumbnail', 'author_url'):
        try:
            item[field] = util.prefix_url(item[field])
        except KeyError:
            pass
def add_extra_html_info(item):
    '''Add a local 'url' to an item dict based on its type.

    Videos additionally get a 'video_info' JSON blob (id/title/author/
    duration, '' when missing) for use by the HTML templates.'''
    item_type = item['type']
    if item_type == 'video':
        item['url'] = util.URL_ORIGIN + '/watch?v=' + item['id']
        video_info = {key: item.get(key, '')
                      for key in ('id', 'title', 'author', 'duration')}
        item['video_info'] = json.dumps(video_info)
    elif item_type == 'playlist':
        item['url'] = util.URL_ORIGIN + '/playlist?list=' + item['id']
    elif item_type == 'channel':
        item['url'] = util.URL_ORIGIN + "/channel/" + item['id']
def renderer_info(renderer, additional_info=None):
    '''Flatten a polymer renderer into a simplified info dict.

    renderer: a single-key dict {rendererTypeName: contents}.
    additional_info: optional dict of extra key/values merged into the result
        (e.g. author info already known from the surrounding page). Default
        changed from a mutable `{}` to None (read-only before, so behavior
        is unchanged, but the anti-pattern is removed).

    Returns a dict that always has a 'type' key ('video', 'playlist',
    'channel', 'playlist_metadata', or 'unsupported').
    '''
    renderer_type = list(renderer.keys())[0]  # renamed from `type` (shadowed builtin)
    renderer = renderer[renderer_type]
    info = {}
    if renderer_type == 'itemSectionRenderer':
        # unwrap the section and recurse on its first item
        return renderer_info(renderer['contents'][0], additional_info)
    if renderer_type in ('movieRenderer', 'clarificationRenderer'):
        info['type'] = 'unsupported'
        return info
    if additional_info is not None:
        info.update(additional_info)
    if renderer_type in ('compactVideoRenderer', 'videoRenderer', 'playlistVideoRenderer', 'gridVideoRenderer'):
        info['type'] = 'video'
    elif renderer_type in ('playlistRenderer', 'compactPlaylistRenderer', 'gridPlaylistRenderer',
            'radioRenderer', 'compactRadioRenderer', 'gridRadioRenderer',
            'showRenderer', 'compactShowRenderer', 'gridShowRenderer'):
        info['type'] = 'playlist'
    elif renderer_type == 'channelRenderer':
        info['type'] = 'channel'
    elif renderer_type == 'playlistHeaderRenderer':
        info['type'] = 'playlist_metadata'
    else:
        info['type'] = 'unsupported'
        return info
    try:
        if 'viewCountText' in renderer:     # prefer this one as it contains all the digits
            info['views'] = get_text(renderer['viewCountText'])
        elif 'shortViewCountText' in renderer:
            info['views'] = get_text(renderer['shortViewCountText'])
        if 'ownerText' in renderer:
            info['author'] = renderer['ownerText']['runs'][0]['text']
            info['author_url'] = normalize_url(renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'])
        try:
            overlays = renderer['thumbnailOverlays']
        except KeyError:
            pass
        else:
            for overlay in overlays:
                if 'thumbnailOverlayTimeStatusRenderer' in overlay:
                    info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text'])
                # show renderers don't have videoCountText
                elif 'thumbnailOverlayBottomPanelRenderer' in overlay:
                    info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text'])
        # show renderers don't have playlistId, have to dig into the url to get it
        try:
            info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId']
        except KeyError:
            pass
        for key, node in renderer.items():
            if key in ('longBylineText', 'shortBylineText'):
                info['author'] = get_text(node)
                try:
                    info['author_url'] = normalize_url(get_url(node))
                except KeyError:
                    pass
            # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer
            elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node:
                info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url']
            else:
                try:
                    simple_key, function = dispatch[key]
                except KeyError:
                    continue
                info[simple_key] = function(node)
        if info['type'] == 'video' and 'duration' not in info:
            # no duration overlay and no lengthText: it's a live stream
            info['duration'] = 'Live'
        return info
    except KeyError:
        # unexpected renderer shape; dump it for debugging before propagating
        print(renderer)
        raise
def parse_info_prepare_for_html(renderer, additional_info=None):
    '''renderer_info + url prefixing + template fields, ready for HTML output.

    additional_info default changed from a mutable `{}` (anti-pattern) to
    None; an empty dict is substituted at call time, so behavior and the
    calling convention are unchanged.'''
    item = renderer_info(renderer, additional_info if additional_info is not None else {})
    prefix_urls(item)
    add_extra_html_info(item)
    return item
def get_response(polymer_json):
    '''return response, error'''
    # Desktop pages wrap the response in a list (second element); mobile
    # pages return it at the top level. Try each shape in turn.
    extractors = (
        lambda j: j[1]['response'],   # desktop version
        lambda j: j['response'],      # mobile version
    )
    for extract in extractors:
        try:
            return extract(polymer_json), None
        except (TypeError, KeyError, IndexError):
            continue
    return None, 'Failed to extract response'
def extract_channel_info(polymer_json, tab):
    '''Extract info for one tab ('videos', 'playlists', 'search', or 'about')
    of a channel page from its polymer JSON response.

    Returns a dict with 'error' set to None on success, or to a message on
    failure. On success also contains channel metadata from the microformat
    (name, avatar, id, url) plus tab-specific keys: 'items' for list tabs,
    'links'/'stats'/'description' for the about tab.

    Raises NotImplementedError for an unknown tab, AssertionError if the
    found tab's title doesn't match `tab`.
    '''
    response, err = get_response(polymer_json)
    if err:
        return {'error': err}
    try:
        microformat = response['microformat']['microformatDataRenderer']
    # channel doesn't exist or was terminated
    # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
    except KeyError:
        if 'alerts' in response and len(response['alerts']) > 0:
            # join all alert texts into a single error message
            return {'error': ' '.join(alert['alertRenderer']['text']['simpleText'] for alert in response['alerts']) }
        elif 'errors' in response['responseContext']:
            for error in response['responseContext']['errors']['error']:
                if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
                    return {'error': 'This channel does not exist'}
        return {'error': 'Failure getting microformat'}
    info = {'error': None}
    info['current_tab'] = tab
    # stuff from microformat (info given by youtube for every page on channel)
    info['short_description'] = microformat['description']
    info['channel_name'] = microformat['title']
    info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
    # channel id is the last path segment of the canonical url
    channel_url = microformat['urlCanonical'].rstrip('/')
    channel_id = channel_url[channel_url.rfind('/')+1:]
    info['channel_id'] = channel_id
    info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
    info['items'] = []
    # empty channel
    if 'contents' not in response and 'continuationContents' not in response:
        return info
    # find the tab with content
    # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg
    # TODO: maybe use the 'selected' attribute for this?
    if 'continuationContents' not in response:   # first page only; continuations have no tab list
        tab_renderer = None
        tab_content = None
        for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']:
            try:
                tab_renderer = tab_json['tabRenderer']
            except KeyError:
                # e.g. the search tab uses expandableTabRenderer instead
                tab_renderer = tab_json['expandableTabRenderer']
            try:
                tab_content = tab_renderer['content']
                break
            except KeyError:
                pass
        else: # didn't break
            raise Exception("No tabs found with content")
        # sanity check: the tab with content should be the one requested
        assert tab == tab_renderer['title'].lower()
    # extract tab-specific info
    if tab in ('videos', 'playlists', 'search'): # find the list of items
        if 'continuationContents' in response:
            try:
                items = response['continuationContents']['gridContinuation']['items']
            except KeyError:
                items = response['continuationContents']['sectionListContinuation']['contents'] # for search
        else:
            contents = tab_content['sectionListRenderer']['contents']
            if 'itemSectionRenderer' in contents[0]:
                item_section = contents[0]['itemSectionRenderer']['contents'][0]
                try:
                    items = item_section['gridRenderer']['items']
                except KeyError:
                    # a messageRenderer here means the tab is legitimately empty
                    if "messageRenderer" in item_section:
                        items = []
                    else:
                        raise Exception('gridRenderer missing but messageRenderer not found')
            else:
                items = contents    # for search
        # item renderers lack author info; supply it from the page metadata
        additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
        info['items'] = [renderer_info(renderer, additional_info) for renderer in items]
    elif tab == 'about':
        channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer']
        info['links'] = []
        for link_json in channel_metadata.get('primaryLinks', ()):
            url = link_json['navigationEndpoint']['urlEndpoint']['url']
            if url.startswith('/redirect'):     # youtube puts these on external links to do tracking
                # the real destination is in the 'q' query parameter
                query_string = url[url.find('?')+1: ]
                url = urllib.parse.parse_qs(query_string)['q'][0]
            text = get_plain_text(link_json['title'])
            info['links'].append( (text, url) )
        info['stats'] = []
        for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
            try:
                stat = channel_metadata[stat_name]
            except KeyError:
                continue
            info['stats'].append(get_plain_text(stat))
        if 'description' in channel_metadata:
            info['description'] = get_text(channel_metadata['description'])
        else:
            info['description'] = ''
    else:
        raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
    return info
def extract_search_info(polymer_json):
    '''Extract search results, pagination estimates, and spelling
    corrections from a polymer search response.

    Returns a dict with 'error' (None on success), 'estimated_results',
    'estimated_pages', 'items', and 'corrections'.'''
    response, err = get_response(polymer_json)
    if err:
        return {'error': err}
    info = {'error': None}
    info['estimated_results'] = int(response['estimatedResults'])
    info['estimated_pages'] = ceil(info['estimated_results']/20)
    # almost always is the first "section", but if there's an advertisement for a google product like Stadia or Home in the search results, then that becomes the first "section" and the search results are in the second. So just join all of them for resiliency
    results = []
    for section in response['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents']:
        results += section['itemSectionRenderer']['contents']
    info['items'] = []
    info['corrections'] = {'type': None}
    for renderer in results:
        renderer_type = list(renderer.keys())[0]  # renamed from `type` (shadowed builtin)
        if renderer_type == 'shelfRenderer':
            continue
        if renderer_type == 'didYouMeanRenderer':
            renderer = renderer[renderer_type]
            # NOTE(review): `request` is not defined anywhere in this module —
            # it looks like flask's request object left over from wherever this
            # code was moved from. This branch will raise NameError if it runs;
            # confirm and pass the query args in explicitly.
            corrected_query_string = request.args.to_dict(flat=False)
            corrected_query_string['query'] = [renderer['correctedQueryEndpoint']['searchEndpoint']['query']]
            corrected_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
            info['corrections'] = {
                'type': 'did_you_mean',
                # fixed: was `yt_data_extract.format_text_runs`, but
                # `yt_data_extract` is never imported here and the function is
                # defined in this module — the qualified name raised NameError
                'corrected_query': format_text_runs(renderer['correctedQuery']['runs']),
                'corrected_query_url': corrected_query_url,
            }
            continue
        if renderer_type == 'showingResultsForRenderer':
            renderer = renderer[renderer_type]
            # NOTE(review): same undefined `request` issue as above
            no_autocorrect_query_string = request.args.to_dict(flat=False)
            no_autocorrect_query_string['autocorrect'] = ['0']
            no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
            info['corrections'] = {
                'type': 'showing_results_for',
                # fixed: same `yt_data_extract.` NameError as above
                'corrected_query': format_text_runs(renderer['correctedQuery']['runs']),
                'original_query_url': no_autocorrect_query_url,
                'original_query': renderer['originalQuery']['simpleText'],
            }
            continue
        item_info = renderer_info(renderer)
        if item_info['type'] != 'unsupported':
            info['items'].append(item_info)
    return info
def extract_playlist_metadata(polymer_json):
    '''Extract playlist metadata (title, author, size, ...) from the header
    renderer of a first-page playlist response.'''
    response, err = get_response(polymer_json)
    if err:
        return {'error': err}
    metadata = renderer_info(response['header'])
    metadata['error'] = None
    metadata.setdefault('description', '')
    # the size arrives as a string with thousands separators, e.g. "1,234"
    metadata['size'] = int(metadata['size'].replace(',', ''))
    return metadata
def extract_playlist_info(polymer_json):
    '''Extract the video list of a playlist page; on the first page also
    attach the playlist metadata under 'metadata'.'''
    response, err = get_response(polymer_json)
    if err:
        return {'error': err}
    info = {'error': None}
    first_page = True
    try:
        video_list = response['contents']['singleColumnBrowseResultsRenderer']['tabs'][0]['tabRenderer']['content']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['playlistVideoListRenderer']['contents']
    except KeyError:
        # continuation pages use a flatter structure
        video_list = response['continuationContents']['playlistVideoListContinuation']['contents']
        first_page = False
    info['items'] = [renderer_info(renderer) for renderer in video_list]
    if first_page:
        info['metadata'] = extract_playlist_metadata(polymer_json)
    return info
def ctoken_metadata(ctoken):
    '''Decode a base64 comments continuation token into a dict with
    'video_id', 'offset', 'is_replies', and 'sort'.

    The numeric keys below are protobuf field tags in youtube's (undocumented)
    ctoken format — NOTE(review): tag semantics are inferred from how this
    module uses them; confirm against the proto module / upstream notes.
    '''
    result = dict()
    params = proto.parse(proto.b64_to_bytes(ctoken))
    # field 2 wraps a nested message whose field 2 is the video id
    result['video_id'] = proto.parse(params[2])[2].decode('ascii')
    offset_information = proto.parse(params[6])
    # field 5 holds the comment offset; absent means start of the list
    result['offset'] = offset_information.get(5, 0)
    result['is_replies'] = False
    if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
        # field 2 inside field 3 marks this as a replies continuation,
        # which has no sort order
        result['is_replies'] = True
        result['sort'] = None
    else:
        try:
            # field 6 inside field 4 is the sort code; 0 when absent
            result['sort'] = proto.parse(offset_information[4])[6]
        except KeyError:
            result['sort'] = 0
    return result
def parse_comments_polymer(polymer_json):
    '''Parse a comments continuation response.

    Returns a dict with 'comments', 'ctoken' (next-page token, '' when there
    is no next page), 'video_title', and the request metadata decoded from
    the ctoken. Parsing errors are caught: the error is printed and an empty
    result is returned rather than raising.
    '''
    video_title = ''
    # Bug fix: previously `metadata` was only assigned inside the try block,
    # so a failure before ctoken_metadata() ran (e.g. bad response) made the
    # return statement below raise NameError instead of returning the
    # intended empty result. Preset safe defaults instead.
    metadata = {'video_id': '', 'offset': 0, 'is_replies': False, 'sort': None}
    try:
        response, err = get_response(polymer_json)
        if err:
            raise Exception(err)
        # the request url (containing the ctoken) sits next to the response
        try:
            url = polymer_json[1]['url']
        except (TypeError, IndexError, KeyError):
            url = polymer_json['url']
        ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
        metadata = ctoken_metadata(ctoken)
        try:
            comments_raw = response['continuationContents']['commentSectionContinuation']['items']
        except KeyError:
            comments_raw = response['continuationContents']['commentRepliesContinuation']['contents']
        # token for the next page of comments; '' when this is the last page
        ctoken = default_multi_get(response, 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
        comments = []
        for comment_json in comments_raw:
            number_of_replies = 0
            try:
                comment_thread = comment_json['commentThreadRenderer']
            except KeyError:
                # replies continuation: comments come as bare commentRenderers
                comment_renderer = comment_json['commentRenderer']
            else:
                if 'commentTargetTitle' in comment_thread:
                    video_title = comment_thread['commentTargetTitle']['runs'][0]['text']
                if 'replies' in comment_thread:
                    view_replies_text = get_plain_text(comment_thread['replies']['commentRepliesRenderer']['moreText'])
                    view_replies_text = view_replies_text.replace(',', '')
                    match = re.search(r'(\d+)', view_replies_text)
                    if match is None:
                        # "View reply" with no number means exactly one reply
                        number_of_replies = 1
                    else:
                        number_of_replies = int(match.group(1))
                comment_renderer = comment_thread['comment']['commentRenderer']
            comment = {
                'author_id': comment_renderer.get('authorId', ''),
                'author_avatar': comment_renderer['authorThumbnail']['thumbnails'][0]['url'],
                'likes': comment_renderer['likeCount'],
                'published': get_plain_text(comment_renderer['publishedTimeText']),
                'text': comment_renderer['contentText'].get('runs', ''),
                'number_of_replies': number_of_replies,
                'comment_id': comment_renderer['commentId'],
            }
            if 'authorText' in comment_renderer: # deleted channels have no name or channel link
                comment['author'] = get_plain_text(comment_renderer['authorText'])
                comment['author_url'] = comment_renderer['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
                comment['author_channel_id'] = comment_renderer['authorEndpoint']['browseEndpoint']['browseId']
            else:
                comment['author'] = ''
                comment['author_url'] = ''
                comment['author_channel_id'] = ''
            comments.append(comment)
    except Exception as e:
        # best-effort: log the failure and return an empty comments result
        print('Error parsing comments: ' + str(e))
        comments = ()
        ctoken = ''
    return {
        'ctoken': ctoken,
        'comments': comments,
        'video_title': video_title,
        'video_id': metadata['video_id'],
        'offset': metadata['offset'],
        'is_replies': metadata['is_replies'],
        'sort': metadata['sort'],
    }