Diffstat (limited to 'youtube/yt_data_extract')
 -rw-r--r--  youtube/yt_data_extract/__init__.py          |   4
 -rw-r--r--  youtube/yt_data_extract/common.py            | 169
 -rw-r--r--  youtube/yt_data_extract/everything_else.py   | 149
 -rw-r--r--  youtube/yt_data_extract/watch_extraction.py  | 323
 4 files changed, 508 insertions, 137 deletions
diff --git a/youtube/yt_data_extract/__init__.py b/youtube/yt_data_extract/__init__.py
index ad7bd03..de1812d 100644
--- a/youtube/yt_data_extract/__init__.py
+++ b/youtube/yt_data_extract/__init__.py
@@ -7,7 +7,7 @@ from .everything_else import (extract_channel_info, extract_search_info,
     extract_playlist_metadata, extract_playlist_info, extract_comments_info)
 from .watch_extraction import (extract_watch_info, get_caption_url,
-    update_with_age_restricted_info, requires_decryption,
+    update_with_new_urls, requires_decryption,
     extract_decryption_function, decrypt_signatures, _formats,
     update_format_with_type_info, extract_hls_formats,
-    extract_watch_info_from_html)
+    extract_watch_info_from_html, captions_available)
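[Editor's note: the export list renames update_with_age_restricted_info to update_with_new_urls and adds captions_available. A minimal caller sketch under the new names — the watch_html and player_response_json inputs are hypothetical placeholders fetched elsewhere, not part of this commit:

    from youtube.yt_data_extract import (extract_watch_info_from_html,
        captions_available, get_caption_url, update_with_new_urls)

    info = extract_watch_info_from_html(watch_html)  # watch_html: page HTML, fetched separately
    if captions_available(info):
        # format string assumed; get_caption_url now returns None if no base url was found
        caption_url = get_caption_url(info, 'en', 'vtt')
    update_with_new_urls(info, player_response_json)  # raw player_response JSON string
]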
diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py
index 2b394e6..7903db5 100644
--- a/youtube/yt_data_extract/common.py
+++ b/youtube/yt_data_extract/common.py
@@ -1,6 +1,7 @@
 import re
 import urllib.parse
 import collections
+import collections.abc
 
 def get(object, key, default=None, types=()):
     '''Like dict.get(), but returns default if the result doesn't match one of the types.
@@ -62,17 +63,40 @@ def multi_deep_get(object, *key_sequences, default=None, types=()):
             continue
     return default
 
+
+def _is_empty(value):
+    '''Determines if value is None or an empty iterable, such as '' and []'''
+    if value is None:
+        return True
+    elif isinstance(value, collections.abc.Iterable) and not value:
+        return True
+    return False
+
+
 def liberal_update(obj, key, value):
-    '''Updates obj[key] with value as long as value is not None.
-    Ensures obj[key] will at least get a value of None, however'''
-    if (value is not None) or (key not in obj):
+    '''Updates obj[key] with value as long as value is not None or empty.
+    Ensures obj[key] will at least get an empty value, however'''
+    if (not _is_empty(value)) or (key not in obj):
         obj[key] = value
 
 def conservative_update(obj, key, value):
-    '''Only updates obj if it doesn't have key or obj[key] is None'''
-    if obj.get(key) is None:
+    '''Only updates obj if it doesn't have key or obj[key] is None/empty'''
+    if _is_empty(obj.get(key)):
         obj[key] = value
 
+
+def liberal_dict_update(dict1, dict2):
+    '''Update dict1 with keys from dict2 using liberal_update'''
+    for key, value in dict2.items():
+        liberal_update(dict1, key, value)
+
+
+def conservative_dict_update(dict1, dict2):
+    '''Update dict1 with keys from dict2 using conservative_update'''
+    for key, value in dict2.items():
+        conservative_update(dict1, key, value)
+
+
 def concat_or_none(*strings):
     '''Concatenates strings. Returns None if any of the arguments are None'''
     result = ''
@@ -85,7 +109,7 @@ def concat_or_none(*strings):
 def remove_redirect(url):
     if url is None:
         return None
-    if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking
+    if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # YouTube puts these on external links to do tracking
         query_string = url[url.find('?')+1: ]
         return urllib.parse.parse_qs(query_string)['q'][0]
     return url
@@ -109,14 +133,14 @@
 def _recover_urls(runs):
     for run in runs:
         url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
         text = run.get('text', '')
-        # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text
+        # second condition is necessary because YouTube makes other things into urls, such as hashtags, which we want to keep as text
         if url is not None and (text.startswith('http://') or text.startswith('https://')):
             url = remove_redirect(url)
             run['url'] = url
-            run['text'] = url # youtube truncates the url text, use actual url instead
+            run['text'] = url # YouTube truncates the url text, use actual url instead
 
 def extract_str(node, default=None, recover_urls=False):
-    '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)'''
+    '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix YouTube's truncation of url text (most prominently seen in descriptions)'''
     if isinstance(node, str):
         return node
@@ -142,14 +166,17 @@ def extract_formatted_text(node):
         return [{'text': node['simpleText']}]
     return []
 
-def extract_int(string, default=None):
+def extract_int(string, default=None, whole_word=True):
     if isinstance(string, int):
         return string
     if not isinstance(string, str):
         string = extract_str(string)
     if not string:
         return default
-    match = re.search(r'\b(\d+)\b', string.replace(',', ''))
+    if whole_word:
+        match = re.search(r'\b(\d+)\b', string.replace(',', ''))
+    else:
+        match = re.search(r'(\d+)', string.replace(',', ''))
     if match is None:
         return default
     try:
@@ -158,7 +185,7 @@
         return default
 
 def extract_approx_int(string):
-    '''e.g. "15.1M" from "15.1M subscribers"'''
+    '''e.g. "15.1M" from "15.1M subscribers" or '4,353' from 4353'''
     if not isinstance(string, str):
         string = extract_str(string)
     if not string:
@@ -166,7 +193,10 @@
     match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', ''))
     if match is None:
         return None
-    return match.group(1)
+    result = match.group(1)
+    if re.fullmatch(r'\d+', result):
+        result = '{:,}'.format(int(result))
+    return result
 
 MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
 def extract_date(date_text):
@@ -213,8 +243,6 @@ def extract_item_info(item, additional_info={}):
         info['type'] = 'unsupported'
         return info
 
-    info.update(additional_info)
-
     # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
     # camelCase split, https://stackoverflow.com/a/37697078
     type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
@@ -224,6 +252,9 @@
         primary_type = type_parts[-2]
     if primary_type == 'video':
         info['type'] = 'video'
+    elif type_parts[0] == 'reel': # shorts
+        info['type'] = 'video'
+        primary_type = 'video'
     elif primary_type in ('playlist', 'radio', 'show'):
         info['type'] = 'playlist'
         info['playlist_type'] = primary_type
@@ -245,7 +276,11 @@
         ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
     ))
     info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
-    info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText'))
+    info['description'] = extract_formatted_text(multi_deep_get(
+        item,
+        ['descriptionText'], ['descriptionSnippet'],
+        ['detailedMetadataSnippets', 0, 'snippetText'],
+    ))
     info['thumbnail'] = normalize_url(multi_deep_get(item,
         ['thumbnail', 'thumbnails', 0, 'url'], # videos
         ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
@@ -266,7 +301,11 @@
             info['time_published'] = timestamp.group(1)
 
     if primary_type == 'video':
-        info['id'] = item.get('videoId')
+        info['id'] = multi_deep_get(item,
+            ['videoId'],
+            ['navigationEndpoint', 'watchEndpoint', 'videoId'],
+            ['navigationEndpoint', 'reelWatchEndpoint', 'videoId'] # shorts
+        )
         info['view_count'] = extract_int(item.get('viewCountText'))
 
         # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published
@@ -284,17 +323,35 @@
         if info['view_count']:
             info['approx_view_count'] = '{:,}'.format(info['view_count'])
         else:
-            info['approx_view_count'] = extract_approx_int(item.get('shortViewCountText'))
+            info['approx_view_count'] = extract_approx_int(multi_get(item,
+                'shortViewCountText',
+                'viewCountText' # shorts
+            ))
 
         # handle case where it is "No views"
         if not info['approx_view_count']:
             if ('No views' in item.get('shortViewCountText', '')
-                    or 'no views' in accessibility_label.lower()):
+                    or 'no views' in accessibility_label.lower()
+                    or 'No views' in extract_str(item.get('viewCountText', '')) # shorts
+                    ):
                 info['view_count'] = 0
                 info['approx_view_count'] = '0'
 
         info['duration'] = extract_str(item.get('lengthText'))
 
+        # dig into accessibility data to get duration for shorts
+        accessibility_label = deep_get(item,
+            'accessibility', 'accessibilityData', 'label',
+            default='')
+        duration = re.search(r'(\d+) (second|seconds|minute) - play video$',
+            accessibility_label)
+        if duration:
+            if duration.group(2) == 'minute':
+                conservative_update(info, 'duration', '1:00')
+            else:
+                conservative_update(info,
+                    'duration', '0:' + duration.group(1).zfill(2))
+
         # if it's an item in a playlist, get its index
         if 'index' in item: # url has wrong index on playlist page
             info['index'] = extract_int(item.get('index'))
@@ -335,6 +392,9 @@
             conservative_update(info, 'video_count', extract_int(deep_get(
                 overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
             )))
+
+    info.update(additional_info)
+
     return info
 
 def extract_response(polymer_json):
@@ -363,6 +423,8 @@ _item_types = {
     'gridVideoRenderer',
     'playlistVideoRenderer',
 
+    'reelItemRenderer',
+
     'playlistRenderer',
     'compactPlaylistRenderer',
     'gridPlaylistRenderer',
@@ -402,6 +464,7 @@ nested_renderer_dispatch = {
     'twoColumnBrowseResultsRenderer': _traverse_browse_renderer,
     'twoColumnSearchResultsRenderer': lambda r: get(r, 'primaryContents', {}),
     'richItemRenderer': lambda r: get(r, 'content', {}),
+    'engagementPanelSectionListRenderer': lambda r: get(r, 'content', {}),
 }
 
 # these renderers contain a list of renderers inside them
@@ -411,6 +474,8 @@ nested_renderer_list_dispatch = {
     'gridRenderer': _traverse_standard_list,
     'richGridRenderer': _traverse_standard_list,
     'playlistVideoListRenderer': _traverse_standard_list,
+    'structuredDescriptionContentRenderer': _traverse_standard_list,
+    'slimVideoMetadataSectionRenderer': _traverse_standard_list,
     'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None),
 }
 def get_nested_renderer_list_function(key):
@@ -474,8 +539,27 @@
             renderer = None
 
 
-def extract_items(response, item_types=_item_types):
+
+def extract_items_from_renderer_list(renderers, item_types=_item_types):
+    '''Same as extract_items_from_renderer, but provide a list of renderers'''
+    items = []
+    ctoken = None
+    for renderer in renderers:
+        new_items, new_ctoken = extract_items_from_renderer(
+            renderer,
+            item_types=item_types)
+        items += new_items
+        # prioritize ctoken associated with items
+        if (not ctoken) or (new_ctoken and new_items):
+            ctoken = new_ctoken
+    return items, ctoken
+
+
+def extract_items(response, item_types=_item_types,
+                  search_engagement_panels=False):
     '''return items, ctoken'''
+    items = []
+    ctoken = None
     if 'continuationContents' in response:
         # sometimes there's another, empty, junk [something]Continuation key
         # find real one
@@ -483,13 +567,44 @@
                 'continuationContents', {}).items():
             # e.g. commentSectionContinuation, playlistVideoListContinuation
             if key.endswith('Continuation'):
-                items, cont = extract_items_from_renderer({key: renderer_cont},
+                items, ctoken = extract_items_from_renderer(
+                    {key: renderer_cont},
                     item_types=item_types)
                 if items:
-                    return items, cont
-        return [], None
-    elif 'contents' in response:
+                    break
+    if ('onResponseReceivedEndpoints' in response
+            or 'onResponseReceivedActions' in response):
+        for endpoint in multi_get(response,
+                'onResponseReceivedEndpoints',
+                'onResponseReceivedActions',
+                []):
+            new_items, new_ctoken = extract_items_from_renderer_list(
+                multi_deep_get(
+                    endpoint,
+                    ['reloadContinuationItemsCommand', 'continuationItems'],
+                    ['appendContinuationItemsAction', 'continuationItems'],
+                    default=[]
+                ),
+                item_types=item_types,
+            )
+            items += new_items
+            if (not ctoken) or (new_ctoken and new_items):
+                ctoken = new_ctoken
+    if 'contents' in response:
         renderer = get(response, 'contents', {})
-        return extract_items_from_renderer(renderer, item_types=item_types)
-    else:
-        return [], None
+        new_items, new_ctoken = extract_items_from_renderer(
+            renderer,
+            item_types=item_types)
+        items += new_items
+        if (not ctoken) or (new_ctoken and new_items):
+            ctoken = new_ctoken
+
+    if search_engagement_panels and 'engagementPanels' in response:
+        new_items, new_ctoken = extract_items_from_renderer_list(
+            response['engagementPanels'], item_types=item_types
+        )
+        items += new_items
+        if (not ctoken) or (new_ctoken and new_items):
+            ctoken = new_ctoken
+
+    return items, ctoken
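[Editor's note: the new _is_empty helper changes the update semantics above — empty strings and empty containers now count as missing, not just None. An illustrative sketch of the resulting behavior (values are made up):

    obj = {'title': ''}
    liberal_update(obj, 'title', 'Video A')       # '' counts as empty -> overwritten
    liberal_update(obj, 'title', None)            # None is empty -> 'Video A' kept
    conservative_update(obj, 'title', 'Video B')  # existing value non-empty -> kept
    conservative_update(obj, 'views', 100)        # missing key -> set to 100
    liberal_dict_update(obj, {'title': 'C', 'views': None})  # per-key liberal_update:
    # 'title' becomes 'C'; 'views' stays 100 because None is empty
]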
diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py
index f9c47cb..0f64649 100644
--- a/youtube/yt_data_extract/everything_else.py
+++ b/youtube/yt_data_extract/everything_else.py
@@ -9,7 +9,7 @@ import re
 import urllib
 from math import ceil
 
-def extract_channel_info(polymer_json, tab):
+def extract_channel_info(polymer_json, tab, continuation=False):
     response, err = extract_response(polymer_json)
     if err:
         return {'error': err}
@@ -23,7 +23,8 @@
 
     # channel doesn't exist or was terminated
     # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
-    if not metadata:
+    # metadata and microformat are not present for continuation requests
+    if not metadata and not continuation:
         if response.get('alerts'):
             error_string = ' '.join(
                 extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
@@ -44,7 +45,7 @@
     info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
         'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
 
-    # stuff from microformat (info given by youtube for every page on channel)
+    # stuff from microformat (info given by youtube for first page on channel)
     info['short_description'] = metadata.get('description')
     if info['short_description'] and len(info['short_description']) > 730:
         info['short_description'] = info['short_description'][0:730] + '...'
@@ -69,32 +70,99 @@ def extract_channel_info(polymer_json, tab):
     info['ctoken'] = None
 
     # empty channel
-    if 'contents' not in response and 'continuationContents' not in response:
-        return info
+    #if 'contents' not in response and 'continuationContents' not in response:
+    #    return info
 
-    if tab in ('videos', 'playlists', 'search'):
+    if tab in ('videos', 'shorts', 'streams', 'playlists', 'search'):
         items, ctoken = extract_items(response)
-        additional_info = {'author': info['channel_name'], 'author_url': info['channel_url']}
+        additional_info = {
+            'author': info['channel_name'],
+            'author_id': info['channel_id'],
+            'author_url': info['channel_url'],
+        }
         info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
         info['ctoken'] = ctoken
         if tab in ('search', 'playlists'):
             info['is_last_page'] = (ctoken is None)
     elif tab == 'about':
-        items, _ = extract_items(response, item_types={'channelAboutFullMetadataRenderer'})
-        if not items:
-            info['error'] = 'Could not find channelAboutFullMetadataRenderer'
-            return info
-        channel_metadata = items[0]['channelAboutFullMetadataRenderer']
-
-        info['links'] = []
-        for link_json in channel_metadata.get('primaryLinks', ()):
-            url = remove_redirect(deep_get(link_json, 'navigationEndpoint', 'urlEndpoint', 'url'))
-            text = extract_str(link_json.get('title'))
-            info['links'].append( (text, url) )
-
-        info['date_joined'] = extract_date(channel_metadata.get('joinedDateText'))
-        info['view_count'] = extract_int(channel_metadata.get('viewCountText'))
-        info['description'] = extract_str(channel_metadata.get('description'), default='')
+        # Latest type
+        items, _ = extract_items(response, item_types={'aboutChannelRenderer'})
+        if items:
+            a_metadata = deep_get(items, 0, 'aboutChannelRenderer',
+                'metadata', 'aboutChannelViewModel')
+            if not a_metadata:
+                info['error'] = 'Could not find aboutChannelViewModel'
+                return info
+
+            info['links'] = []
+            for link_outer in a_metadata.get('links', ()):
+                link = link_outer.get('channelExternalLinkViewModel') or {}
+                link_content = extract_str(deep_get(link, 'link', 'content'))
+                for run in deep_get(link, 'link', 'commandRuns') or ():
+                    url = remove_redirect(deep_get(run, 'onTap',
+                        'innertubeCommand', 'urlEndpoint', 'url'))
+                    if url and not (url.startswith('http://')
+                            or url.startswith('https://')):
+                        url = 'https://' + url
+                    if link_content is None or (link_content in url):
+                        break
+                else: # didn't break
+                    url = link_content
+                    if url and not (url.startswith('http://')
+                            or url.startswith('https://')):
+                        url = 'https://' + url
+                text = extract_str(deep_get(link, 'title', 'content'))
+                info['links'].append( (text, url) )
+
+            info['date_joined'] = extract_date(
+                a_metadata.get('joinedDateText')
+            )
+            info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+            info['approx_view_count'] = extract_approx_int(
+                a_metadata.get('viewCountText')
+            )
+            info['description'] = extract_str(
+                a_metadata.get('description'), default=''
+            )
+            info['approx_video_count'] = extract_approx_int(
+                a_metadata.get('videoCountText')
+            )
+            info['approx_subscriber_count'] = extract_approx_int(
+                a_metadata.get('subscriberCountText')
+            )
+            info['country'] = extract_str(a_metadata.get('country'))
+            info['canonical_url'] = extract_str(
+                a_metadata.get('canonicalChannelUrl')
+            )
+
+        # Old type
+        else:
+            items, _ = extract_items(response,
+                item_types={'channelAboutFullMetadataRenderer'})
+            if not items:
+                info['error'] = 'Could not find aboutChannelRenderer or channelAboutFullMetadataRenderer'
+                return info
+            a_metadata = items[0]['channelAboutFullMetadataRenderer']
+
+            info['links'] = []
+            for link_json in a_metadata.get('primaryLinks', ()):
+                url = remove_redirect(deep_get(link_json, 'navigationEndpoint',
+                    'urlEndpoint', 'url'))
+                if url and not (url.startswith('http://')
+                        or url.startswith('https://')):
+                    url = 'https://' + url
+                text = extract_str(link_json.get('title'))
+                info['links'].append( (text, url) )
+
+            info['date_joined'] = extract_date(a_metadata.get('joinedDateText'))
+            info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+            info['description'] = extract_str(a_metadata.get(
+                'description'), default='')
+
+            info['approx_video_count'] = None
+            info['approx_subscriber_count'] = None
+            info['country'] = None
+            info['canonical_url'] = None
     else:
         raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
@@ -161,7 +229,7 @@ def extract_playlist_metadata(polymer_json):
     if metadata['first_video_id'] is None:
         metadata['thumbnail'] = None
     else:
-        metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'
+        metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
 
     metadata['video_count'] = extract_int(header.get('numVideosText'))
     metadata['description'] = extract_str(header.get('descriptionText'), default='')
@@ -184,6 +252,19 @@
         elif 'updated' in text:
             metadata['time_published'] = extract_date(text)
 
+    microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
+        default={})
+    conservative_update(
+        metadata, 'title', extract_str(microformat.get('title'))
+    )
+    conservative_update(
+        metadata, 'description', extract_str(microformat.get('description'))
+    )
+    conservative_update(
+        metadata, 'thumbnail', deep_get(microformat, 'thumbnail',
+            'thumbnails', -1, 'url')
+    )
+
     return metadata
 
 def extract_playlist_info(polymer_json):
@@ -191,13 +272,11 @@ def extract_playlist_info(polymer_json):
     if err:
         return {'error': err}
     info = {'error': None}
-    first_page = 'continuationContents' not in response
     video_list, _ = extract_items(response)
     info['items'] = [extract_item_info(renderer) for renderer in video_list]
 
-    if first_page:
-        info['metadata'] = extract_playlist_metadata(polymer_json)
+    info['metadata'] = extract_playlist_metadata(polymer_json)
 
     return info
@@ -220,15 +299,13 @@ def _ctoken_metadata(ctoken):
         result['sort'] = 0
     return result
 
-def extract_comments_info(polymer_json):
+def extract_comments_info(polymer_json, ctoken=None):
     response, err = extract_response(polymer_json)
     if err:
         return {'error': err}
     info = {'error': None}
 
-    url = multi_deep_get(polymer_json, [1, 'url'], ['url'])
-    if url:
-        ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
+    if ctoken:
         metadata = _ctoken_metadata(ctoken)
     else:
         metadata = {}
@@ -256,9 +333,13 @@
             comment_info['reply_count'] = extract_int(deep_get(comment_thread,
                 'replies', 'commentRepliesRenderer', 'moreText'
             ), default=1) # With 1 reply, the text reads "View reply"
-            comment_info['reply_ctoken'] = deep_get(comment_thread,
-                'replies', 'commentRepliesRenderer', 'continuations', 0,
-                'nextContinuationData', 'continuation'
+            comment_info['reply_ctoken'] = multi_deep_get(
+                comment_thread,
+                ['replies', 'commentRepliesRenderer', 'contents', 0,
+                 'continuationItemRenderer', 'button', 'buttonRenderer',
+                 'command', 'continuationCommand', 'token'],
+                ['replies', 'commentRepliesRenderer', 'continuations', 0,
+                 'nextContinuationData', 'continuation']
             )
             comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
         elif 'commentRenderer' in comment: # replies
@@ -282,6 +363,8 @@
         comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
         comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
         comment_info['like_count'] = comment_renderer.get('likeCount')
+        comment_info['approx_like_count'] = extract_approx_int(
+            comment_renderer.get('voteCount'))
         liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))
 
         info['comments'].append(comment_info)
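[Editor's note: extract_comments_info now takes the continuation token as an explicit keyword argument instead of recovering it from a request URL embedded in the polymer JSON. A hedged usage sketch — fetch_comments_page is a hypothetical network step, not part of this module:

    polymer_json = fetch_comments_page(video_id, ctoken)  # hypothetical fetch
    comments_info = extract_comments_info(polymer_json, ctoken=ctoken)
    for comment in comments_info['comments']:
        print(comment['text'], comment['approx_like_count'])
]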
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
index db53581..e09e2d3 100644
--- a/youtube/yt_data_extract/watch_extraction.py
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -2,7 +2,8 @@ from .common import (get, multi_get, deep_get, multi_deep_get,
     liberal_update, conservative_update, remove_redirect, normalize_url,
     extract_str, extract_formatted_text, extract_int, extract_approx_int,
     extract_date, check_missing_keys, extract_item_info, extract_items,
-    extract_response, concat_or_none)
+    extract_response, concat_or_none, liberal_dict_update,
+    conservative_dict_update)
 
 import json
 import urllib.parse
@@ -116,7 +117,99 @@ _formats = {
     '397': {'vcodec': 'av01.0.05M.08'},
 }
 
-def _extract_metadata_row_info(video_renderer_info):
+
+def _extract_from_video_information_renderer(renderer_content):
+    subtitle = extract_str(renderer_content.get('expandedSubtitle'),
+        default='')
+    info = {
+        'title': extract_str(renderer_content.get('title')),
+        'view_count': extract_int(subtitle),
+        'unlisted': False,
+        'live': 'watching' in subtitle,
+    }
+    for badge in renderer_content.get('badges', []):
+        if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
+            info['unlisted'] = True
+    return info
+
+def _extract_likes_dislikes(renderer_content):
+    def extract_button_count(toggle_button_renderer):
+        # all the digits can be found in the accessibility data
+        count = extract_int(multi_deep_get(
+            toggle_button_renderer,
+            ['defaultText', 'accessibility', 'accessibilityData', 'label'],
+            ['accessibility', 'label'],
+            ['accessibilityData', 'accessibilityData', 'label'],
+            ['accessibilityText'],
+        ))
+
+        # this count doesn't have all the digits, it's like 53K for instance
+        dumb_count = extract_int(extract_str(multi_get(
+            toggle_button_renderer, ['defaultText', 'title'])))
+
+        # The accessibility text will be "No likes" or "No dislikes" or
+        # something like that, but dumb count will be 0
+        if dumb_count == 0:
+            count = 0
+        return count
+
+    info = {
+        'like_count': None,
+        'dislike_count': None,
+    }
+    for button in renderer_content.get('buttons', ()):
+        if 'slimMetadataToggleButtonRenderer' in button:
+            button_renderer = button['slimMetadataToggleButtonRenderer']
+            count = extract_button_count(deep_get(button_renderer,
+                'button',
+                'toggleButtonRenderer'))
+            if 'isLike' in button_renderer:
+                info['like_count'] = count
+            elif 'isDislike' in button_renderer:
+                info['dislike_count'] = count
+        elif 'slimMetadataButtonRenderer' in button:
+            button_renderer = button['slimMetadataButtonRenderer']
+            liberal_update(info, 'like_count', extract_button_count(
+                multi_deep_get(button_renderer,
+                    ['button', 'segmentedLikeDislikeButtonRenderer',
+                     'likeButton', 'toggleButtonRenderer'],
+                    ['button', 'segmentedLikeDislikeButtonViewModel',
+                     'likeButtonViewModel', 'likeButtonViewModel',
+                     'toggleButtonViewModel', 'toggleButtonViewModel',
+                     'defaultButtonViewModel', 'buttonViewModel']
+                )
+            ))
+            '''liberal_update(info, 'dislike_count', extract_button_count(
+                deep_get(
+                    button_renderer, 'button',
+                    'segmentedLikeDislikeButtonRenderer',
+                    'dislikeButton', 'toggleButtonRenderer'
+                )
+            ))'''
+    return info
+
+def _extract_from_owner_renderer(renderer_content):
+    return {
+        'author': extract_str(renderer_content.get('title')),
+        'author_id': deep_get(
+            renderer_content,
+            'navigationEndpoint', 'browseEndpoint', 'browseId'),
+    }
+
+def _extract_from_video_header_renderer(renderer_content):
+    return {
+        'title': extract_str(renderer_content.get('title')),
+        'time_published': extract_date(extract_str(
+            renderer_content.get('publishDate'))),
+    }
+
+def _extract_from_description_renderer(renderer_content):
+    return {
+        'description': extract_str(
+            renderer_content.get('descriptionBodyText'), recover_urls=True),
+    }
+
+def _extract_metadata_row_info(renderer_content):
     # extract category and music list
     info = {
         'category': None,
@@ -124,7 +217,7 @@
     }
 
     current_song = {}
-    for row in deep_get(video_renderer_info, 'metadataRowContainer', 'metadataRowContainerRenderer', 'rows', default=[]):
+    for row in deep_get(renderer_content, 'rows', default=[]):
         row_title = extract_str(deep_get(row, 'metadataRowRenderer', 'title'), default='')
         row_content = extract_str(deep_get(row, 'metadataRowRenderer', 'contents', 0))
         if row_title == 'Category':
@@ -146,18 +239,69 @@
 
     return info
 
-def _extract_watch_info_mobile(top_level):
-    info = {}
-    microformat = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
+def _extract_from_music_renderer(renderer_content):
+    # latest format for the music list
+    info = {
+        'music_list': [],
+    }
 
-    family_safe = microformat.get('isFamilySafe')
-    if family_safe is None:
-        info['age_restricted'] = None
-    else:
-        info['age_restricted'] = not family_safe
-    info['allowed_countries'] = microformat.get('availableCountries', [])
-    info['time_published'] = microformat.get('publishDate')
+    for carousel in renderer_content.get('carouselLockups', []):
+        song = {}
+        carousel = carousel.get('carouselLockupRenderer', {})
+        video_renderer = carousel.get('videoLockup', {})
+        video_renderer_info = extract_item_info(video_renderer)
+        video_id = video_renderer_info.get('id')
+        song['url'] = concat_or_none('https://www.youtube.com/watch?v=',
+            video_id)
+        song['title'] = video_renderer_info.get('title')
+        for row in carousel.get('infoRows', []):
+            row = row.get('infoRowRenderer', {})
+            title = extract_str(row.get('title'))
+            data = extract_str(row.get('defaultMetadata'))
+            if title == 'SONG':
+                song['title'] = data
+            elif title == 'ARTIST':
+                song['artist'] = data
+            elif title == 'ALBUM':
+                song['album'] = data
+            elif title == 'WRITERS':
+                song['writers'] = data
+        info['music_list'].append(song)
+    return info
 
+def _extract_from_video_metadata(renderer_content):
+    info = _extract_from_video_information_renderer(renderer_content)
+    liberal_dict_update(info, _extract_likes_dislikes(renderer_content))
+    liberal_dict_update(info, _extract_from_owner_renderer(renderer_content))
+    liberal_dict_update(info, _extract_metadata_row_info(deep_get(
+        renderer_content, 'metadataRowContainer',
+        'metadataRowContainerRenderer', default={}
+    )))
+    liberal_update(info, 'title', extract_str(renderer_content.get('title')))
+    liberal_update(
+        info, 'description',
+        extract_str(renderer_content.get('description'), recover_urls=True)
+    )
+    liberal_update(info, 'time_published',
+        extract_date(renderer_content.get('dateText')))
+    return info
+
+visible_extraction_dispatch = {
+    # Either these ones spread around in various places
+    'slimVideoInformationRenderer': _extract_from_video_information_renderer,
+    'slimVideoActionBarRenderer': _extract_likes_dislikes,
+    'slimOwnerRenderer': _extract_from_owner_renderer,
+    'videoDescriptionHeaderRenderer': _extract_from_video_header_renderer,
+    'videoDescriptionMusicSectionRenderer': _extract_from_music_renderer,
+    'expandableVideoDescriptionRenderer': _extract_from_description_renderer,
+    'metadataRowContainerRenderer': _extract_metadata_row_info,
+    # OR just this one, which contains SOME of the above inside it
+    'slimVideoMetadataRenderer': _extract_from_video_metadata,
+}
+
+def _extract_watch_info_mobile(top_level):
+    '''Scrapes information from the visible page'''
+    info = {}
     response = top_level.get('response', {})
 
     # this renderer has the stuff visible on the page
@@ -190,47 +334,24 @@
     else:
         info['playlist'] = None
 
-    # Holds the visible video info. It is inside singleColumnWatchNextResults
-    # but use our convenience function instead
-    items, _ = extract_items(response, item_types={'slimVideoMetadataRenderer'})
-    if items:
-        video_info = items[0]['slimVideoMetadataRenderer']
-    else:
-        print('Failed to extract video metadata')
-        video_info = {}
-
-    info.update(_extract_metadata_row_info(video_info))
-    info['description'] = extract_str(video_info.get('description'), recover_urls=True)
-    info['view_count'] = extract_int(extract_str(video_info.get('expandedSubtitle')))
-    info['author'] = extract_str(deep_get(video_info, 'owner', 'slimOwnerRenderer', 'title'))
-    info['author_id'] = deep_get(video_info, 'owner', 'slimOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
-    info['title'] = extract_str(video_info.get('title'))
-    info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'), default='')
-    info['unlisted'] = False
-    for badge in video_info.get('badges', []):
-        if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
-            info['unlisted'] = True
-    info['like_count'] = None
-    info['dislike_count'] = None
-    if not info['time_published']:
-        info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
-    for button in video_info.get('buttons', ()):
-        button_renderer = button.get('slimMetadataToggleButtonRenderer', {})
-
-        # all the digits can be found in the accessibility data
-        count = extract_int(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText', 'accessibility', 'accessibilityData', 'label'))
-
-        # this count doesn't have all the digits, it's like 53K for instance
-        dumb_count = extract_int(extract_str(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText')))
-
-        # the accessibility text will be "No likes" or "No dislikes" or something like that, but dumb count will be 0
-        if dumb_count == 0:
-            count = 0
-
-        if 'isLike' in button_renderer:
-            info['like_count'] = count
-        elif 'isDislike' in button_renderer:
-            info['dislike_count'] = count
+    # use dispatch table to get information scattered in various renderers
+    items, _ = extract_items(
+        response,
+        item_types=visible_extraction_dispatch.keys(),
+        search_engagement_panels=True
+    )
+    found = set()
+    for renderer in items:
+        name, renderer_content = list(renderer.items())[0]
+        found.add(name)
+        liberal_dict_update(
+            info,
+            visible_extraction_dispatch[name](renderer_content)
+        )
+    # Call the function on blank dict for any that weren't found
+    # so that the empty keys get added
+    for name in visible_extraction_dispatch.keys() - found:
+        liberal_dict_update(info, visible_extraction_dispatch[name]({}))
 
     # comment section info
     items, _ = extract_items(response, item_types={
@@ -244,17 +365,18 @@
         # https://www.androidpolice.com/2019/10/31/google-youtube-app-comment-section-below-videos/
         # https://www.youtube.com/watch?v=bR5Q-wD-6qo
         if header_type == 'commentsEntryPointHeaderRenderer':
-            comment_count_text = extract_str(comment_info.get('headerText'))
+            comment_count_text = extract_str(multi_get(
+                comment_info, 'commentCount', 'headerText'))
         else:
             comment_count_text = extract_str(deep_get(comment_info,
                 'header', 'commentSectionHeaderRenderer', 'countText'))
         if comment_count_text == 'Comments': # just this with no number, means 0 comments
-            info['comment_count'] = 0
+            info['comment_count'] = '0'
         else:
-            info['comment_count'] = extract_int(comment_count_text)
+            info['comment_count'] = extract_approx_int(comment_count_text)
         info['comments_disabled'] = False
     else: # no comment section present means comments are disabled
-        info['comment_count'] = 0
+        info['comment_count'] = '0'
         info['comments_disabled'] = True
 
     # check for limited state
@@ -274,7 +396,6 @@ def _extract_watch_info_desktop(top_level):
     info = {
         'comment_count': None,
         'comments_disabled': None,
-        'allowed_countries': [],
        'limited_state': None,
        'playlist': None,
     }
@@ -307,26 +428,28 @@
     return info
 
 def update_format_with_codec_info(fmt, codec):
-    if (codec.startswith('av')
-            or codec in ('vp9', 'vp8', 'vp8.0', 'h263', 'h264', 'mp4v')):
+    if any(codec.startswith(c) for c in ('av', 'vp', 'h263', 'h264', 'mp4v')):
         if codec == 'vp8.0':
             codec = 'vp8'
         conservative_update(fmt, 'vcodec', codec)
     elif (codec.startswith('mp4a')
-            or codec in ('opus', 'mp3', 'aac', 'dtse', 'ec-3', 'vorbis')):
+            or codec in ('opus', 'mp3', 'aac', 'dtse', 'ec-3', 'vorbis',
+                         'ac-3')):
         conservative_update(fmt, 'acodec', codec)
     else:
         print('Warning: unrecognized codec: ' + codec)
 
 fmt_type_re = re.compile(
-    r'(text|audio|video)/([\w0-9]+); codecs="([\w0-9\.]+(?:, [\w0-9\.]+)*)"')
+    r'(text|audio|video)/([\w0-9]+); codecs="([^"]+)"')
 def update_format_with_type_info(fmt, yt_fmt):
     # 'type' for invidious api format
     mime_type = multi_get(yt_fmt, 'mimeType', 'type')
     if mime_type is None:
         return
     match = re.fullmatch(fmt_type_re, mime_type)
-
+    if match is None:
+        print('Warning: Could not read mimetype', mime_type)
+        return
     type, fmt['ext'], codecs = match.groups()
     codecs = codecs.split(', ')
     for codec in codecs:
@@ -349,17 +472,32 @@ def _extract_formats(info, player_response):
     for yt_fmt in yt_formats:
         itag = yt_fmt.get('itag')
 
+        # Translated audio track
+        # Example: https://www.youtube.com/watch?v=gF9kkB0UWYQ
+        # Only get the original language for now so a foreign
+        # translation will not be picked just because it comes first
+        if deep_get(yt_fmt, 'audioTrack', 'audioIsDefault') is False:
+            continue
+
         fmt = {}
         fmt['itag'] = itag
         fmt['ext'] = None
         fmt['audio_bitrate'] = None
+        fmt['bitrate'] = yt_fmt.get('bitrate')
         fmt['acodec'] = None
         fmt['vcodec'] = None
         fmt['width'] = yt_fmt.get('width')
         fmt['height'] = yt_fmt.get('height')
-        fmt['file_size'] = yt_fmt.get('contentLength')
-        fmt['audio_sample_rate'] = yt_fmt.get('audioSampleRate')
+        fmt['file_size'] = extract_int(yt_fmt.get('contentLength'))
+        fmt['audio_sample_rate'] = extract_int(yt_fmt.get('audioSampleRate'))
+        fmt['duration_ms'] = yt_fmt.get('approxDurationMs')
         fmt['fps'] = yt_fmt.get('fps')
+        fmt['init_range'] = yt_fmt.get('initRange')
+        fmt['index_range'] = yt_fmt.get('indexRange')
+        for key in ('init_range', 'index_range'):
+            if fmt[key]:
+                fmt[key]['start'] = int(fmt[key]['start'])
+                fmt[key]['end'] = int(fmt[key]['end'])
         update_format_with_type_info(fmt, yt_fmt)
         cipher = dict(urllib.parse.parse_qsl(multi_get(yt_fmt,
             'cipher', 'signatureCipher', default='')))
@@ -373,8 +511,16 @@
         # update with information from big table
         hardcoded_itag_info = _formats.get(str(itag), {})
         for key, value in hardcoded_itag_info.items():
-            conservative_update(fmt, key, value) # prefer info from Youtube
+            conservative_update(fmt, key, value) # prefer info from YouTube
         fmt['quality'] = hardcoded_itag_info.get('height')
+        conservative_update(
+            fmt, 'quality',
+            extract_int(yt_fmt.get('quality'), whole_word=False)
+        )
+        conservative_update(
+            fmt, 'quality',
+            extract_int(yt_fmt.get('qualityLabel'), whole_word=False)
+        )
 
         info['formats'].append(fmt)
@@ -397,7 +543,7 @@ def extract_hls_formats(hls_manifest):
             if lines[i].startswith('#EXT-X-STREAM-INF'):
                 fmt = {'acodec': None, 'vcodec': None, 'height': None,
                        'width': None, 'fps': None, 'audio_bitrate': None,
-                       'itag': None, 'file_size': None,
+                       'itag': None, 'file_size': None, 'duration_ms': None,
                        'audio_sample_rate': None, 'url': None}
                 properties = lines[i].split(':')[1]
                 properties += ',' # make regex work for last key-value pair
@@ -484,6 +630,25 @@ def extract_watch_info(polymer_json):
     info['translation_languages'] = []
     captions_info = player_response.get('captions', {})
     info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
+    # Sometimes the above playerCaptionsRender is randomly missing
+    # Extract base_url from one of the captions by removing lang specifiers
+    if not info['_captions_base_url']:
+        base_url = normalize_url(deep_get(
+            captions_info,
+            'playerCaptionsTracklistRenderer',
+            'captionTracks',
+            0,
+            'baseUrl'
+        ))
+        if base_url:
+            url_parts = urllib.parse.urlparse(base_url)
+            qs = urllib.parse.parse_qs(url_parts.query)
+            for key in ('tlang', 'lang', 'name', 'kind', 'fmt'):
+                if key in qs:
+                    del qs[key]
+            base_url = urllib.parse.urlunparse(url_parts._replace(
+                query=urllib.parse.urlencode(qs, doseq=True)))
+            info['_captions_base_url'] = base_url
     for caption_track in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
         lang_code = caption_track.get('languageCode')
         if not lang_code:
@@ -564,9 +729,17 @@
     liberal_update(info, 'category', mf.get('category'))
     liberal_update(info, 'time_published', mf.get('publishDate'))
     liberal_update(info, 'time_uploaded', mf.get('uploadDate'))
+    family_safe = mf.get('isFamilySafe')
+    if family_safe is None:
+        conservative_update(info, 'age_restricted', None)
+    else:
+        conservative_update(info, 'age_restricted', not family_safe)
+    info['allowed_countries'] = mf.get('availableCountries', [])
 
     # other stuff
     info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+    info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
+
     return info
 
 single_char_codes = {
@@ -646,10 +819,15 @@ def extract_watch_info_from_html(watch_html):
 
     return extract_watch_info(fake_polymer_json)
 
+def captions_available(info):
+    return bool(info['_captions_base_url'])
+
 def get_caption_url(info, language, format, automatic=False, translation_language=None):
     '''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
     url = info['_captions_base_url']
+    if not url:
+        return None
     url += '&lang=' + language
     url += '&fmt=' + format
     if automatic:
@@ -661,15 +839,10 @@ def get_caption_url(info, language, format, automatic=False, translation_language=None):
         url += '&tlang=' + translation_language
     return url
 
-def update_with_age_restricted_info(info, video_info_page):
-    '''Inserts urls from 'player_response' in get_video_info page'''
+def update_with_new_urls(info, player_response):
+    '''Inserts urls from player_response json'''
     ERROR_PREFIX = 'Error getting missing player or bypassing age-restriction: '
 
-    video_info = urllib.parse.parse_qs(video_info_page)
-    player_response = deep_get(video_info, 'player_response', 0)
-    if player_response is None:
-        info['playability_error'] = ERROR_PREFIX + 'Could not find player_response in video_info_page'
-        return
     try:
         player_response = json.loads(player_response)
     except json.decoder.JSONDecodeError:
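[Editor's note: the caption fallback added in extract_watch_info rebuilds a generic base url by stripping the language/format specifiers from a concrete caption track url, so that '&lang=...&fmt=...' can be re-appended later by get_caption_url. A standalone sketch of that technique — the function name is illustrative, not from the commit:

    import urllib.parse

    def strip_caption_params(track_url):
        # Remove the specifiers, mirroring the fallback in extract_watch_info
        parts = urllib.parse.urlparse(track_url)
        qs = urllib.parse.parse_qs(parts.query)
        for key in ('tlang', 'lang', 'name', 'kind', 'fmt'):
            qs.pop(key, None)  # same effect as `if key in qs: del qs[key]`
        return urllib.parse.urlunparse(
            parts._replace(query=urllib.parse.urlencode(qs, doseq=True)))
]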