diff options
Diffstat (limited to 'youtube/yt_data_extract/everything_else.py')
| -rw-r--r-- | youtube/yt_data_extract/everything_else.py | 261 |
1 files changed, 202 insertions, 59 deletions
diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py index f9c47cb..5930111 100644 --- a/youtube/yt_data_extract/everything_else.py +++ b/youtube/yt_data_extract/everything_else.py @@ -9,7 +9,7 @@ import re import urllib from math import ceil -def extract_channel_info(polymer_json, tab): +def extract_channel_info(polymer_json, tab, continuation=False): response, err = extract_response(polymer_json) if err: return {'error': err} @@ -23,7 +23,8 @@ def extract_channel_info(polymer_json, tab): # channel doesn't exist or was terminated # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org - if not metadata: + # metadata and microformat are not present for continuation requests + if not metadata and not continuation: if response.get('alerts'): error_string = ' '.join( extract_str(deep_get(alert, 'alertRenderer', 'text'), default='') @@ -44,7 +45,7 @@ def extract_channel_info(polymer_json, tab): info['approx_subscriber_count'] = extract_approx_int(deep_get(response, 'header', 'c4TabbedHeaderRenderer', 'subscriberCountText')) - # stuff from microformat (info given by youtube for every page on channel) + # stuff from microformat (info given by youtube for first page on channel) info['short_description'] = metadata.get('description') if info['short_description'] and len(info['short_description']) > 730: info['short_description'] = info['short_description'][0:730] + '...' @@ -69,32 +70,99 @@ def extract_channel_info(polymer_json, tab): info['ctoken'] = None # empty channel - if 'contents' not in response and 'continuationContents' not in response: - return info + #if 'contents' not in response and 'continuationContents' not in response: + # return info - if tab in ('videos', 'playlists', 'search'): + if tab in ('videos', 'shorts', 'streams', 'playlists', 'search'): items, ctoken = extract_items(response) - additional_info = {'author': info['channel_name'], 'author_url': info['channel_url']} + additional_info = { + 'author': info['channel_name'], + 'author_id': info['channel_id'], + 'author_url': info['channel_url'], + } info['items'] = [extract_item_info(renderer, additional_info) for renderer in items] info['ctoken'] = ctoken if tab in ('search', 'playlists'): info['is_last_page'] = (ctoken is None) elif tab == 'about': - items, _ = extract_items(response, item_types={'channelAboutFullMetadataRenderer'}) - if not items: - info['error'] = 'Could not find channelAboutFullMetadataRenderer' - return info - channel_metadata = items[0]['channelAboutFullMetadataRenderer'] - - info['links'] = [] - for link_json in channel_metadata.get('primaryLinks', ()): - url = remove_redirect(deep_get(link_json, 'navigationEndpoint', 'urlEndpoint', 'url')) - text = extract_str(link_json.get('title')) - info['links'].append( (text, url) ) - - info['date_joined'] = extract_date(channel_metadata.get('joinedDateText')) - info['view_count'] = extract_int(channel_metadata.get('viewCountText')) - info['description'] = extract_str(channel_metadata.get('description'), default='') + # Latest type + items, _ = extract_items(response, item_types={'aboutChannelRenderer'}) + if items: + a_metadata = deep_get(items, 0, 'aboutChannelRenderer', + 'metadata', 'aboutChannelViewModel') + if not a_metadata: + info['error'] = 'Could not find aboutChannelViewModel' + return info + + info['links'] = [] + for link_outer in a_metadata.get('links', ()): + link = link_outer.get('channelExternalLinkViewModel') or {} + link_content = extract_str(deep_get(link, 'link', 'content')) + for run in deep_get(link, 'link', 'commandRuns') or (): + url = remove_redirect(deep_get(run, 'onTap', + 'innertubeCommand', 'urlEndpoint', 'url')) + if url and not (url.startswith('http://') + or url.startswith('https://')): + url = 'https://' + url + if link_content is None or (link_content in url): + break + else: # didn't break + url = link_content + if url and not (url.startswith('http://') + or url.startswith('https://')): + url = 'https://' + url + text = extract_str(deep_get(link, 'title', 'content')) + info['links'].append( (text, url) ) + + info['date_joined'] = extract_date( + a_metadata.get('joinedDateText') + ) + info['view_count'] = extract_int(a_metadata.get('viewCountText')) + info['approx_view_count'] = extract_approx_int( + a_metadata.get('viewCountText') + ) + info['description'] = extract_str( + a_metadata.get('description'), default='' + ) + info['approx_video_count'] = extract_approx_int( + a_metadata.get('videoCountText') + ) + info['approx_subscriber_count'] = extract_approx_int( + a_metadata.get('subscriberCountText') + ) + info['country'] = extract_str(a_metadata.get('country')) + info['canonical_url'] = extract_str( + a_metadata.get('canonicalChannelUrl') + ) + + # Old type + else: + items, _ = extract_items(response, + item_types={'channelAboutFullMetadataRenderer'}) + if not items: + info['error'] = 'Could not find aboutChannelRenderer or channelAboutFullMetadataRenderer' + return info + a_metadata = items[0]['channelAboutFullMetadataRenderer'] + + info['links'] = [] + for link_json in a_metadata.get('primaryLinks', ()): + url = remove_redirect(deep_get(link_json, 'navigationEndpoint', + 'urlEndpoint', 'url')) + if url and not (url.startswith('http://') + or url.startswith('https://')): + url = 'https://' + url + text = extract_str(link_json.get('title')) + info['links'].append( (text, url) ) + + info['date_joined'] = extract_date(a_metadata.get('joinedDateText')) + info['view_count'] = extract_int(a_metadata.get('viewCountText')) + info['description'] = extract_str(a_metadata.get( + 'description'), default='') + + info['approx_video_count'] = None + info['approx_subscriber_count'] = None + info['country'] = None + info['canonical_url'] = None else: raise NotImplementedError('Unknown or unsupported channel tab: ' + tab) @@ -150,39 +218,112 @@ def extract_playlist_metadata(polymer_json): return {'error': err} metadata = {'error': None} + metadata['title'] = None + metadata['first_video_id'] = None + metadata['thumbnail'] = None + metadata['video_count'] = None + metadata['description'] = '' + metadata['author'] = None + metadata['author_id'] = None + metadata['author_url'] = None + metadata['view_count'] = None + metadata['like_count'] = None + metadata['time_published'] = None + header = deep_get(response, 'header', 'playlistHeaderRenderer', default={}) - metadata['title'] = extract_str(header.get('title')) - metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId') - first_id = re.search(r'([a-z_\-]{11})', deep_get(header, - 'thumbnail', 'thumbnails', 0, 'url', default='')) - if first_id: - conservative_update(metadata, 'first_video_id', first_id.group(1)) - if metadata['first_video_id'] is None: - metadata['thumbnail'] = None + if header: + # Classic playlistHeaderRenderer format + metadata['title'] = extract_str(header.get('title')) + metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId') + first_id = re.search(r'([a-z_\-]{11})', deep_get(header, + 'thumbnail', 'thumbnails', 0, 'url', default='')) + if first_id: + conservative_update(metadata, 'first_video_id', first_id.group(1)) + + metadata['video_count'] = extract_int(header.get('numVideosText')) + metadata['description'] = extract_str(header.get('descriptionText'), default='') + metadata['author'] = extract_str(header.get('ownerText')) + metadata['author_id'] = multi_deep_get(header, + ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], + ['ownerEndpoint', 'browseEndpoint', 'browseId']) + metadata['view_count'] = extract_int(header.get('viewCountText')) + metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText')) + for stat in header.get('stats', ()): + text = extract_str(stat) + if 'videos' in text or 'episodes' in text: + conservative_update(metadata, 'video_count', extract_int(text)) + elif 'views' in text: + conservative_update(metadata, 'view_count', extract_int(text)) + elif 'updated' in text: + metadata['time_published'] = extract_date(text) else: - metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg' - - metadata['video_count'] = extract_int(header.get('numVideosText')) - metadata['description'] = extract_str(header.get('descriptionText'), default='') - metadata['author'] = extract_str(header.get('ownerText')) - metadata['author_id'] = multi_deep_get(header, - ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], - ['ownerEndpoint', 'browseEndpoint', 'browseId']) + # New pageHeaderRenderer format (YouTube 2024+) + page_header = deep_get(response, 'header', 'pageHeaderRenderer', default={}) + metadata['title'] = page_header.get('pageTitle') + view_model = deep_get(page_header, 'content', 'pageHeaderViewModel', default={}) + + # Extract title from viewModel if not found + if not metadata['title']: + metadata['title'] = deep_get(view_model, + 'title', 'dynamicTextViewModel', 'text', 'content') + + # Extract metadata from rows (author, video count, views, etc.) + meta_rows = deep_get(view_model, + 'metadata', 'contentMetadataViewModel', 'metadataRows', default=[]) + for row in meta_rows: + for part in row.get('metadataParts', []): + text_content = deep_get(part, 'text', 'content', default='') + # Author from avatarStack + avatar_stack = deep_get(part, 'avatarStack', 'avatarStackViewModel', default={}) + if avatar_stack: + author_text = deep_get(avatar_stack, 'text', 'content') + if author_text: + metadata['author'] = author_text + # Extract author_id from commandRuns + for run in deep_get(avatar_stack, 'text', 'commandRuns', default=[]): + browse_id = deep_get(run, 'onTap', 'innertubeCommand', + 'browseEndpoint', 'browseId') + if browse_id: + metadata['author_id'] = browse_id + # Video/episode count + if text_content and ('video' in text_content.lower() or 'episode' in text_content.lower()): + conservative_update(metadata, 'video_count', extract_int(text_content)) + # View count + elif text_content and 'view' in text_content.lower(): + conservative_update(metadata, 'view_count', extract_int(text_content)) + # Last updated + elif text_content and 'updated' in text_content.lower(): + metadata['time_published'] = extract_date(text_content) + + # Extract description from sidebar if available + sidebar = deep_get(response, 'sidebar', 'playlistSidebarRenderer', 'items', default=[]) + for sidebar_item in sidebar: + desc = deep_get(sidebar_item, 'playlistSidebarPrimaryInfoRenderer', + 'description', 'simpleText') + if desc: + metadata['description'] = desc + if metadata['author_id']: metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id'] + + if metadata['first_video_id'] is None: + metadata['thumbnail'] = None else: - metadata['author_url'] = None - metadata['view_count'] = extract_int(header.get('viewCountText')) - metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText')) - for stat in header.get('stats', ()): - text = extract_str(stat) - if 'videos' in text: - conservative_update(metadata, 'video_count', extract_int(text)) - elif 'views' in text: - conservative_update(metadata, 'view_count', extract_int(text)) - elif 'updated' in text: - metadata['time_published'] = extract_date(text) + metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg" + + microformat = deep_get(response, 'microformat', 'microformatDataRenderer', + default={}) + conservative_update( + metadata, 'title', extract_str(microformat.get('title')) + ) + conservative_update( + metadata, 'description', extract_str(microformat.get('description')) + ) + conservative_update( + metadata, 'thumbnail', deep_get(microformat, 'thumbnail', + 'thumbnails', -1, 'url') + ) return metadata @@ -191,13 +332,11 @@ def extract_playlist_info(polymer_json): if err: return {'error': err} info = {'error': None} - first_page = 'continuationContents' not in response video_list, _ = extract_items(response) info['items'] = [extract_item_info(renderer) for renderer in video_list] - if first_page: - info['metadata'] = extract_playlist_metadata(polymer_json) + info['metadata'] = extract_playlist_metadata(polymer_json) return info @@ -220,15 +359,13 @@ def _ctoken_metadata(ctoken): result['sort'] = 0 return result -def extract_comments_info(polymer_json): +def extract_comments_info(polymer_json, ctoken=None): response, err = extract_response(polymer_json) if err: return {'error': err} info = {'error': None} - url = multi_deep_get(polymer_json, [1, 'url'], ['url']) - if url: - ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0] + if ctoken: metadata = _ctoken_metadata(ctoken) else: metadata = {} @@ -256,9 +393,13 @@ def extract_comments_info(polymer_json): comment_info['reply_count'] = extract_int(deep_get(comment_thread, 'replies', 'commentRepliesRenderer', 'moreText' ), default=1) # With 1 reply, the text reads "View reply" - comment_info['reply_ctoken'] = deep_get(comment_thread, - 'replies', 'commentRepliesRenderer', 'continuations', 0, - 'nextContinuationData', 'continuation' + comment_info['reply_ctoken'] = multi_deep_get( + comment_thread, + ['replies', 'commentRepliesRenderer', 'contents', 0, + 'continuationItemRenderer', 'button', 'buttonRenderer', + 'command', 'continuationCommand', 'token'], + ['replies', 'commentRepliesRenderer', 'continuations', 0, + 'nextContinuationData', 'continuation'] ) comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={}) elif 'commentRenderer' in comment: # replies @@ -282,6 +423,8 @@ def extract_comments_info(polymer_json): comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText')) comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText')) comment_info['like_count'] = comment_renderer.get('likeCount') + comment_info['approx_like_count'] = extract_approx_int( + comment_renderer.get('voteCount')) liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount')) info['comments'].append(comment_info) |
