diff options
Diffstat (limited to 'youtube/yt_data_extract/common.py')
-rw-r--r-- | youtube/yt_data_extract/common.py | 169 |
1 files changed, 142 insertions, 27 deletions
diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py index 2b394e6..7903db5 100644 --- a/youtube/yt_data_extract/common.py +++ b/youtube/yt_data_extract/common.py @@ -1,6 +1,7 @@ import re import urllib.parse import collections +import collections.abc def get(object, key, default=None, types=()): '''Like dict.get(), but returns default if the result doesn't match one of the types. @@ -62,17 +63,40 @@ def multi_deep_get(object, *key_sequences, default=None, types=()): continue return default + +def _is_empty(value): + '''Determines if value is None or an empty iterable, such as '' and []''' + if value is None: + return True + elif isinstance(value, collections.abc.Iterable) and not value: + return True + return False + + def liberal_update(obj, key, value): - '''Updates obj[key] with value as long as value is not None. - Ensures obj[key] will at least get a value of None, however''' - if (value is not None) or (key not in obj): + '''Updates obj[key] with value as long as value is not None or empty. + Ensures obj[key] will at least get an empty value, however''' + if (not _is_empty(value)) or (key not in obj): obj[key] = value def conservative_update(obj, key, value): - '''Only updates obj if it doesn't have key or obj[key] is None''' - if obj.get(key) is None: + '''Only updates obj if it doesn't have key or obj[key] is None/empty''' + if _is_empty(obj.get(key)): obj[key] = value + +def liberal_dict_update(dict1, dict2): + '''Update dict1 with keys from dict2 using liberal_update''' + for key, value in dict2.items(): + liberal_update(dict1, key, value) + + +def conservative_dict_update(dict1, dict2): + '''Update dict1 with keys from dict2 using conservative_update''' + for key, value in dict2.items(): + conservative_update(dict1, key, value) + + def concat_or_none(*strings): '''Concatenates strings. Returns None if any of the arguments are None''' result = '' @@ -85,7 +109,7 @@ def concat_or_none(*strings): def remove_redirect(url): if url is None: return None - if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking + if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # YouTube puts these on external links to do tracking query_string = url[url.find('?')+1: ] return urllib.parse.parse_qs(query_string)['q'][0] return url @@ -109,14 +133,14 @@ def _recover_urls(runs): for run in runs: url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url') text = run.get('text', '') - # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text + # second condition is necessary because YouTube makes other things into urls, such as hashtags, which we want to keep as text if url is not None and (text.startswith('http://') or text.startswith('https://')): url = remove_redirect(url) run['url'] = url - run['text'] = url # youtube truncates the url text, use actual url instead + run['text'] = url # YouTube truncates the url text, use actual url instead def extract_str(node, default=None, recover_urls=False): - '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)''' + '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix YouTube's truncation of url text (most prominently seen in descriptions)''' if isinstance(node, str): return node @@ -142,14 +166,17 @@ def extract_formatted_text(node): return [{'text': node['simpleText']}] return [] -def extract_int(string, default=None): +def extract_int(string, default=None, whole_word=True): if isinstance(string, int): return string if not isinstance(string, str): string = extract_str(string) if not string: return default - match = re.search(r'\b(\d+)\b', string.replace(',', '')) + if whole_word: + match = re.search(r'\b(\d+)\b', string.replace(',', '')) + else: + match = re.search(r'(\d+)', string.replace(',', '')) if match is None: return default try: @@ -158,7 +185,7 @@ def extract_int(string, default=None): return default def extract_approx_int(string): - '''e.g. "15.1M" from "15.1M subscribers"''' + '''e.g. "15.1M" from "15.1M subscribers" or '4,353' from 4353''' if not isinstance(string, str): string = extract_str(string) if not string: @@ -166,7 +193,10 @@ def extract_approx_int(string): match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', '')) if match is None: return None - return match.group(1) + result = match.group(1) + if re.fullmatch(r'\d+', result): + result = '{:,}'.format(int(result)) + return result MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'} def extract_date(date_text): @@ -213,8 +243,6 @@ def extract_item_info(item, additional_info={}): info['type'] = 'unsupported' return info - info.update(additional_info) - # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer' # camelCase split, https://stackoverflow.com/a/37697078 type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()] @@ -224,6 +252,9 @@ def extract_item_info(item, additional_info={}): primary_type = type_parts[-2] if primary_type == 'video': info['type'] = 'video' + elif type_parts[0] == 'reel': # shorts + info['type'] = 'video' + primary_type = 'video' elif primary_type in ('playlist', 'radio', 'show'): info['type'] = 'playlist' info['playlist_type'] = primary_type @@ -245,7 +276,11 @@ def extract_item_info(item, additional_info={}): ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'] )) info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None - info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText')) + info['description'] = extract_formatted_text(multi_deep_get( + item, + ['descriptionText'], ['descriptionSnippet'], + ['detailedMetadataSnippets', 0, 'snippetText'], + )) info['thumbnail'] = normalize_url(multi_deep_get(item, ['thumbnail', 'thumbnails', 0, 'url'], # videos ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists @@ -266,7 +301,11 @@ def extract_item_info(item, additional_info={}): info['time_published'] = timestamp.group(1) if primary_type == 'video': - info['id'] = item.get('videoId') + info['id'] = multi_deep_get(item, + ['videoId'], + ['navigationEndpoint', 'watchEndpoint', 'videoId'], + ['navigationEndpoint', 'reelWatchEndpoint', 'videoId'] # shorts + ) info['view_count'] = extract_int(item.get('viewCountText')) # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published @@ -284,17 +323,35 @@ def extract_item_info(item, additional_info={}): if info['view_count']: info['approx_view_count'] = '{:,}'.format(info['view_count']) else: - info['approx_view_count'] = extract_approx_int(item.get('shortViewCountText')) + info['approx_view_count'] = extract_approx_int(multi_get(item, + 'shortViewCountText', + 'viewCountText' # shorts + )) # handle case where it is "No views" if not info['approx_view_count']: if ('No views' in item.get('shortViewCountText', '') - or 'no views' in accessibility_label.lower()): + or 'no views' in accessibility_label.lower() + or 'No views' in extract_str(item.get('viewCountText', '')) # shorts + ): info['view_count'] = 0 info['approx_view_count'] = '0' info['duration'] = extract_str(item.get('lengthText')) + # dig into accessibility data to get duration for shorts + accessibility_label = deep_get(item, + 'accessibility', 'accessibilityData', 'label', + default='') + duration = re.search(r'(\d+) (second|seconds|minute) - play video$', + accessibility_label) + if duration: + if duration.group(2) == 'minute': + conservative_update(info, 'duration', '1:00') + else: + conservative_update(info, + 'duration', '0:' + duration.group(1).zfill(2)) + # if it's an item in a playlist, get its index if 'index' in item: # url has wrong index on playlist page info['index'] = extract_int(item.get('index')) @@ -335,6 +392,9 @@ def extract_item_info(item, additional_info={}): conservative_update(info, 'video_count', extract_int(deep_get( overlay, 'thumbnailOverlayBottomPanelRenderer', 'text' ))) + + info.update(additional_info) + return info def extract_response(polymer_json): @@ -363,6 +423,8 @@ _item_types = { 'gridVideoRenderer', 'playlistVideoRenderer', + 'reelItemRenderer', + 'playlistRenderer', 'compactPlaylistRenderer', 'gridPlaylistRenderer', @@ -402,6 +464,7 @@ nested_renderer_dispatch = { 'twoColumnBrowseResultsRenderer': _traverse_browse_renderer, 'twoColumnSearchResultsRenderer': lambda r: get(r, 'primaryContents', {}), 'richItemRenderer': lambda r: get(r, 'content', {}), + 'engagementPanelSectionListRenderer': lambda r: get(r, 'content', {}), } # these renderers contain a list of renderers inside them @@ -411,6 +474,8 @@ nested_renderer_list_dispatch = { 'gridRenderer': _traverse_standard_list, 'richGridRenderer': _traverse_standard_list, 'playlistVideoListRenderer': _traverse_standard_list, + 'structuredDescriptionContentRenderer': _traverse_standard_list, + 'slimVideoMetadataSectionRenderer': _traverse_standard_list, 'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None), } def get_nested_renderer_list_function(key): @@ -474,8 +539,27 @@ def extract_items_from_renderer(renderer, item_types=_item_types): renderer = None -def extract_items(response, item_types=_item_types): + +def extract_items_from_renderer_list(renderers, item_types=_item_types): + '''Same as extract_items_from_renderer, but provide a list of renderers''' + items = [] + ctoken = None + for renderer in renderers: + new_items, new_ctoken = extract_items_from_renderer( + renderer, + item_types=item_types) + items += new_items + # prioritize ctoken associated with items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + return items, ctoken + + +def extract_items(response, item_types=_item_types, + search_engagement_panels=False): '''return items, ctoken''' + items = [] + ctoken = None if 'continuationContents' in response: # sometimes there's another, empty, junk [something]Continuation key # find real one @@ -483,13 +567,44 @@ def extract_items(response, item_types=_item_types): 'continuationContents', {}).items(): # e.g. commentSectionContinuation, playlistVideoListContinuation if key.endswith('Continuation'): - items, cont = extract_items_from_renderer({key: renderer_cont}, + items, ctoken = extract_items_from_renderer( + {key: renderer_cont}, item_types=item_types) if items: - return items, cont - return [], None - elif 'contents' in response: + break + if ('onResponseReceivedEndpoints' in response + or 'onResponseReceivedActions' in response): + for endpoint in multi_get(response, + 'onResponseReceivedEndpoints', + 'onResponseReceivedActions', + []): + new_items, new_ctoken = extract_items_from_renderer_list( + multi_deep_get( + endpoint, + ['reloadContinuationItemsCommand', 'continuationItems'], + ['appendContinuationItemsAction', 'continuationItems'], + default=[] + ), + item_types=item_types, + ) + items += new_items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + if 'contents' in response: renderer = get(response, 'contents', {}) - return extract_items_from_renderer(renderer, item_types=item_types) - else: - return [], None + new_items, new_ctoken = extract_items_from_renderer( + renderer, + item_types=item_types) + items += new_items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + + if search_engagement_panels and 'engagementPanels' in response: + new_items, new_ctoken = extract_items_from_renderer_list( + response['engagementPanels'], item_types=item_types + ) + items += new_items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + + return items, ctoken |