about summary refs log tree commit diff stats
path: root/youtube/yt_data_extract/everything_else.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/yt_data_extract/everything_else.py')
-rw-r--r-- youtube/yt_data_extract/everything_else.py 261
1 files changed, 202 insertions, 59 deletions
diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py
index f9c47cb..5930111 100644
--- a/youtube/yt_data_extract/everything_else.py
+++ b/youtube/yt_data_extract/everything_else.py
@@ -9,7 +9,7 @@ import re
import urllib
from math import ceil
-def extract_channel_info(polymer_json, tab):
+def extract_channel_info(polymer_json, tab, continuation=False):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
@@ -23,7 +23,8 @@ def extract_channel_info(polymer_json, tab):
# channel doesn't exist or was terminated
# example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
- if not metadata:
+ # metadata and microformat are not present for continuation requests
+ if not metadata and not continuation:
if response.get('alerts'):
error_string = ' '.join(
extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
@@ -44,7 +45,7 @@ def extract_channel_info(polymer_json, tab):
info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
- # stuff from microformat (info given by youtube for every page on channel)
+ # Fields taken from the microformat (YouTube only sends it with the first page of a channel, not with continuation requests)
info['short_description'] = metadata.get('description')
if info['short_description'] and len(info['short_description']) > 730:
info['short_description'] = info['short_description'][0:730] + '...'
@@ -69,32 +70,99 @@ def extract_channel_info(polymer_json, tab):
info['ctoken'] = None
# empty channel
- if 'contents' not in response and 'continuationContents' not in response:
- return info
+ #if 'contents' not in response and 'continuationContents' not in response:
+ # return info
- if tab in ('videos', 'playlists', 'search'):
+ if tab in ('videos', 'shorts', 'streams', 'playlists', 'search'):
items, ctoken = extract_items(response)
- additional_info = {'author': info['channel_name'], 'author_url': info['channel_url']}
+ additional_info = {
+ 'author': info['channel_name'],
+ 'author_id': info['channel_id'],
+ 'author_url': info['channel_url'],
+ }
info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
info['ctoken'] = ctoken
if tab in ('search', 'playlists'):
info['is_last_page'] = (ctoken is None)
elif tab == 'about':
- items, _ = extract_items(response, item_types={'channelAboutFullMetadataRenderer'})
- if not items:
- info['error'] = 'Could not find channelAboutFullMetadataRenderer'
- return info
- channel_metadata = items[0]['channelAboutFullMetadataRenderer']
-
- info['links'] = []
- for link_json in channel_metadata.get('primaryLinks', ()):
- url = remove_redirect(deep_get(link_json, 'navigationEndpoint', 'urlEndpoint', 'url'))
- text = extract_str(link_json.get('title'))
- info['links'].append( (text, url) )
-
- info['date_joined'] = extract_date(channel_metadata.get('joinedDateText'))
- info['view_count'] = extract_int(channel_metadata.get('viewCountText'))
- info['description'] = extract_str(channel_metadata.get('description'), default='')
+ # Newer response layout: aboutChannelRenderer containing an aboutChannelViewModel
+ items, _ = extract_items(response, item_types={'aboutChannelRenderer'})
+ if items:
+ a_metadata = deep_get(items, 0, 'aboutChannelRenderer',
+ 'metadata', 'aboutChannelViewModel')
+ if not a_metadata:
+ info['error'] = 'Could not find aboutChannelViewModel'
+ return info
+
+ info['links'] = []
+ for link_outer in a_metadata.get('links', ()):
+ link = link_outer.get('channelExternalLinkViewModel') or {}
+ link_content = extract_str(deep_get(link, 'link', 'content'))
+ for run in deep_get(link, 'link', 'commandRuns') or ():
+ url = remove_redirect(deep_get(run, 'onTap',
+ 'innertubeCommand', 'urlEndpoint', 'url'))
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ if link_content is None or (link_content in url):
+ break
+ else: # didn't break
+ url = link_content
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ text = extract_str(deep_get(link, 'title', 'content'))
+ info['links'].append( (text, url) )
+
+ info['date_joined'] = extract_date(
+ a_metadata.get('joinedDateText')
+ )
+ info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+ info['approx_view_count'] = extract_approx_int(
+ a_metadata.get('viewCountText')
+ )
+ info['description'] = extract_str(
+ a_metadata.get('description'), default=''
+ )
+ info['approx_video_count'] = extract_approx_int(
+ a_metadata.get('videoCountText')
+ )
+ info['approx_subscriber_count'] = extract_approx_int(
+ a_metadata.get('subscriberCountText')
+ )
+ info['country'] = extract_str(a_metadata.get('country'))
+ info['canonical_url'] = extract_str(
+ a_metadata.get('canonicalChannelUrl')
+ )
+
+ # Older response layout: channelAboutFullMetadataRenderer
+ else:
+ items, _ = extract_items(response,
+ item_types={'channelAboutFullMetadataRenderer'})
+ if not items:
+ info['error'] = 'Could not find aboutChannelRenderer or channelAboutFullMetadataRenderer'
+ return info
+ a_metadata = items[0]['channelAboutFullMetadataRenderer']
+
+ info['links'] = []
+ for link_json in a_metadata.get('primaryLinks', ()):
+ url = remove_redirect(deep_get(link_json, 'navigationEndpoint',
+ 'urlEndpoint', 'url'))
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ text = extract_str(link_json.get('title'))
+ info['links'].append( (text, url) )
+
+ info['date_joined'] = extract_date(a_metadata.get('joinedDateText'))
+ info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+ info['description'] = extract_str(a_metadata.get(
+ 'description'), default='')
+
+ info['approx_video_count'] = None
+ info['approx_subscriber_count'] = None
+ info['country'] = None
+ info['canonical_url'] = None
else:
raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
@@ -150,39 +218,112 @@ def extract_playlist_metadata(polymer_json):
return {'error': err}
metadata = {'error': None}
+ metadata['title'] = None
+ metadata['first_video_id'] = None
+ metadata['thumbnail'] = None
+ metadata['video_count'] = None
+ metadata['description'] = ''
+ metadata['author'] = None
+ metadata['author_id'] = None
+ metadata['author_url'] = None
+ metadata['view_count'] = None
+ metadata['like_count'] = None
+ metadata['time_published'] = None
+
header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
- metadata['title'] = extract_str(header.get('title'))
- metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
- first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
- 'thumbnail', 'thumbnails', 0, 'url', default=''))
- if first_id:
- conservative_update(metadata, 'first_video_id', first_id.group(1))
- if metadata['first_video_id'] is None:
- metadata['thumbnail'] = None
+ if header:
+ # Classic playlistHeaderRenderer format
+ metadata['title'] = extract_str(header.get('title'))
+ metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
+ first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
+ 'thumbnail', 'thumbnails', 0, 'url', default=''))
+ if first_id:
+ conservative_update(metadata, 'first_video_id', first_id.group(1))
+
+ metadata['video_count'] = extract_int(header.get('numVideosText'))
+ metadata['description'] = extract_str(header.get('descriptionText'), default='')
+ metadata['author'] = extract_str(header.get('ownerText'))
+ metadata['author_id'] = multi_deep_get(header,
+ ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+ ['ownerEndpoint', 'browseEndpoint', 'browseId'])
+ metadata['view_count'] = extract_int(header.get('viewCountText'))
+ metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
+ for stat in header.get('stats', ()):
+ text = extract_str(stat)
+ if 'videos' in text or 'episodes' in text:
+ conservative_update(metadata, 'video_count', extract_int(text))
+ elif 'views' in text:
+ conservative_update(metadata, 'view_count', extract_int(text))
+ elif 'updated' in text:
+ metadata['time_published'] = extract_date(text)
else:
- metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'
-
- metadata['video_count'] = extract_int(header.get('numVideosText'))
- metadata['description'] = extract_str(header.get('descriptionText'), default='')
- metadata['author'] = extract_str(header.get('ownerText'))
- metadata['author_id'] = multi_deep_get(header,
- ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
- ['ownerEndpoint', 'browseEndpoint', 'browseId'])
+ # New pageHeaderRenderer format (YouTube 2024+)
+ page_header = deep_get(response, 'header', 'pageHeaderRenderer', default={})
+ metadata['title'] = page_header.get('pageTitle')
+ view_model = deep_get(page_header, 'content', 'pageHeaderViewModel', default={})
+
+ # Fall back to the title inside pageHeaderViewModel when pageTitle is missing or empty
+ if not metadata['title']:
+ metadata['title'] = deep_get(view_model,
+ 'title', 'dynamicTextViewModel', 'text', 'content')
+
+ # Extract metadata from rows (author, video count, views, etc.)
+ meta_rows = deep_get(view_model,
+ 'metadata', 'contentMetadataViewModel', 'metadataRows', default=[])
+ for row in meta_rows:
+ for part in row.get('metadataParts', []):
+ text_content = deep_get(part, 'text', 'content', default='')
+ # Author from avatarStack
+ avatar_stack = deep_get(part, 'avatarStack', 'avatarStackViewModel', default={})
+ if avatar_stack:
+ author_text = deep_get(avatar_stack, 'text', 'content')
+ if author_text:
+ metadata['author'] = author_text
+ # Extract author_id from commandRuns
+ for run in deep_get(avatar_stack, 'text', 'commandRuns', default=[]):
+ browse_id = deep_get(run, 'onTap', 'innertubeCommand',
+ 'browseEndpoint', 'browseId')
+ if browse_id:
+ metadata['author_id'] = browse_id
+ # Video/episode count
+ if text_content and ('video' in text_content.lower() or 'episode' in text_content.lower()):
+ conservative_update(metadata, 'video_count', extract_int(text_content))
+ # View count
+ elif text_content and 'view' in text_content.lower():
+ conservative_update(metadata, 'view_count', extract_int(text_content))
+ # Last updated
+ elif text_content and 'updated' in text_content.lower():
+ metadata['time_published'] = extract_date(text_content)
+
+ # Extract description from sidebar if available
+ sidebar = deep_get(response, 'sidebar', 'playlistSidebarRenderer', 'items', default=[])
+ for sidebar_item in sidebar:
+ desc = deep_get(sidebar_item, 'playlistSidebarPrimaryInfoRenderer',
+ 'description', 'simpleText')
+ if desc:
+ metadata['description'] = desc
+
if metadata['author_id']:
metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+
+ if metadata['first_video_id'] is None:
+ metadata['thumbnail'] = None
else:
- metadata['author_url'] = None
- metadata['view_count'] = extract_int(header.get('viewCountText'))
- metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
- for stat in header.get('stats', ()):
- text = extract_str(stat)
- if 'videos' in text:
- conservative_update(metadata, 'video_count', extract_int(text))
- elif 'views' in text:
- conservative_update(metadata, 'view_count', extract_int(text))
- elif 'updated' in text:
- metadata['time_published'] = extract_date(text)
+ metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
+
+ microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
+ default={})
+ conservative_update(
+ metadata, 'title', extract_str(microformat.get('title'))
+ )
+ conservative_update(
+ metadata, 'description', extract_str(microformat.get('description'))
+ )
+ conservative_update(
+ metadata, 'thumbnail', deep_get(microformat, 'thumbnail',
+ 'thumbnails', -1, 'url')
+ )
return metadata
@@ -191,13 +332,11 @@ def extract_playlist_info(polymer_json):
if err:
return {'error': err}
info = {'error': None}
- first_page = 'continuationContents' not in response
video_list, _ = extract_items(response)
info['items'] = [extract_item_info(renderer) for renderer in video_list]
- if first_page:
- info['metadata'] = extract_playlist_metadata(polymer_json)
+ info['metadata'] = extract_playlist_metadata(polymer_json)
return info
@@ -220,15 +359,13 @@ def _ctoken_metadata(ctoken):
result['sort'] = 0
return result
-def extract_comments_info(polymer_json):
+def extract_comments_info(polymer_json, ctoken=None):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
info = {'error': None}
- url = multi_deep_get(polymer_json, [1, 'url'], ['url'])
- if url:
- ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
+ if ctoken:
metadata = _ctoken_metadata(ctoken)
else:
metadata = {}
@@ -256,9 +393,13 @@ def extract_comments_info(polymer_json):
comment_info['reply_count'] = extract_int(deep_get(comment_thread,
'replies', 'commentRepliesRenderer', 'moreText'
), default=1) # With 1 reply, the text reads "View reply"
- comment_info['reply_ctoken'] = deep_get(comment_thread,
- 'replies', 'commentRepliesRenderer', 'continuations', 0,
- 'nextContinuationData', 'continuation'
+ comment_info['reply_ctoken'] = multi_deep_get(
+ comment_thread,
+ ['replies', 'commentRepliesRenderer', 'contents', 0,
+ 'continuationItemRenderer', 'button', 'buttonRenderer',
+ 'command', 'continuationCommand', 'token'],
+ ['replies', 'commentRepliesRenderer', 'continuations', 0,
+ 'nextContinuationData', 'continuation']
)
comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
elif 'commentRenderer' in comment: # replies
@@ -282,6 +423,8 @@ def extract_comments_info(polymer_json):
comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
comment_info['like_count'] = comment_renderer.get('likeCount')
+ comment_info['approx_like_count'] = extract_approx_int(
+ comment_renderer.get('voteCount'))
liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))
info['comments'].append(comment_info)