Diffstat (limited to 'youtube/yt_data_extract')
 -rw-r--r--  youtube/yt_data_extract/__init__.py           |   4
 -rw-r--r--  youtube/yt_data_extract/common.py             | 169
 -rw-r--r--  youtube/yt_data_extract/everything_else.py    | 149
 -rw-r--r--  youtube/yt_data_extract/watch_extraction.py   | 323
 4 files changed, 508 insertions(+), 137 deletions(-)
diff --git a/youtube/yt_data_extract/__init__.py b/youtube/yt_data_extract/__init__.py
index ad7bd03..de1812d 100644
--- a/youtube/yt_data_extract/__init__.py
+++ b/youtube/yt_data_extract/__init__.py
@@ -7,7 +7,7 @@ from .everything_else import (extract_channel_info, extract_search_info,
extract_playlist_metadata, extract_playlist_info, extract_comments_info)
from .watch_extraction import (extract_watch_info, get_caption_url,
- update_with_age_restricted_info, requires_decryption,
+ update_with_new_urls, requires_decryption,
extract_decryption_function, decrypt_signatures, _formats,
update_format_with_type_info, extract_hls_formats,
- extract_watch_info_from_html)
+ extract_watch_info_from_html, captions_available)
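
For downstream callers: update_with_age_restricted_info is replaced by update_with_new_urls, and the new captions_available helper can guard caption requests before building URLs. A hedged usage sketch (polymer_json and the language/format arguments here are hypothetical):

    from youtube import yt_data_extract

    info = yt_data_extract.extract_watch_info(polymer_json)
    if yt_data_extract.captions_available(info):
        caption_url = yt_data_extract.get_caption_url(info, 'en', 'vtt')
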
diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py
index 2b394e6..7903db5 100644
--- a/youtube/yt_data_extract/common.py
+++ b/youtube/yt_data_extract/common.py
@@ -1,6 +1,7 @@
import re
import urllib.parse
import collections
+import collections.abc
def get(object, key, default=None, types=()):
'''Like dict.get(), but returns default if the result doesn't match one of the types.
@@ -62,17 +63,40 @@ def multi_deep_get(object, *key_sequences, default=None, types=()):
continue
return default
+
+def _is_empty(value):
+ '''Determines if value is None or an empty iterable, such as '' or []'''
+ if value is None:
+ return True
+ elif isinstance(value, collections.abc.Iterable) and not value:
+ return True
+ return False
+
+
def liberal_update(obj, key, value):
- '''Updates obj[key] with value as long as value is not None.
- Ensures obj[key] will at least get a value of None, however'''
- if (value is not None) or (key not in obj):
+ '''Updates obj[key] with value unless value is None or empty.
+ Ensures obj[key] will at least be set to some value, however'''
+ if (not _is_empty(value)) or (key not in obj):
obj[key] = value
def conservative_update(obj, key, value):
- '''Only updates obj if it doesn't have key or obj[key] is None'''
- if obj.get(key) is None:
+ '''Only updates obj if it doesn't have key or obj[key] is None/empty'''
+ if _is_empty(obj.get(key)):
obj[key] = value
+
+def liberal_dict_update(dict1, dict2):
+ '''Update dict1 with keys from dict2 using liberal_update'''
+ for key, value in dict2.items():
+ liberal_update(dict1, key, value)
+
+
+def conservative_dict_update(dict1, dict2):
+ '''Update dict1 with keys from dict2 using conservative_update'''
+ for key, value in dict2.items():
+ conservative_update(dict1, key, value)
+
+
def concat_or_none(*strings):
'''Concatenates strings. Returns None if any of the arguments are None'''
result = ''
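
The practical difference between the two update helpers, as a sketch with hypothetical values (liberal_update overwrites anything except an empty value; conservative_update only fills empty slots):

    obj = {'title': 'Old', 'views': None}
    liberal_update(obj, 'title', '')        # '' is empty: 'Old' is kept
    liberal_update(obj, 'title', 'New')     # non-empty: overwritten with 'New'
    conservative_update(obj, 'views', 100)  # was None: filled in with 100
    conservative_update(obj, 'views', 200)  # already set: stays 100
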
@@ -85,7 +109,7 @@ def concat_or_none(*strings):
def remove_redirect(url):
if url is None:
return None
- if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking
+ if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # YouTube puts these on external links to do tracking
query_string = url[url.find('?')+1: ]
return urllib.parse.parse_qs(query_string)['q'][0]
return url
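
For reference, remove_redirect unwraps YouTube's tracking indirection on external links; roughly (hypothetical URL):

    url = 'https://www.youtube.com/redirect?q=https%3A%2F%2Fexample.com%2Fblog&v=abc'
    remove_redirect(url)  # -> 'https://example.com/blog'
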
@@ -109,14 +133,14 @@ def _recover_urls(runs):
for run in runs:
url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
text = run.get('text', '')
- # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text
+ # second condition is necessary because YouTube makes other things into urls, such as hashtags, which we want to keep as text
if url is not None and (text.startswith('http://') or text.startswith('https://')):
url = remove_redirect(url)
run['url'] = url
- run['text'] = url # youtube truncates the url text, use actual url instead
+ run['text'] = url # YouTube truncates the url text, use actual url instead
def extract_str(node, default=None, recover_urls=False):
- '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)'''
+ '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix YouTube's truncation of url text (most prominently seen in descriptions)'''
if isinstance(node, str):
return node
@@ -142,14 +166,17 @@ def extract_formatted_text(node):
return [{'text': node['simpleText']}]
return []
-def extract_int(string, default=None):
+def extract_int(string, default=None, whole_word=True):
if isinstance(string, int):
return string
if not isinstance(string, str):
string = extract_str(string)
if not string:
return default
- match = re.search(r'\b(\d+)\b', string.replace(',', ''))
+ if whole_word:
+ match = re.search(r'\b(\d+)\b', string.replace(',', ''))
+ else:
+ match = re.search(r'(\d+)', string.replace(',', ''))
if match is None:
return default
try:
@@ -158,7 +185,7 @@ def extract_int(string, default=None):
return default
def extract_approx_int(string):
- '''e.g. "15.1M" from "15.1M subscribers"'''
+ '''e.g. "15.1M" from "15.1M subscribers", or "4,353" from "4353 views"'''
if not isinstance(string, str):
string = extract_str(string)
if not string:
@@ -166,7 +193,10 @@ def extract_approx_int(string):
match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', ''))
if match is None:
return None
- return match.group(1)
+ result = match.group(1)
+ if re.fullmatch(r'\d+', result):
+ result = '{:,}'.format(int(result))
+ return result
MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
def extract_date(date_text):
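
The new whole_word flag and the comma-grouping added to extract_approx_int behave roughly like this (a sketch; the example strings are hypothetical):

    extract_int('1080p')                     # None: digits glued to 'p'
    extract_int('1080p', whole_word=False)   # 1080
    extract_approx_int('15.1M subscribers')  # '15.1M' (unchanged)
    extract_approx_int('4353 views')         # '4,353' (plain integers now get commas)
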
@@ -213,8 +243,6 @@ def extract_item_info(item, additional_info={}):
info['type'] = 'unsupported'
return info
- info.update(additional_info)
-
# type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
# camelCase split, https://stackoverflow.com/a/37697078
type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
@@ -224,6 +252,9 @@ def extract_item_info(item, additional_info={}):
primary_type = type_parts[-2]
if primary_type == 'video':
info['type'] = 'video'
+ elif type_parts[0] == 'reel': # shorts
+ info['type'] = 'video'
+ primary_type = 'video'
elif primary_type in ('playlist', 'radio', 'show'):
info['type'] = 'playlist'
info['playlist_type'] = primary_type
@@ -245,7 +276,11 @@ def extract_item_info(item, additional_info={}):
['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
))
info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
- info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText'))
+ info['description'] = extract_formatted_text(multi_deep_get(
+ item,
+ ['descriptionText'], ['descriptionSnippet'],
+ ['detailedMetadataSnippets', 0, 'snippetText'],
+ ))
info['thumbnail'] = normalize_url(multi_deep_get(item,
['thumbnail', 'thumbnails', 0, 'url'], # videos
['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
@@ -266,7 +301,11 @@ def extract_item_info(item, additional_info={}):
info['time_published'] = timestamp.group(1)
if primary_type == 'video':
- info['id'] = item.get('videoId')
+ info['id'] = multi_deep_get(item,
+ ['videoId'],
+ ['navigationEndpoint', 'watchEndpoint', 'videoId'],
+ ['navigationEndpoint', 'reelWatchEndpoint', 'videoId'] # shorts
+ )
info['view_count'] = extract_int(item.get('viewCountText'))
# dig into accessibility data to get view_count for videos marked as recommended, and to get time_published
@@ -284,17 +323,35 @@ def extract_item_info(item, additional_info={}):
if info['view_count']:
info['approx_view_count'] = '{:,}'.format(info['view_count'])
else:
- info['approx_view_count'] = extract_approx_int(item.get('shortViewCountText'))
+ info['approx_view_count'] = extract_approx_int(multi_get(item,
+ 'shortViewCountText',
+ 'viewCountText' # shorts
+ ))
# handle case where it is "No views"
if not info['approx_view_count']:
if ('No views' in item.get('shortViewCountText', '')
- or 'no views' in accessibility_label.lower()):
+ or 'no views' in accessibility_label.lower()
+ or 'No views' in extract_str(item.get('viewCountText', ''), '') # shorts
+ ):
info['view_count'] = 0
info['approx_view_count'] = '0'
info['duration'] = extract_str(item.get('lengthText'))
+ # dig into accessibility data to get duration for shorts
+ accessibility_label = deep_get(item,
+ 'accessibility', 'accessibilityData', 'label',
+ default='')
+ duration = re.search(r'(\d+) (second|seconds|minute) - play video$',
+ accessibility_label)
+ if duration:
+ if duration.group(2) == 'minute':
+ conservative_update(info, 'duration', '1:00')
+ else:
+ conservative_update(info,
+ 'duration', '0:' + duration.group(1).zfill(2))
+
# if it's an item in a playlist, get its index
if 'index' in item: # url has wrong index on playlist page
info['index'] = extract_int(item.get('index'))
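
The accessibility fallback above recovers durations for shorts, whose renderers lack lengthText; in miniature (hypothetical label):

    label = 'Funny video - 35 seconds - play video'
    m = re.search(r'(\d+) (second|seconds|minute) - play video$', label)
    # m.group(1) == '35' -> duration '0:35'; a bare 'minute' maps to '1:00'
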
@@ -335,6 +392,9 @@ def extract_item_info(item, additional_info={}):
conservative_update(info, 'video_count', extract_int(deep_get(
overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
)))
+
+ info.update(additional_info)
+
return info
def extract_response(polymer_json):
@@ -363,6 +423,8 @@ _item_types = {
'gridVideoRenderer',
'playlistVideoRenderer',
+ 'reelItemRenderer',
+
'playlistRenderer',
'compactPlaylistRenderer',
'gridPlaylistRenderer',
@@ -402,6 +464,7 @@ nested_renderer_dispatch = {
'twoColumnBrowseResultsRenderer': _traverse_browse_renderer,
'twoColumnSearchResultsRenderer': lambda r: get(r, 'primaryContents', {}),
'richItemRenderer': lambda r: get(r, 'content', {}),
+ 'engagementPanelSectionListRenderer': lambda r: get(r, 'content', {}),
}
# these renderers contain a list of renderers inside them
@@ -411,6 +474,8 @@ nested_renderer_list_dispatch = {
'gridRenderer': _traverse_standard_list,
'richGridRenderer': _traverse_standard_list,
'playlistVideoListRenderer': _traverse_standard_list,
+ 'structuredDescriptionContentRenderer': _traverse_standard_list,
+ 'slimVideoMetadataSectionRenderer': _traverse_standard_list,
'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None),
}
def get_nested_renderer_list_function(key):
@@ -474,8 +539,27 @@ def extract_items_from_renderer(renderer, item_types=_item_types):
renderer = None
-def extract_items(response, item_types=_item_types):
+
+def extract_items_from_renderer_list(renderers, item_types=_item_types):
+ '''Same as extract_items_from_renderer, but provide a list of renderers'''
+ items = []
+ ctoken = None
+ for renderer in renderers:
+ new_items, new_ctoken = extract_items_from_renderer(
+ renderer,
+ item_types=item_types)
+ items += new_items
+ # prioritize ctoken associated with items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+ return items, ctoken
+
+
+def extract_items(response, item_types=_item_types,
+ search_engagement_panels=False):
'''return items, ctoken'''
+ items = []
+ ctoken = None
if 'continuationContents' in response:
# sometimes there's another, empty, junk [something]Continuation key
# find real one
@@ -483,13 +567,44 @@ def extract_items(response, item_types=_item_types):
'continuationContents', {}).items():
# e.g. commentSectionContinuation, playlistVideoListContinuation
if key.endswith('Continuation'):
- items, cont = extract_items_from_renderer({key: renderer_cont},
+ items, ctoken = extract_items_from_renderer(
+ {key: renderer_cont},
item_types=item_types)
if items:
- return items, cont
- return [], None
- elif 'contents' in response:
+ break
+ if ('onResponseReceivedEndpoints' in response
+ or 'onResponseReceivedActions' in response):
+ for endpoint in multi_get(response,
+ 'onResponseReceivedEndpoints',
+ 'onResponseReceivedActions',
+ []):
+ new_items, new_ctoken = extract_items_from_renderer_list(
+ multi_deep_get(
+ endpoint,
+ ['reloadContinuationItemsCommand', 'continuationItems'],
+ ['appendContinuationItemsAction', 'continuationItems'],
+ default=[]
+ ),
+ item_types=item_types,
+ )
+ items += new_items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+ if 'contents' in response:
renderer = get(response, 'contents', {})
- return extract_items_from_renderer(renderer, item_types=item_types)
- else:
- return [], None
+ new_items, new_ctoken = extract_items_from_renderer(
+ renderer,
+ item_types=item_types)
+ items += new_items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+
+ if search_engagement_panels and 'engagementPanels' in response:
+ new_items, new_ctoken = extract_items_from_renderer_list(
+ response['engagementPanels'], item_types=item_types
+ )
+ items += new_items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+
+ return items, ctoken
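
The continuation-token rule repeated throughout extract_items (keep a ctoken only if it arrived together with items, unless none has been seen yet) reduces to this sketch:

    items, ctoken = [], None
    for new_items, new_ctoken in [([], 'junk_token'), (['video'], 'real_token')]:
        items += new_items
        if (not ctoken) or (new_ctoken and new_items):
            ctoken = new_ctoken
    # ctoken == 'real_token': the token paired with actual items wins
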
diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py
index f9c47cb..0f64649 100644
--- a/youtube/yt_data_extract/everything_else.py
+++ b/youtube/yt_data_extract/everything_else.py
@@ -9,7 +9,7 @@ import re
import urllib
from math import ceil
-def extract_channel_info(polymer_json, tab):
+def extract_channel_info(polymer_json, tab, continuation=False):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
@@ -23,7 +23,8 @@ def extract_channel_info(polymer_json, tab):
# channel doesn't exist or was terminated
# example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
- if not metadata:
+ # metadata and microformat are not present for continuation requests
+ if not metadata and not continuation:
if response.get('alerts'):
error_string = ' '.join(
extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
@@ -44,7 +45,7 @@ def extract_channel_info(polymer_json, tab):
info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
- # stuff from microformat (info given by youtube for every page on channel)
+ # stuff from microformat (info given by YouTube for first page on channel)
info['short_description'] = metadata.get('description')
if info['short_description'] and len(info['short_description']) > 730:
info['short_description'] = info['short_description'][0:730] + '...'
@@ -69,32 +70,99 @@ def extract_channel_info(polymer_json, tab):
info['ctoken'] = None
# empty channel
- if 'contents' not in response and 'continuationContents' not in response:
- return info
+ #if 'contents' not in response and 'continuationContents' not in response:
+ # return info
- if tab in ('videos', 'playlists', 'search'):
+ if tab in ('videos', 'shorts', 'streams', 'playlists', 'search'):
items, ctoken = extract_items(response)
- additional_info = {'author': info['channel_name'], 'author_url': info['channel_url']}
+ additional_info = {
+ 'author': info['channel_name'],
+ 'author_id': info['channel_id'],
+ 'author_url': info['channel_url'],
+ }
info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
info['ctoken'] = ctoken
if tab in ('search', 'playlists'):
info['is_last_page'] = (ctoken is None)
elif tab == 'about':
- items, _ = extract_items(response, item_types={'channelAboutFullMetadataRenderer'})
- if not items:
- info['error'] = 'Could not find channelAboutFullMetadataRenderer'
- return info
- channel_metadata = items[0]['channelAboutFullMetadataRenderer']
-
- info['links'] = []
- for link_json in channel_metadata.get('primaryLinks', ()):
- url = remove_redirect(deep_get(link_json, 'navigationEndpoint', 'urlEndpoint', 'url'))
- text = extract_str(link_json.get('title'))
- info['links'].append( (text, url) )
-
- info['date_joined'] = extract_date(channel_metadata.get('joinedDateText'))
- info['view_count'] = extract_int(channel_metadata.get('viewCountText'))
- info['description'] = extract_str(channel_metadata.get('description'), default='')
+ # Latest type
+ items, _ = extract_items(response, item_types={'aboutChannelRenderer'})
+ if items:
+ a_metadata = deep_get(items, 0, 'aboutChannelRenderer',
+ 'metadata', 'aboutChannelViewModel')
+ if not a_metadata:
+ info['error'] = 'Could not find aboutChannelViewModel'
+ return info
+
+ info['links'] = []
+ for link_outer in a_metadata.get('links', ()):
+ link = link_outer.get('channelExternalLinkViewModel') or {}
+ link_content = extract_str(deep_get(link, 'link', 'content'))
+ for run in deep_get(link, 'link', 'commandRuns') or ():
+ url = remove_redirect(deep_get(run, 'onTap',
+ 'innertubeCommand', 'urlEndpoint', 'url'))
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ if link_content is None or (link_content in url):
+ break
+ else: # didn't break
+ url = link_content
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ text = extract_str(deep_get(link, 'title', 'content'))
+ info['links'].append( (text, url) )
+
+ info['date_joined'] = extract_date(
+ a_metadata.get('joinedDateText')
+ )
+ info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+ info['approx_view_count'] = extract_approx_int(
+ a_metadata.get('viewCountText')
+ )
+ info['description'] = extract_str(
+ a_metadata.get('description'), default=''
+ )
+ info['approx_video_count'] = extract_approx_int(
+ a_metadata.get('videoCountText')
+ )
+ info['approx_subscriber_count'] = extract_approx_int(
+ a_metadata.get('subscriberCountText')
+ )
+ info['country'] = extract_str(a_metadata.get('country'))
+ info['canonical_url'] = extract_str(
+ a_metadata.get('canonicalChannelUrl')
+ )
+
+ # Old type
+ else:
+ items, _ = extract_items(response,
+ item_types={'channelAboutFullMetadataRenderer'})
+ if not items:
+ info['error'] = 'Could not find aboutChannelRenderer or channelAboutFullMetadataRenderer'
+ return info
+ a_metadata = items[0]['channelAboutFullMetadataRenderer']
+
+ info['links'] = []
+ for link_json in a_metadata.get('primaryLinks', ()):
+ url = remove_redirect(deep_get(link_json, 'navigationEndpoint',
+ 'urlEndpoint', 'url'))
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ text = extract_str(link_json.get('title'))
+ info['links'].append( (text, url) )
+
+ info['date_joined'] = extract_date(a_metadata.get('joinedDateText'))
+ info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+ info['description'] = extract_str(a_metadata.get(
+ 'description'), default='')
+
+ info['approx_video_count'] = None
+ info['approx_subscriber_count'] = None
+ info['country'] = None
+ info['canonical_url'] = None
else:
raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
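
The for/else in the new about-page link extraction prefers a real URL from commandRuns, falls back to the displayed link text, and then normalizes a missing scheme; in miniature (hypothetical values):

    link_content = 'example.com/blog'
    url = None
    for run in ():                  # no usable commandRuns on this link
        break
    else:                           # loop never broke: fall back to displayed text
        url = link_content
    if url and not (url.startswith('http://') or url.startswith('https://')):
        url = 'https://' + url
    # url == 'https://example.com/blog'
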
@@ -161,7 +229,7 @@ def extract_playlist_metadata(polymer_json):
if metadata['first_video_id'] is None:
metadata['thumbnail'] = None
else:
- metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'
+ metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
metadata['video_count'] = extract_int(header.get('numVideosText'))
metadata['description'] = extract_str(header.get('descriptionText'), default='')
@@ -184,6 +252,19 @@ def extract_playlist_metadata(polymer_json):
elif 'updated' in text:
metadata['time_published'] = extract_date(text)
+ microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
+ default={})
+ conservative_update(
+ metadata, 'title', extract_str(microformat.get('title'))
+ )
+ conservative_update(
+ metadata, 'description', extract_str(microformat.get('description'))
+ )
+ conservative_update(
+ metadata, 'thumbnail', deep_get(microformat, 'thumbnail',
+ 'thumbnails', -1, 'url')
+ )
+
return metadata
def extract_playlist_info(polymer_json):
@@ -191,13 +272,11 @@ def extract_playlist_info(polymer_json):
if err:
return {'error': err}
info = {'error': None}
- first_page = 'continuationContents' not in response
video_list, _ = extract_items(response)
info['items'] = [extract_item_info(renderer) for renderer in video_list]
- if first_page:
- info['metadata'] = extract_playlist_metadata(polymer_json)
+ info['metadata'] = extract_playlist_metadata(polymer_json)
return info
@@ -220,15 +299,13 @@ def _ctoken_metadata(ctoken):
result['sort'] = 0
return result
-def extract_comments_info(polymer_json):
+def extract_comments_info(polymer_json, ctoken=None):
response, err = extract_response(polymer_json)
if err:
return {'error': err}
info = {'error': None}
- url = multi_deep_get(polymer_json, [1, 'url'], ['url'])
- if url:
- ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
+ if ctoken:
metadata = _ctoken_metadata(ctoken)
else:
metadata = {}
@@ -256,9 +333,13 @@ def extract_comments_info(polymer_json):
comment_info['reply_count'] = extract_int(deep_get(comment_thread,
'replies', 'commentRepliesRenderer', 'moreText'
), default=1) # With 1 reply, the text reads "View reply"
- comment_info['reply_ctoken'] = deep_get(comment_thread,
- 'replies', 'commentRepliesRenderer', 'continuations', 0,
- 'nextContinuationData', 'continuation'
+ comment_info['reply_ctoken'] = multi_deep_get(
+ comment_thread,
+ ['replies', 'commentRepliesRenderer', 'contents', 0,
+ 'continuationItemRenderer', 'button', 'buttonRenderer',
+ 'command', 'continuationCommand', 'token'],
+ ['replies', 'commentRepliesRenderer', 'continuations', 0,
+ 'nextContinuationData', 'continuation']
)
comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
elif 'commentRenderer' in comment: # replies
@@ -282,6 +363,8 @@ def extract_comments_info(polymer_json):
comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
comment_info['like_count'] = comment_renderer.get('likeCount')
+ comment_info['approx_like_count'] = extract_approx_int(
+ comment_renderer.get('voteCount'))
liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))
info['comments'].append(comment_info)
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
index db53581..e09e2d3 100644
--- a/youtube/yt_data_extract/watch_extraction.py
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -2,7 +2,8 @@ from .common import (get, multi_get, deep_get, multi_deep_get,
liberal_update, conservative_update, remove_redirect, normalize_url,
extract_str, extract_formatted_text, extract_int, extract_approx_int,
extract_date, check_missing_keys, extract_item_info, extract_items,
- extract_response, concat_or_none)
+ extract_response, concat_or_none, liberal_dict_update,
+ conservative_dict_update)
import json
import urllib.parse
@@ -116,7 +117,99 @@ _formats = {
'397': {'vcodec': 'av01.0.05M.08'},
}
-def _extract_metadata_row_info(video_renderer_info):
+
+def _extract_from_video_information_renderer(renderer_content):
+ subtitle = extract_str(renderer_content.get('expandedSubtitle'),
+ default='')
+ info = {
+ 'title': extract_str(renderer_content.get('title')),
+ 'view_count': extract_int(subtitle),
+ 'unlisted': False,
+ 'live': 'watching' in subtitle,
+ }
+ for badge in renderer_content.get('badges', []):
+ if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
+ info['unlisted'] = True
+ return info
+
+def _extract_likes_dislikes(renderer_content):
+ def extract_button_count(toggle_button_renderer):
+ # all the digits can be found in the accessibility data
+ count = extract_int(multi_deep_get(
+ toggle_button_renderer,
+ ['defaultText', 'accessibility', 'accessibilityData', 'label'],
+ ['accessibility', 'label'],
+ ['accessibilityData', 'accessibilityData', 'label'],
+ ['accessibilityText'],
+ ))
+
+ # this count doesn't have all the digits, it's like 53K for instance
+ dumb_count = extract_int(extract_str(multi_get(
+ toggle_button_renderer, 'defaultText', 'title')))
+
+ # The accessibility text will be "No likes" or "No dislikes" or
+ # something like that, but dumb count will be 0
+ if dumb_count == 0:
+ count = 0
+ return count
+
+ info = {
+ 'like_count': None,
+ 'dislike_count': None,
+ }
+ for button in renderer_content.get('buttons', ()):
+ if 'slimMetadataToggleButtonRenderer' in button:
+ button_renderer = button['slimMetadataToggleButtonRenderer']
+ count = extract_button_count(deep_get(button_renderer,
+ 'button',
+ 'toggleButtonRenderer'))
+ if 'isLike' in button_renderer:
+ info['like_count'] = count
+ elif 'isDislike' in button_renderer:
+ info['dislike_count'] = count
+ elif 'slimMetadataButtonRenderer' in button:
+ button_renderer = button['slimMetadataButtonRenderer']
+ liberal_update(info, 'like_count', extract_button_count(
+ multi_deep_get(button_renderer,
+ ['button', 'segmentedLikeDislikeButtonRenderer',
+ 'likeButton', 'toggleButtonRenderer'],
+ ['button', 'segmentedLikeDislikeButtonViewModel',
+ 'likeButtonViewModel', 'likeButtonViewModel',
+ 'toggleButtonViewModel', 'toggleButtonViewModel',
+ 'defaultButtonViewModel', 'buttonViewModel']
+ )
+ ))
+ '''liberal_update(info, 'dislike_count', extract_button_count(
+ deep_get(
+ button_renderer, 'button',
+ 'segmentedLikeDislikeButtonRenderer',
+ 'dislikeButton', 'toggleButtonRenderer'
+ )
+ ))'''
+ return info
+
+def _extract_from_owner_renderer(renderer_content):
+ return {
+ 'author': extract_str(renderer_content.get('title')),
+ 'author_id': deep_get(
+ renderer_content,
+ 'navigationEndpoint', 'browseEndpoint', 'browseId'),
+ }
+
+def _extract_from_video_header_renderer(renderer_content):
+ return {
+ 'title': extract_str(renderer_content.get('title')),
+ 'time_published': extract_date(extract_str(
+ renderer_content.get('publishDate'))),
+ }
+
+def _extract_from_description_renderer(renderer_content):
+ return {
+ 'description': extract_str(
+ renderer_content.get('descriptionBodyText'), recover_urls=True),
+ }
+
+def _extract_metadata_row_info(renderer_content):
# extract category and music list
info = {
'category': None,
@@ -124,7 +217,7 @@ def _extract_metadata_row_info(video_renderer_info):
}
current_song = {}
- for row in deep_get(video_renderer_info, 'metadataRowContainer', 'metadataRowContainerRenderer', 'rows', default=[]):
+ for row in deep_get(renderer_content, 'rows', default=[]):
row_title = extract_str(deep_get(row, 'metadataRowRenderer', 'title'), default='')
row_content = extract_str(deep_get(row, 'metadataRowRenderer', 'contents', 0))
if row_title == 'Category':
@@ -146,18 +239,69 @@ def _extract_metadata_row_info(video_renderer_info):
return info
-def _extract_watch_info_mobile(top_level):
- info = {}
- microformat = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
+def _extract_from_music_renderer(renderer_content):
+ # latest format for the music list
+ info = {
+ 'music_list': [],
+ }
- family_safe = microformat.get('isFamilySafe')
- if family_safe is None:
- info['age_restricted'] = None
- else:
- info['age_restricted'] = not family_safe
- info['allowed_countries'] = microformat.get('availableCountries', [])
- info['time_published'] = microformat.get('publishDate')
+ for carousel in renderer_content.get('carouselLockups', []):
+ song = {}
+ carousel = carousel.get('carouselLockupRenderer', {})
+ video_renderer = carousel.get('videoLockup', {})
+ video_renderer_info = extract_item_info(video_renderer)
+ video_id = video_renderer_info.get('id')
+ song['url'] = concat_or_none('https://www.youtube.com/watch?v=',
+ video_id)
+ song['title'] = video_renderer_info.get('title')
+ for row in carousel.get('infoRows', []):
+ row = row.get('infoRowRenderer', {})
+ title = extract_str(row.get('title'))
+ data = extract_str(row.get('defaultMetadata'))
+ if title == 'SONG':
+ song['title'] = data
+ elif title == 'ARTIST':
+ song['artist'] = data
+ elif title == 'ALBUM':
+ song['album'] = data
+ elif title == 'WRITERS':
+ song['writers'] = data
+ info['music_list'].append(song)
+ return info
+def _extract_from_video_metadata(renderer_content):
+ info = _extract_from_video_information_renderer(renderer_content)
+ liberal_dict_update(info, _extract_likes_dislikes(renderer_content))
+ liberal_dict_update(info, _extract_from_owner_renderer(renderer_content))
+ liberal_dict_update(info, _extract_metadata_row_info(deep_get(
+ renderer_content, 'metadataRowContainer',
+ 'metadataRowContainerRenderer', default={}
+ )))
+ liberal_update(info, 'title', extract_str(renderer_content.get('title')))
+ liberal_update(
+ info, 'description',
+ extract_str(renderer_content.get('description'), recover_urls=True)
+ )
+ liberal_update(info, 'time_published',
+ extract_date(renderer_content.get('dateText')))
+ return info
+
+visible_extraction_dispatch = {
+ # Either these ones spread around in various places
+ 'slimVideoInformationRenderer': _extract_from_video_information_renderer,
+ 'slimVideoActionBarRenderer': _extract_likes_dislikes,
+ 'slimOwnerRenderer': _extract_from_owner_renderer,
+ 'videoDescriptionHeaderRenderer': _extract_from_video_header_renderer,
+ 'videoDescriptionMusicSectionRenderer': _extract_from_music_renderer,
+ 'expandableVideoDescriptionRenderer': _extract_from_description_renderer,
+ 'metadataRowContainerRenderer': _extract_metadata_row_info,
+ # OR just this one, which contains SOME of the above inside it
+ 'slimVideoMetadataRenderer': _extract_from_video_metadata,
+}
+
+def _extract_watch_info_mobile(top_level):
+ '''Scrapes information from the visible page'''
+ info = {}
response = top_level.get('response', {})
# this renderer has the stuff visible on the page
@@ -190,47 +334,24 @@ def _extract_watch_info_mobile(top_level):
else:
info['playlist'] = None
- # Holds the visible video info. It is inside singleColumnWatchNextResults
- # but use our convenience function instead
- items, _ = extract_items(response, item_types={'slimVideoMetadataRenderer'})
- if items:
- video_info = items[0]['slimVideoMetadataRenderer']
- else:
- print('Failed to extract video metadata')
- video_info = {}
-
- info.update(_extract_metadata_row_info(video_info))
- info['description'] = extract_str(video_info.get('description'), recover_urls=True)
- info['view_count'] = extract_int(extract_str(video_info.get('expandedSubtitle')))
- info['author'] = extract_str(deep_get(video_info, 'owner', 'slimOwnerRenderer', 'title'))
- info['author_id'] = deep_get(video_info, 'owner', 'slimOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
- info['title'] = extract_str(video_info.get('title'))
- info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'), default='')
- info['unlisted'] = False
- for badge in video_info.get('badges', []):
- if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
- info['unlisted'] = True
- info['like_count'] = None
- info['dislike_count'] = None
- if not info['time_published']:
- info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
- for button in video_info.get('buttons', ()):
- button_renderer = button.get('slimMetadataToggleButtonRenderer', {})
-
- # all the digits can be found in the accessibility data
- count = extract_int(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText', 'accessibility', 'accessibilityData', 'label'))
-
- # this count doesn't have all the digits, it's like 53K for instance
- dumb_count = extract_int(extract_str(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText')))
-
- # the accessibility text will be "No likes" or "No dislikes" or something like that, but dumb count will be 0
- if dumb_count == 0:
- count = 0
-
- if 'isLike' in button_renderer:
- info['like_count'] = count
- elif 'isDislike' in button_renderer:
- info['dislike_count'] = count
+ # use dispatch table to get information scattered in various renderers
+ items, _ = extract_items(
+ response,
+ item_types=visible_extraction_dispatch.keys(),
+ search_engagement_panels=True
+ )
+ found = set()
+ for renderer in items:
+ name, renderer_content = list(renderer.items())[0]
+ found.add(name)
+ liberal_dict_update(
+ info,
+ visible_extraction_dispatch[name](renderer_content)
+ )
+ # Call the function on blank dict for any that weren't found
+ # so that the empty keys get added
+ for name in visible_extraction_dispatch.keys() - found:
+ liberal_dict_update(info, visible_extraction_dispatch[name]({}))
# comment section info
items, _ = extract_items(response, item_types={
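
Each renderer name now maps to a small extractor, and the driver merges whatever each one returns; e.g. (a hedged sketch with a made-up renderer):

    info = {}
    renderer = {'slimOwnerRenderer': {'title': {'simpleText': 'Some Channel'}}}
    name, content = list(renderer.items())[0]
    liberal_dict_update(info, visible_extraction_dispatch[name](content))
    # info == {'author': 'Some Channel', 'author_id': None}
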
@@ -244,17 +365,18 @@ def _extract_watch_info_mobile(top_level):
# https://www.androidpolice.com/2019/10/31/google-youtube-app-comment-section-below-videos/
# https://www.youtube.com/watch?v=bR5Q-wD-6qo
if header_type == 'commentsEntryPointHeaderRenderer':
- comment_count_text = extract_str(comment_info.get('headerText'))
+ comment_count_text = extract_str(multi_get(
+ comment_info, 'commentCount', 'headerText'))
else:
comment_count_text = extract_str(deep_get(comment_info,
'header', 'commentSectionHeaderRenderer', 'countText'))
if comment_count_text == 'Comments': # just this with no number, means 0 comments
- info['comment_count'] = 0
+ info['comment_count'] = '0'
else:
- info['comment_count'] = extract_int(comment_count_text)
+ info['comment_count'] = extract_approx_int(comment_count_text)
info['comments_disabled'] = False
else: # no comment section present means comments are disabled
- info['comment_count'] = 0
+ info['comment_count'] = '0'
info['comments_disabled'] = True
# check for limited state
@@ -274,7 +396,6 @@ def _extract_watch_info_desktop(top_level):
info = {
'comment_count': None,
'comments_disabled': None,
- 'allowed_countries': [],
'limited_state': None,
'playlist': None,
}
@@ -307,26 +428,28 @@ def _extract_watch_info_desktop(top_level):
return info
def update_format_with_codec_info(fmt, codec):
- if (codec.startswith('av')
- or codec in ('vp9', 'vp8', 'vp8.0', 'h263', 'h264', 'mp4v')):
+ if any(codec.startswith(c) for c in ('av', 'vp', 'h263', 'h264', 'mp4v')):
if codec == 'vp8.0':
codec = 'vp8'
conservative_update(fmt, 'vcodec', codec)
elif (codec.startswith('mp4a')
- or codec in ('opus', 'mp3', 'aac', 'dtse', 'ec-3', 'vorbis')):
+ or codec in ('opus', 'mp3', 'aac', 'dtse', 'ec-3', 'vorbis',
+ 'ac-3')):
conservative_update(fmt, 'acodec', codec)
else:
print('Warning: unrecognized codec: ' + codec)
fmt_type_re = re.compile(
- r'(text|audio|video)/([\w0-9]+); codecs="([\w0-9\.]+(?:, [\w0-9\.]+)*)"')
+ r'(text|audio|video)/([\w0-9]+); codecs="([^"]+)"')
def update_format_with_type_info(fmt, yt_fmt):
# 'type' for invidious api format
mime_type = multi_get(yt_fmt, 'mimeType', 'type')
if mime_type is None:
return
match = re.fullmatch(fmt_type_re, mime_type)
-
+ if match is None:
+ print('Warning: Could not read mimetype', mime_type)
+ return
type, fmt['ext'], codecs = match.groups()
codecs = codecs.split(', ')
for codec in codecs:
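
The loosened codecs pattern matters for hyphenated codecs such as ec-3 and ac-3, which the old [\w0-9\.]+ character class rejected; e.g.:

    m = fmt_type_re.fullmatch('audio/mp4; codecs="ec-3"')
    # old pattern: no match; new pattern: m.groups() == ('audio', 'mp4', 'ec-3')
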
@@ -349,17 +472,32 @@ def _extract_formats(info, player_response):
for yt_fmt in yt_formats:
itag = yt_fmt.get('itag')
+ # Translated audio track
+ # Example: https://www.youtube.com/watch?v=gF9kkB0UWYQ
+ # Only get the original language for now so a foreign
+ # translation will not be picked just because it comes first
+ if deep_get(yt_fmt, 'audioTrack', 'audioIsDefault') is False:
+ continue
+
fmt = {}
fmt['itag'] = itag
fmt['ext'] = None
fmt['audio_bitrate'] = None
+ fmt['bitrate'] = yt_fmt.get('bitrate')
fmt['acodec'] = None
fmt['vcodec'] = None
fmt['width'] = yt_fmt.get('width')
fmt['height'] = yt_fmt.get('height')
- fmt['file_size'] = yt_fmt.get('contentLength')
- fmt['audio_sample_rate'] = yt_fmt.get('audioSampleRate')
+ fmt['file_size'] = extract_int(yt_fmt.get('contentLength'))
+ fmt['audio_sample_rate'] = extract_int(yt_fmt.get('audioSampleRate'))
+ fmt['duration_ms'] = yt_fmt.get('approxDurationMs')
fmt['fps'] = yt_fmt.get('fps')
+ fmt['init_range'] = yt_fmt.get('initRange')
+ fmt['index_range'] = yt_fmt.get('indexRange')
+ for key in ('init_range', 'index_range'):
+ if fmt[key]:
+ fmt[key]['start'] = int(fmt[key]['start'])
+ fmt[key]['end'] = int(fmt[key]['end'])
update_format_with_type_info(fmt, yt_fmt)
cipher = dict(urllib.parse.parse_qsl(multi_get(yt_fmt,
'cipher', 'signatureCipher', default='')))
@@ -373,8 +511,16 @@ def _extract_formats(info, player_response):
# update with information from big table
hardcoded_itag_info = _formats.get(str(itag), {})
for key, value in hardcoded_itag_info.items():
- conservative_update(fmt, key, value) # prefer info from Youtube
+ conservative_update(fmt, key, value) # prefer info from YouTube
fmt['quality'] = hardcoded_itag_info.get('height')
+ conservative_update(
+ fmt, 'quality',
+ extract_int(yt_fmt.get('quality'), whole_word=False)
+ )
+ conservative_update(
+ fmt, 'quality',
+ extract_int(yt_fmt.get('qualityLabel'), whole_word=False)
+ )
info['formats'].append(fmt)
@@ -397,7 +543,7 @@ def extract_hls_formats(hls_manifest):
if lines[i].startswith('#EXT-X-STREAM-INF'):
fmt = {'acodec': None, 'vcodec': None, 'height': None,
'width': None, 'fps': None, 'audio_bitrate': None,
- 'itag': None, 'file_size': None,
+ 'itag': None, 'file_size': None, 'duration_ms': None,
'audio_sample_rate': None, 'url': None}
properties = lines[i].split(':')[1]
properties += ',' # make regex work for last key-value pair
@@ -484,6 +630,25 @@ def extract_watch_info(polymer_json):
info['translation_languages'] = []
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
+ # Sometimes the above playerCaptionsRenderer is randomly missing
+ # Extract base_url from one of the captions by removing lang specifiers
+ if not info['_captions_base_url']:
+ base_url = normalize_url(deep_get(
+ captions_info,
+ 'playerCaptionsTracklistRenderer',
+ 'captionTracks',
+ 0,
+ 'baseUrl'
+ ))
+ if base_url:
+ url_parts = urllib.parse.urlparse(base_url)
+ qs = urllib.parse.parse_qs(url_parts.query)
+ for key in ('tlang', 'lang', 'name', 'kind', 'fmt'):
+ if key in qs:
+ del qs[key]
+ base_url = urllib.parse.urlunparse(url_parts._replace(
+ query=urllib.parse.urlencode(qs, doseq=True)))
+ info['_captions_base_url'] = base_url
for caption_track in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
lang_code = caption_track.get('languageCode')
if not lang_code:
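
Rebuilding the base captions URL amounts to stripping the per-track query parameters; roughly (hypothetical URL):

    parts = urllib.parse.urlparse(
        'https://www.youtube.com/api/timedtext?v=abc123&lang=en&fmt=vtt')
    qs = urllib.parse.parse_qs(parts.query)
    for key in ('tlang', 'lang', 'name', 'kind', 'fmt'):
        qs.pop(key, None)
    base_url = urllib.parse.urlunparse(parts._replace(
        query=urllib.parse.urlencode(qs, doseq=True)))
    # base_url == 'https://www.youtube.com/api/timedtext?v=abc123'
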
@@ -564,9 +729,17 @@ def extract_watch_info(polymer_json):
liberal_update(info, 'category', mf.get('category'))
liberal_update(info, 'time_published', mf.get('publishDate'))
liberal_update(info, 'time_uploaded', mf.get('uploadDate'))
+ family_safe = mf.get('isFamilySafe')
+ if family_safe is None:
+ conservative_update(info, 'age_restricted', None)
+ else:
+ conservative_update(info, 'age_restricted', not family_safe)
+ info['allowed_countries'] = mf.get('availableCountries', [])
# other stuff
info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+ info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
+
return info
single_char_codes = {
@@ -646,10 +819,15 @@ def extract_watch_info_from_html(watch_html):
return extract_watch_info(fake_polymer_json)
+def captions_available(info):
+ return bool(info['_captions_base_url'])
+
def get_caption_url(info, language, format, automatic=False, translation_language=None):
'''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
url = info['_captions_base_url']
+ if not url:
+ return None
url += '&lang=' + language
url += '&fmt=' + format
if automatic:
@@ -661,15 +839,10 @@ def get_caption_url(info, language, format, automatic=False, translation_languag
url += '&tlang=' + translation_language
return url
-def update_with_age_restricted_info(info, video_info_page):
- '''Inserts urls from 'player_response' in get_video_info page'''
+def update_with_new_urls(info, player_response):
+ '''Inserts urls from player_response json'''
ERROR_PREFIX = 'Error getting missing player or bypassing age-restriction: '
- video_info = urllib.parse.parse_qs(video_info_page)
- player_response = deep_get(video_info, 'player_response', 0)
- if player_response is None:
- info['playability_error'] = ERROR_PREFIX + 'Could not find player_response in video_info_page'
- return
try:
player_response = json.loads(player_response)
except json.decoder.JSONDecodeError: