from youtube import util import html import json import re import urllib # videos (all of type str): # id # title # url # author # author_url # thumbnail # description # published # duration # likes # dislikes # views # playlist_index # playlists: # id # title # url # author # author_url # thumbnail # description # updated # size # first_video_id def get_plain_text(node): try: return node['simpleText'] except KeyError: return ''.join(text_run['text'] for text_run in node['runs']) def format_text_runs(runs): if isinstance(runs, str): return runs result = '' for text_run in runs: if text_run.get("bold", False): result += "" + html.escape(text_run["text"]) + "" elif text_run.get('italics', False): result += "" + html.escape(text_run["text"]) + "" else: result += html.escape(text_run["text"]) return result def get_url(node): try: return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] except KeyError: return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url'] def get_text(node): if node == {}: return '' try: return node['simpleText'] except KeyError: pass try: return node['runs'][0]['text'] except IndexError: # empty text runs return '' except KeyError: print(node) raise def get_formatted_text(node): try: return node['runs'] except KeyError: return node['simpleText'] def get_badges(node): badges = [] for badge_node in node: badge = badge_node['metadataBadgeRenderer']['label'] badges.append(badge) return badges def get_thumbnail(node): try: return node['thumbnails'][0]['url'] # polymer format except KeyError: return node['url'] # ajax format dispatch = { # polymer format 'title': ('title', get_text), 'publishedTimeText': ('published', get_text), 'videoId': ('id', lambda node: node), 'descriptionSnippet': ('description', get_formatted_text), 'lengthText': ('duration', get_text), 'thumbnail': ('thumbnail', get_thumbnail), 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']), 'viewCountText': ('views', get_text), 'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos" 'videoCountText': ('size', get_text), 'playlistId': ('id', lambda node: node), 'descriptionText': ('description', get_formatted_text), 'subscriberCountText': ('subscriber_count', get_text), 'channelId': ('id', lambda node: node), 'badges': ('badges', get_badges), # ajax format 'view_count_text': ('views', get_text), 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]), 'owner_text': ('author', get_text), 'owner_endpoint': ('author_url', lambda node: node['url']), 'description': ('description', get_formatted_text), 'index': ('playlist_index', get_text), 'short_byline': ('author', get_text), 'length': ('duration', get_text), 'video_id': ('id', lambda node: node), } def ajax_info(item_json): try: info = {} for key, node in item_json.items(): try: simple_key, function = dispatch[key] except KeyError: continue info[simple_key] = function(node) return info except KeyError: print(item_json) raise youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$') def normalize_url(url): match = youtube_url_re.fullmatch(url) if match is None: raise Exception() return 'https://www.youtube.com' + match.group(1) def prefix_urls(item): try: item['thumbnail'] = util.prefix_url(item['thumbnail']) except KeyError: pass try: item['author_url'] = util.prefix_url(item['author_url']) except KeyError: pass def add_extra_html_info(item): if item['type'] == 'video': item['url'] = util.URL_ORIGIN + '/watch?v=' + item['id'] video_info = {} for key in ('id', 'title', 'author', 'duration'): try: video_info[key] = item[key] except KeyError: video_info[key] = '' item['video_info'] = json.dumps(video_info) elif item['type'] == 'playlist': item['url'] = util.URL_ORIGIN + '/playlist?list=' + item['id'] elif item['type'] == 'channel': item['url'] = util.URL_ORIGIN + "/channel/" + item['id'] def renderer_info(renderer, additional_info={}): type = list(renderer.keys())[0] renderer = renderer[type] info = {} if type == 'itemSectionRenderer': return renderer_info(renderer['contents'][0], additional_info) if type in ('movieRenderer', 'clarificationRenderer'): info['type'] = 'unsupported' return info info.update(additional_info) if type in ('compactVideoRenderer', 'videoRenderer', 'playlistVideoRenderer', 'gridVideoRenderer'): info['type'] = 'video' elif type in ('playlistRenderer', 'compactPlaylistRenderer', 'gridPlaylistRenderer', 'radioRenderer', 'compactRadioRenderer', 'gridRadioRenderer', 'showRenderer', 'compactShowRenderer', 'gridShowRenderer'): info['type'] = 'playlist' elif type == 'channelRenderer': info['type'] = 'channel' elif type == 'playlistHeaderRenderer': info['type'] = 'playlist_metadata' else: info['type'] = 'unsupported' return info try: if 'viewCountText' in renderer: # prefer this one as it contains all the digits info['views'] = get_text(renderer['viewCountText']) elif 'shortViewCountText' in renderer: info['views'] = get_text(renderer['shortViewCountText']) if 'ownerText' in renderer: info['author'] = renderer['ownerText']['runs'][0]['text'] info['author_url'] = normalize_url(renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']) try: overlays = renderer['thumbnailOverlays'] except KeyError: pass else: for overlay in overlays: if 'thumbnailOverlayTimeStatusRenderer' in overlay: info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text']) # show renderers don't have videoCountText elif 'thumbnailOverlayBottomPanelRenderer' in overlay: info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text']) # show renderers don't have playlistId, have to dig into the url to get it try: info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId'] except KeyError: pass for key, node in renderer.items(): if key in ('longBylineText', 'shortBylineText'): info['author'] = get_text(node) try: info['author_url'] = normalize_url(get_url(node)) except KeyError: pass # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node: info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url'] else: try: simple_key, function = dispatch[key] except KeyError: continue info[simple_key] = function(node) if info['type'] == 'video' and 'duration' not in info: info['duration'] = 'Live' return info except KeyError: print(renderer) raise def parse_info_prepare_for_html(renderer, additional_info={}): item = renderer_info(renderer, additional_info) prefix_urls(item) add_extra_html_info(item) return item def extract_channel_info(polymer_json, tab): info = {'errors': []} response = polymer_json[1]['response'] try: microformat = response['microformat']['microformatDataRenderer'] # channel doesn't exist or was terminated # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org except KeyError: if 'alerts' in response and len(response['alerts']) > 0: for alert in response['alerts']: info['errors'].append(alert['alertRenderer']['text']['simpleText']) return info elif 'errors' in response['responseContext']: for error in response['responseContext']['errors']['error']: if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id': info['errors'].append('This channel does not exist') return info info['errors'].append('Failure getting microformat') return info info['current_tab'] = tab # stuff from microformat (info given by youtube for every page on channel) info['short_description'] = microformat['description'] info['channel_name'] = microformat['title'] info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url'] channel_url = microformat['urlCanonical'].rstrip('/') channel_id = channel_url[channel_url.rfind('/')+1:] info['channel_id'] = channel_id info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id info['items'] = [] # empty channel if 'contents' not in response and 'continuationContents' not in response: return info # find the tab with content # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg # TODO: maybe use the 'selected' attribute for this? if 'continuationContents' not in response: tab_renderer = None tab_content = None for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']: try: tab_renderer = tab_json['tabRenderer'] except KeyError: tab_renderer = tab_json['expandableTabRenderer'] try: tab_content = tab_renderer['content'] break except KeyError: pass else: # didn't break raise Exception("No tabs found with content") assert tab == tab_renderer['title'].lower() # extract tab-specific info if tab in ('videos', 'playlists', 'search'): # find the list of items if 'continuationContents' in response: try: items = response['continuationContents']['gridContinuation']['items'] except KeyError: items = response['continuationContents']['sectionListContinuation']['contents'] # for search else: contents = tab_content['sectionListRenderer']['contents'] if 'itemSectionRenderer' in contents[0]: item_section = contents[0]['itemSectionRenderer']['contents'][0] try: items = item_section['gridRenderer']['items'] except KeyError: if "messageRenderer" in item_section: items = [] else: raise Exception('gridRenderer missing but messageRenderer not found') else: items = contents # for search additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id} info['items'] = [renderer_info(renderer, additional_info) for renderer in items] elif tab == 'about': channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer'] info['links'] = [] for link_json in channel_metadata.get('primaryLinks', ()): url = link_json['navigationEndpoint']['urlEndpoint']['url'] if url.startswith('/redirect'): # youtube puts these on external links to do tracking query_string = url[url.find('?')+1: ] url = urllib.parse.parse_qs(query_string)['q'][0] text = get_plain_text(link_json['title']) info['links'].append( (text, url) ) info['stats'] = [] for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'): try: stat = channel_metadata[stat_name] except KeyError: continue info['stats'].append(get_plain_text(stat)) if 'description' in channel_metadata: info['description'] = get_text(channel_metadata['description']) else: info['description'] = '' else: raise NotImplementedError('Unknown or unsupported channel tab: ' + tab) return info