From bd343ed71f628e0f1dd1eb3f45fb4e04887f223f Mon Sep 17 00:00:00 2001 From: James Taylor Date: Sun, 8 Sep 2019 17:28:11 -0700 Subject: Extraction: Move channel extraction to yt_data_extract --- youtube/channel.py | 122 +-------------------------------------------- youtube/subscriptions.py | 2 +- youtube/yt_data_extract.py | 121 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 121 deletions(-) diff --git a/youtube/channel.py b/youtube/channel.py index 79b7c9b..16d0a3f 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -137,124 +137,6 @@ def get_channel_search_json(channel_id, query, page): return polymer_json -def extract_info(polymer_json, tab): - response = polymer_json[1]['response'] - try: - microformat = response['microformat']['microformatDataRenderer'] - - # channel doesn't exist or was terminated - # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org - except KeyError: - if 'alerts' in response and len(response['alerts']) > 0: - result = '' - for alert in response['alerts']: - result += alert['alertRenderer']['text']['simpleText'] + '\n' - flask.abort(200, result) - elif 'errors' in response['responseContext']: - for error in response['responseContext']['errors']['error']: - if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id': - flask.abort(404, 'This channel does not exist') - raise - - - info = {} - info['current_tab'] = tab - - - # stuff from microformat (info given by youtube for every page on channel) - info['short_description'] = microformat['description'] - info['channel_name'] = microformat['title'] - info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url'] - channel_url = microformat['urlCanonical'].rstrip('/') - channel_id = channel_url[channel_url.rfind('/')+1:] - info['channel_id'] = channel_id - info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id - - info['items'] = [] - - # empty channel - if 'contents' not in response and 'continuationContents' not in response: - return info - - - # find the tab with content - # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg - # TODO: maybe use the 'selected' attribute for this? - if 'continuationContents' not in response: - tab_renderer = None - tab_content = None - for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']: - try: - tab_renderer = tab_json['tabRenderer'] - except KeyError: - tab_renderer = tab_json['expandableTabRenderer'] - try: - tab_content = tab_renderer['content'] - break - except KeyError: - pass - else: # didn't break - raise Exception("No tabs found with content") - assert tab == tab_renderer['title'].lower() - - - # extract tab-specific info - if tab in ('videos', 'playlists', 'search'): # find the list of items - if 'continuationContents' in response: - try: - items = response['continuationContents']['gridContinuation']['items'] - except KeyError: - items = response['continuationContents']['sectionListContinuation']['contents'] # for search - else: - contents = tab_content['sectionListRenderer']['contents'] - if 'itemSectionRenderer' in contents[0]: - item_section = contents[0]['itemSectionRenderer']['contents'][0] - try: - items = item_section['gridRenderer']['items'] - except KeyError: - if "messageRenderer" in item_section: - items = [] - else: - raise Exception('gridRenderer missing but messageRenderer not found') - else: - items = contents # for search - - additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id} - info['items'] = [yt_data_extract.renderer_info(renderer, additional_info) for renderer in items] - - elif tab == 'about': - channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer'] - - - info['links'] = [] - for link_json in channel_metadata.get('primaryLinks', ()): - url = link_json['navigationEndpoint']['urlEndpoint']['url'] - if url.startswith('/redirect'): # youtube puts these on external links to do tracking - query_string = url[url.find('?')+1: ] - url = urllib.parse.parse_qs(query_string)['q'][0] - - text = yt_data_extract.get_plain_text(link_json['title']) - - info['links'].append( (text, url) ) - - - info['stats'] = [] - for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'): - try: - stat = channel_metadata[stat_name] - except KeyError: - continue - info['stats'].append(yt_data_extract.get_plain_text(stat)) - - if 'description' in channel_metadata: - info['description'] = yt_data_extract.get_text(channel_metadata['description']) - else: - info['description'] = '' - - else: - raise NotImplementedError('Unknown or unsupported channel tab: ' + tab) - - return info def post_process_channel_info(info): info['avatar'] = util.prefix_url(info['avatar']) @@ -303,7 +185,7 @@ def get_channel_page(channel_id, tab='videos'): flask.abort(404, 'Unknown channel tab: ' + tab) - info = extract_info(json.loads(polymer_json), tab) + info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab) post_process_channel_info(info) if tab in ('videos', 'search'): info['number_of_videos'] = number_of_videos @@ -343,7 +225,7 @@ def get_channel_page_general_url(base_url, tab, request): flask.abort(404, 'Unknown channel tab: ' + tab) - info = extract_info(json.loads(polymer_json), tab) + info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab) post_process_channel_info(info) if tab in ('videos', 'search'): info['number_of_videos'] = 1000 diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py index 56bdf93..175622f 100644 --- a/youtube/subscriptions.py +++ b/youtube/subscriptions.py @@ -455,7 +455,7 @@ def _get_upstream_videos(channel_id): print('Failed to read atoma feed for ' + channel_status_name) traceback.print_exc() - videos = channel.extract_info(json.loads(channel_tab), 'videos')['items'] + videos = yt_data_extract.extract_channel_info(json.loads(channel_tab), 'videos')['items'] for i, video_item in enumerate(videos): if 'description' not in video_item: video_item['description'] = '' diff --git a/youtube/yt_data_extract.py b/youtube/yt_data_extract.py index 663edc4..c666ede 100644 --- a/youtube/yt_data_extract.py +++ b/youtube/yt_data_extract.py @@ -3,6 +3,7 @@ from youtube import util import html import json import re +import urllib # videos (all of type str): @@ -279,3 +280,123 @@ def parse_info_prepare_for_html(renderer, additional_info={}): return item +def extract_channel_info(polymer_json, tab): + response = polymer_json[1]['response'] + try: + microformat = response['microformat']['microformatDataRenderer'] + + # channel doesn't exist or was terminated + # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org + except KeyError: + if 'alerts' in response and len(response['alerts']) > 0: + result = '' + for alert in response['alerts']: + result += alert['alertRenderer']['text']['simpleText'] + '\n' + flask.abort(200, result) + elif 'errors' in response['responseContext']: + for error in response['responseContext']['errors']['error']: + if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id': + flask.abort(404, 'This channel does not exist') + raise + + + info = {} + info['current_tab'] = tab + + + # stuff from microformat (info given by youtube for every page on channel) + info['short_description'] = microformat['description'] + info['channel_name'] = microformat['title'] + info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url'] + channel_url = microformat['urlCanonical'].rstrip('/') + channel_id = channel_url[channel_url.rfind('/')+1:] + info['channel_id'] = channel_id + info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id + + info['items'] = [] + + # empty channel + if 'contents' not in response and 'continuationContents' not in response: + return info + + + # find the tab with content + # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg + # TODO: maybe use the 'selected' attribute for this? + if 'continuationContents' not in response: + tab_renderer = None + tab_content = None + for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']: + try: + tab_renderer = tab_json['tabRenderer'] + except KeyError: + tab_renderer = tab_json['expandableTabRenderer'] + try: + tab_content = tab_renderer['content'] + break + except KeyError: + pass + else: # didn't break + raise Exception("No tabs found with content") + assert tab == tab_renderer['title'].lower() + + + # extract tab-specific info + if tab in ('videos', 'playlists', 'search'): # find the list of items + if 'continuationContents' in response: + try: + items = response['continuationContents']['gridContinuation']['items'] + except KeyError: + items = response['continuationContents']['sectionListContinuation']['contents'] # for search + else: + contents = tab_content['sectionListRenderer']['contents'] + if 'itemSectionRenderer' in contents[0]: + item_section = contents[0]['itemSectionRenderer']['contents'][0] + try: + items = item_section['gridRenderer']['items'] + except KeyError: + if "messageRenderer" in item_section: + items = [] + else: + raise Exception('gridRenderer missing but messageRenderer not found') + else: + items = contents # for search + + additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id} + info['items'] = [renderer_info(renderer, additional_info) for renderer in items] + + elif tab == 'about': + channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer'] + + + info['links'] = [] + for link_json in channel_metadata.get('primaryLinks', ()): + url = link_json['navigationEndpoint']['urlEndpoint']['url'] + if url.startswith('/redirect'): # youtube puts these on external links to do tracking + query_string = url[url.find('?')+1: ] + url = urllib.parse.parse_qs(query_string)['q'][0] + + text = get_plain_text(link_json['title']) + + info['links'].append( (text, url) ) + + + info['stats'] = [] + for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'): + try: + stat = channel_metadata[stat_name] + except KeyError: + continue + info['stats'].append(get_plain_text(stat)) + + if 'description' in channel_metadata: + info['description'] = get_text(channel_metadata['description']) + else: + info['description'] = '' + + else: + raise NotImplementedError('Unknown or unsupported channel tab: ' + tab) + + return info + + -- cgit v1.2.3