From ce8a658a0e56a9dfd3d0145e53f85711c4cbfb11 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Fri, 27 Sep 2019 18:03:19 -0700 Subject: Extraction: Move item extraction into a generic, robust function --- youtube/yt_data_extract.py | 239 +++++++++++++++++++++++++++++++++------------ 1 file changed, 176 insertions(+), 63 deletions(-) diff --git a/youtube/yt_data_extract.py b/youtube/yt_data_extract.py index 551b663..892e73e 100644 --- a/youtube/yt_data_extract.py +++ b/youtube/yt_data_extract.py @@ -4,6 +4,7 @@ import html import json import re import urllib +import collections from math import ceil # videos (all of type str): @@ -58,15 +59,52 @@ def format_text_runs(runs): result += html.escape(text_run["text"]) return result +def default_get(object, key, default, types=()): + '''Like dict.get(), but returns default if the result doesn't match one of the types. + Also works for indexing lists.''' + try: + result = object[key] + except (TypeError, IndexError, KeyError): + return default + + if not types or isinstance(result, types): + return result + else: + return default + -def default_multi_get(object, *keys, default): - ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors ''' + +def default_multi_get(object, *keys, default, types=()): + '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. + Last argument is the default value to use in case of any IndexErrors or KeyErrors. + If types is given and the result doesn't match one of those types, default is returned''' try: for key in keys: object = object[key] - return object - except (IndexError, KeyError): + except (TypeError, IndexError, KeyError): return default + else: + if not types or isinstance(object, types): + return object + else: + return default + +def multi_default_multi_get(object, *key_sequences, default=None, types=()): + '''Like default_multi_get, but can try different key sequences in case one fails. + Return default if all of them fail. key_sequences is a list of lists''' + for key_sequence in key_sequences: + _object = object + try: + for key in key_sequence: + _object = _object[key] + except (TypeError, IndexError, KeyError): + pass + else: + if not types or isinstance(_object, types): + return _object + else: + continue + return default def get_url(node): @@ -284,6 +322,7 @@ def parse_info_prepare_for_html(renderer, additional_info={}): return item +# TODO: Type checking def get_response(polymer_json): '''return response, error''' @@ -301,6 +340,123 @@ def get_response(polymer_json): return None, 'Failed to extract response' +list_types = { + 'sectionListRenderer', + 'itemSectionRenderer', + 'gridRenderer', + 'playlistVideoListRenderer', +} + +item_types = { + 'movieRenderer', + 'didYouMeanRenderer', + 'showingResultsForRenderer', + + 'videoRenderer', + 'compactVideoRenderer', + 'gridVideoRenderer', + 'playlistVideoRenderer', + + 'playlistRenderer', + 'compactPlaylistRenderer', + 'gridPlaylistRenderer', + + 'radioRenderer', + 'compactRadioRenderer', + 'gridRadioRenderer', + + 'showRenderer', + 'compactShowRenderer', + 'gridShowRenderer', + + + 'channelRenderer', + 'compactChannelRenderer', + 'gridChannelRenderer', + + 'channelAboutFullMetadataRenderer', +} + +def traverse_browse_renderer(renderer): + for tab in default_get(renderer, 'tabs', (), types=(list, tuple)): + tab_renderer = multi_default_multi_get(tab, ['tabRenderer'], ['expandableTabRenderer'], default=None, types=dict) + if tab_renderer is None: + continue + if tab_renderer.get('selected', False): + return default_get(tab_renderer, 'content', {}, types=(dict)) + print('Could not find tab with content') + return {} + +# these renderers contain one inside them +nested_renderer_dispatch = { + 'singleColumnBrowseResultsRenderer': traverse_browse_renderer, + 'twoColumnBrowseResultsRenderer': traverse_browse_renderer, + 'twoColumnSearchResultsRenderer': lambda renderer: default_get(renderer, 'primaryContents', {}, types=dict), +} + +def extract_items(response): + '''return items, ctoken''' + if 'continuationContents' in response: + # always has just the one [something]Continuation key, but do this just in case they add some tracking key or something + for key, renderer_continuation in default_get(response, 'continuationContents', {}, types=dict).items(): + if key.endswith('Continuation'): # e.g. commentSectionContinuation, playlistVideoListContinuation + items = multi_default_multi_get(renderer_continuation, ['contents'], ['items'], default=None, types=(list, tuple)) + ctoken = default_multi_get(renderer_continuation, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str) + return items, ctoken + return [], None + elif 'contents' in response: + ctoken = None + items = [] + + iter_stack = collections.deque() + current_iter = iter(()) + + renderer = default_get(response, 'contents', {}, types=dict) + + while True: + # mode 1: dig into the current renderer + # Will stay in mode 1 (via continue) if a new renderer is found inside this one + # Otherwise, after finding that it is an item renderer, + # contains a list, or contains nothing, + # falls through into mode 2 to get a new renderer + if len(renderer) != 0: + key, value = list(renderer.items())[0] + + # has a list in it, add it to the iter stack + if key in list_types: + renderer_list = multi_default_multi_get(value, ['contents'], ['items'], default=(), types=(list, tuple)) + if renderer_list: + iter_stack.append(current_iter) + current_iter = iter(renderer_list) + + continuation = default_multi_get(value, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str) + if continuation: + ctoken = continuation + + # new renderer nested inside this one + elif key in nested_renderer_dispatch: + renderer = nested_renderer_dispatch[key](value) + continue # back to mode 1 + + # the renderer is an item + elif key in item_types: + items.append(renderer) + + + # mode 2: get a new renderer by iterating. + # goes up the stack for an iterator if one has been exhausted + while current_iter is not None: + try: + renderer = current_iter.__next__() + break + except StopIteration: + try: + current_iter = iter_stack.pop() # go back up the stack + except IndexError: + return items, ctoken + + else: + return [], None def extract_channel_info(polymer_json, tab): response, err = get_response(polymer_json) @@ -341,54 +497,21 @@ def extract_channel_info(polymer_json, tab): return info - # find the tab with content - # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg - # TODO: maybe use the 'selected' attribute for this? - if 'continuationContents' not in response: - tab_renderer = None - tab_content = None - for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']: - try: - tab_renderer = tab_json['tabRenderer'] - except KeyError: - tab_renderer = tab_json['expandableTabRenderer'] + items, _ = extract_items(response) + if tab in ('videos', 'playlists', 'search'): + additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id} + info['items'] = [renderer_info(renderer, additional_info) for renderer in items] + + elif tab == 'about': + for item in items: try: - tab_content = tab_renderer['content'] + channel_metadata = item['channelAboutFullMetadataRenderer'] break except KeyError: pass - else: # didn't break - raise Exception("No tabs found with content") - assert tab == tab_renderer['title'].lower() - - - # extract tab-specific info - if tab in ('videos', 'playlists', 'search'): # find the list of items - if 'continuationContents' in response: - try: - items = response['continuationContents']['gridContinuation']['items'] - except KeyError: - items = response['continuationContents']['sectionListContinuation']['contents'] # for search else: - contents = tab_content['sectionListRenderer']['contents'] - if 'itemSectionRenderer' in contents[0]: - item_section = contents[0]['itemSectionRenderer']['contents'][0] - try: - items = item_section['gridRenderer']['items'] - except KeyError: - if "messageRenderer" in item_section: - items = [] - else: - raise Exception('gridRenderer missing but messageRenderer not found') - else: - items = contents # for search - - additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id} - info['items'] = [renderer_info(renderer, additional_info) for renderer in items] - - elif tab == 'about': - channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer'] - + info['error'] = 'Could not find channelAboutFullMetadataRenderer' + return info info['links'] = [] for link_json in channel_metadata.get('primaryLinks', ()): @@ -428,10 +551,9 @@ def extract_search_info(polymer_json): info['estimated_results'] = int(response['estimatedResults']) info['estimated_pages'] = ceil(info['estimated_results']/20) - # almost always is the first "section", but if there's an advertisement for a google product like Stadia or Home in the search results, then that becomes the first "section" and the search results are in the second. So just join all of them for resiliency - results = [] - for section in response['contents']['twoColumnSearchResultsRenderer']['primaryContents']['sectionListRenderer']['contents']: - results += section['itemSectionRenderer']['contents'] + + results, _ = extract_items(response) + info['items'] = [] info['corrections'] = {'type': None} @@ -491,12 +613,8 @@ def extract_playlist_info(polymer_json): if err: return {'error': err} info = {'error': None} - try: # first page - video_list = response['contents']['singleColumnBrowseResultsRenderer']['tabs'][0]['tabRenderer']['content']['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['playlistVideoListRenderer']['contents'] - first_page = True - except KeyError: # other pages - video_list = response['continuationContents']['playlistVideoListContinuation']['contents'] - first_page = False + first_page = 'continuationContents' not in response + video_list, _ = extract_items(response) info['items'] = [renderer_info(renderer) for renderer in video_list] @@ -539,12 +657,7 @@ def parse_comments_polymer(polymer_json): ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0] metadata = ctoken_metadata(ctoken) - try: - comments_raw = response['continuationContents']['commentSectionContinuation']['items'] - except KeyError: - comments_raw = response['continuationContents']['commentRepliesContinuation']['contents'] - - ctoken = default_multi_get(response, 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='') + comments_raw, ctoken = extract_items(response) comments = [] for comment_json in comments_raw: -- cgit v1.2.3