From 76376b29a0adf6bd6d7a0202d904f923bdc8aa57 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Thu, 19 Dec 2019 19:28:58 -0800 Subject: Extraction: Split yt_data_extract.py into multiple files --- youtube/yt_data_extract.py | 1190 --------------------------- youtube/yt_data_extract/__init__.py | 11 + youtube/yt_data_extract/common.py | 455 ++++++++++ youtube/yt_data_extract/everything_else.py | 273 ++++++ youtube/yt_data_extract/watch_extraction.py | 449 ++++++++++ 5 files changed, 1188 insertions(+), 1190 deletions(-) delete mode 100644 youtube/yt_data_extract.py create mode 100644 youtube/yt_data_extract/__init__.py create mode 100644 youtube/yt_data_extract/common.py create mode 100644 youtube/yt_data_extract/everything_else.py create mode 100644 youtube/yt_data_extract/watch_extraction.py diff --git a/youtube/yt_data_extract.py b/youtube/yt_data_extract.py deleted file mode 100644 index 68550cf..0000000 --- a/youtube/yt_data_extract.py +++ /dev/null @@ -1,1190 +0,0 @@ -from youtube import util, proto - -import html -import json -import re -import urllib.parse -import collections -from math import ceil -import traceback - -# videos: - -# id -# title -# url -# author -# author_url -# thumbnail -# description -# time_published (str) -# duration (str) -# like_count (int) -# dislike_count (int) -# view_count (int) -# approx_view_count (str) -# playlist_index - -# playlists: - -# id -# title -# url -# author -# author_url -# thumbnail -# description -# time_published (str) -# video_count (int) -# first_video_id - -# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py -_formats = { - '5': {'ext': 'flv', 'width': 400, 'height': 240, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'}, - '6': {'ext': 'flv', 'width': 450, 'height': 270, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'}, - '13': {'ext': '3gp', 'acodec': 'aac', 'vcodec': 'mp4v'}, - '17': {'ext': '3gp', 'width': 176, 'height': 144, 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'mp4v'}, - '18': {'ext': 'mp4', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 96, 'vcodec': 'h264'}, - '22': {'ext': 'mp4', 'width': 1280, 'height': 720, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'}, - '34': {'ext': 'flv', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - '35': {'ext': 'flv', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - # itag 36 videos are either 320x180 (BaW_jenozKc) or 320x240 (__2ABJjxzNo), audio_bitrate varies as well - '36': {'ext': '3gp', 'width': 320, 'acodec': 'aac', 'vcodec': 'mp4v'}, - '37': {'ext': 'mp4', 'width': 1920, 'height': 1080, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'}, - '38': {'ext': 'mp4', 'width': 4096, 'height': 3072, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'}, - '43': {'ext': 'webm', 'width': 640, 'height': 360, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'}, - '44': {'ext': 'webm', 'width': 854, 'height': 480, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'}, - '45': {'ext': 'webm', 'width': 1280, 'height': 720, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'}, - '46': {'ext': 'webm', 'width': 1920, 'height': 1080, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'}, - '59': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - '78': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - - - # 
3D videos - '82': {'ext': 'mp4', 'height': 360, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - '83': {'ext': 'mp4', 'height': 480, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - '84': {'ext': 'mp4', 'height': 720, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'}, - '85': {'ext': 'mp4', 'height': 1080, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'}, - '100': {'ext': 'webm', 'height': 360, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'}, - '101': {'ext': 'webm', 'height': 480, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'}, - '102': {'ext': 'webm', 'height': 720, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'}, - - # Apple HTTP Live Streaming - '91': {'ext': 'mp4', 'height': 144, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'}, - '92': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'}, - '93': {'ext': 'mp4', 'height': 360, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - '94': {'ext': 'mp4', 'height': 480, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'}, - '95': {'ext': 'mp4', 'height': 720, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'}, - '96': {'ext': 'mp4', 'height': 1080, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'}, - '132': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'}, - '151': {'ext': 'mp4', 'height': 72, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'h264'}, - - # DASH mp4 video - '133': {'ext': 'mp4', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '134': {'ext': 'mp4', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '135': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '136': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '137': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '138': {'ext': 'mp4', 'format_note': 'DASH video', 'vcodec': 'h264'}, # Height can vary (https://github.com/ytdl-org/youtube-dl/issues/4559) - '160': {'ext': 'mp4', 'height': 144, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '212': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '264': {'ext': 'mp4', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'h264'}, - '298': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60}, - '299': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60}, - '266': {'ext': 'mp4', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'h264'}, - - # Dash mp4 audio - '139': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 48, 'container': 'm4a_dash'}, - '140': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 128, 'container': 'm4a_dash'}, - '141': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 256, 'container': 'm4a_dash'}, - '256': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'}, - '258': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'}, - '325': {'ext': 'm4a', 
'format_note': 'DASH audio', 'acodec': 'dtse', 'container': 'm4a_dash'}, - '328': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'ec-3', 'container': 'm4a_dash'}, - - # Dash webm - '167': {'ext': 'webm', 'height': 360, 'width': 640, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'}, - '168': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'}, - '169': {'ext': 'webm', 'height': 720, 'width': 1280, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'}, - '170': {'ext': 'webm', 'height': 1080, 'width': 1920, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'}, - '218': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'}, - '219': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'}, - '278': {'ext': 'webm', 'height': 144, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp9'}, - '242': {'ext': 'webm', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '243': {'ext': 'webm', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '244': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '245': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '246': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '247': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '248': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '271': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - # itag 272 videos are either 3840x2160 (e.g. RtoitU2A-3E) or 7680x4320 (sLprVF6d7Ug) - '272': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '302': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60}, - '303': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60}, - '308': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60}, - '313': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'}, - '315': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60}, - - # Dash webm audio - '171': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 128}, - '172': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 256}, - - # Dash webm audio with opus inside - '249': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 50}, - '250': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 70}, - '251': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 160}, - - # RTMP (unnamed) - '_rtmp': {'protocol': 'rtmp'}, - - # av01 video only formats sometimes served with "unknown" codecs - '394': {'vcodec': 'av01.0.05M.08'}, - '395': {'vcodec': 'av01.0.05M.08'}, - '396': {'vcodec': 'av01.0.05M.08'}, - '397': {'vcodec': 'av01.0.05M.08'}, -} - -def get(object, key, default=None, types=()): - '''Like dict.get(), but returns default if the result doesn't match one of the types. 
- Also works for indexing lists.''' - try: - result = object[key] - except (TypeError, IndexError, KeyError): - return default - - if not types or isinstance(result, types): - return result - else: - return default - -def multi_get(object, *keys, default=None, types=()): - '''Like get, but try other keys if the first fails''' - for key in keys: - try: - result = object[key] - except (TypeError, IndexError, KeyError): - pass - else: - if not types or isinstance(result, types): - return result - else: - continue - return default - - -def deep_get(object, *keys, default=None, types=()): - '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. - Last argument is the default value to use in case of any IndexErrors or KeyErrors. - If types is given and the result doesn't match one of those types, default is returned''' - try: - for key in keys: - object = object[key] - except (TypeError, IndexError, KeyError): - return default - else: - if not types or isinstance(object, types): - return object - else: - return default - -def multi_deep_get(object, *key_sequences, default=None, types=()): - '''Like deep_get, but can try different key sequences in case one fails. - Return default if all of them fail. key_sequences is a list of lists''' - for key_sequence in key_sequences: - _object = object - try: - for key in key_sequence: - _object = _object[key] - except (TypeError, IndexError, KeyError): - pass - else: - if not types or isinstance(_object, types): - return _object - else: - continue - return default - -def liberal_update(obj, key, value): - '''Updates obj[key] with value as long as value is not None. - Ensures obj[key] will at least get a value of None, however''' - if (value is not None) or (key not in obj): - obj[key] = value - -def conservative_update(obj, key, value): - '''Only updates obj if it doesn't have key or obj[key] is None''' - if obj.get(key) is None: - obj[key] = value - -def remove_redirect(url): - if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking - query_string = url[url.find('?')+1: ] - return urllib.parse.parse_qs(query_string)['q'][0] - return url - -def _recover_urls(runs): - for run in runs: - url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url') - text = run.get('text', '') - # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text - if url is not None and (text.startswith('http://') or text.startswith('https://')): - url = remove_redirect(url) - run['url'] = url - run['text'] = url # youtube truncates the url text, use actual url instead - -def extract_str(node, default=None, recover_urls=False): - '''default is the value returned if the extraction fails. 
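A quick sketch of how these lookup helpers behave on renderer-shaped JSON; the fragment below is invented for illustration, not real API output:

    renderer = {'title': {'runs': [{'text': 'Example'}]},
                'shortBylineText': {'runs': [
                    {'navigationEndpoint': {'browseEndpoint': {'browseId': 'UC123'}}}]}}

    deep_get(renderer, 'title', 'runs', 0, 'text')         # -> 'Example'
    deep_get(renderer, 'title', 'simpleText', default='')  # -> '' (key missing)
    multi_deep_get(renderer,
        ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
        ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
    )  # -> 'UC123' (first key sequence fails, second succeeds)

    info = {'view_count': None}
    conservative_update(info, 'view_count', 41)  # sets it: value was None
    conservative_update(info, 'view_count', 99)  # no-op: already set
    liberal_update(info, 'view_count', None)     # no-op: None never overwrites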
If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)''' - if isinstance(node, str): - return node - - try: - return node['simpleText'] - except (KeyError, TypeError): - pass - - if isinstance(node, dict) and 'runs' in node: - if recover_urls: - _recover_urls(node['runs']) - return ''.join(text_run.get('text', '') for text_run in node['runs']) - - return default - -def extract_formatted_text(node): - if not node: - return [] - if 'runs' in node: - _recover_urls(node['runs']) - return node['runs'] - elif 'simpleText' in node: - return [{'text': node['simpleText']}] - return [] - -def extract_int(string, default=None): - if isinstance(string, int): - return string - if not isinstance(string, str): - string = extract_str(string) - if not string: - return default - match = re.search(r'(\d+)', string.replace(',', '')) - if match is None: - return default - try: - return int(match.group(1)) - except ValueError: - return default - -def extract_approx_int(string): - '''e.g. "15M" from "15M subscribers"''' - if not isinstance(string, str): - string = extract_str(string) - if not string: - return None - match = re.search(r'(\d+[KMBTkmbt])', string.replace(',', '')) - if match is None: - return None - return match.group(1) - -youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$') -def normalize_url(url): - if url is None: - return None - match = youtube_url_re.fullmatch(url) - if match is None: - raise Exception() - - return 'https://www.youtube.com' + match.group(1) - -def prefix_urls(item): - try: - item['thumbnail'] = util.prefix_url(item['thumbnail']) - except KeyError: - pass - - try: - item['author_url'] = util.prefix_url(item['author_url']) - except KeyError: - pass - -def add_extra_html_info(item): - if item['type'] == 'video': - item['url'] = (util.URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None - - video_info = {} - for key in ('id', 'title', 'author', 'duration'): - try: - video_info[key] = item[key] - except KeyError: - video_info[key] = '' - - item['video_info'] = json.dumps(video_info) - - elif item['type'] == 'playlist': - item['url'] = (util.URL_ORIGIN + '/playlist?list=' + item['id']) if item.get('id') else None - elif item['type'] == 'channel': - item['url'] = (util.URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None - -def extract_item_info(item, additional_info={}): - if not item: - return {'error': 'No item given'} - - type = get(list(item.keys()), 0) - if not type: - return {'error': 'Could not find type'} - item = item[type] - - info = {'error': None} - if type in ('itemSectionRenderer', 'compactAutoplayRenderer'): - return extract_item_info(deep_get(item, 'contents', 0), additional_info) - - if type in ('movieRenderer', 'clarificationRenderer'): - info['type'] = 'unsupported' - return info - - info.update(additional_info) - - # type looks like e.g. 
'compactVideoRenderer' or 'gridVideoRenderer' - # camelCase split, https://stackoverflow.com/a/37697078 - type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()] - if len(type_parts) < 2: - info['type'] = 'unsupported' - return - primary_type = type_parts[-2] - if primary_type == 'video': - info['type'] = 'video' - elif primary_type in ('playlist', 'radio', 'show'): - info['type'] = 'playlist' - elif primary_type == 'channel': - info['type'] = 'channel' - else: - info['type'] = 'unsupported' - - info['title'] = extract_str(item.get('title')) - info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText')) - info['author_id'] = extract_str(multi_deep_get(item, - ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], - ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], - ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'] - )) - info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None - info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText')) - info['thumbnail'] = multi_deep_get(item, - ['thumbnail', 'thumbnails', 0, 'url'], # videos - ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists - ['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows - ) - - info['badges'] = [] - for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()): - badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label') - if badge: - info['badges'].append(badge) - - if primary_type in ('video', 'playlist'): - info['time_published'] = extract_str(item.get('publishedTimeText')) - - if primary_type == 'video': - info['id'] = item.get('videoId') - info['view_count'] = extract_int(item.get('viewCountText')) - - # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published - accessibility_label = deep_get(item, 'title', 'accessibility', 'accessibilityData', 'label', default='') - timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label) - if timestamp: - conservative_update(info, 'time_published', timestamp.group(1)) - view_count = re.search(r'(\d+) views', accessibility_label.replace(',', '')) - if view_count: - conservative_update(info, 'view_count', int(view_count.group(1))) - - if info['view_count']: - info['approx_view_count'] = '{:,}'.format(info['view_count']) - else: - info['approx_view_count'] = extract_approx_int(multi_get(item, 'shortViewCountText')) - info['duration'] = extract_str(item.get('lengthText')) - elif primary_type == 'playlist': - info['id'] = item.get('playlistId') - info['video_count'] = extract_int(item.get('videoCount')) - elif primary_type == 'channel': - info['id'] = item.get('channelId') - info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText')) - elif primary_type == 'show': - info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId') - - if primary_type in ('playlist', 'channel'): - conservative_update(info, 'video_count', extract_int(item.get('videoCountText'))) - - for overlay in item.get('thumbnailOverlays', []): - conservative_update(info, 'duration', extract_str(deep_get( - overlay, 'thumbnailOverlayTimeStatusRenderer', 'text' - ))) - # show renderers don't have videoCountText - conservative_update(info, 'video_count', extract_int(deep_get( - overlay, 'thumbnailOverlayBottomPanelRenderer', 
'text' - ))) - return info - -def parse_info_prepare_for_html(renderer, additional_info={}): - item = extract_item_info(renderer, additional_info) - prefix_urls(item) - add_extra_html_info(item) - - return item - -def extract_response(polymer_json): - '''return response, error''' - response = multi_deep_get(polymer_json, [1, 'response'], ['response'], default=None, types=dict) - if response is None: - return None, 'Failed to extract response' - else: - return response, None - - -list_types = { - 'sectionListRenderer', - 'itemSectionRenderer', - 'gridRenderer', - 'playlistVideoListRenderer', -} - -item_types = { - 'movieRenderer', - 'didYouMeanRenderer', - 'showingResultsForRenderer', - - 'videoRenderer', - 'compactVideoRenderer', - 'compactAutoplayRenderer', - 'gridVideoRenderer', - 'playlistVideoRenderer', - - 'playlistRenderer', - 'compactPlaylistRenderer', - 'gridPlaylistRenderer', - - 'radioRenderer', - 'compactRadioRenderer', - 'gridRadioRenderer', - - 'showRenderer', - 'compactShowRenderer', - 'gridShowRenderer', - - - 'channelRenderer', - 'compactChannelRenderer', - 'gridChannelRenderer', - - 'channelAboutFullMetadataRenderer', -} - -def traverse_browse_renderer(renderer): - for tab in get(renderer, 'tabs', (), types=(list, tuple)): - tab_renderer = multi_deep_get(tab, ['tabRenderer'], ['expandableTabRenderer'], default=None, types=dict) - if tab_renderer is None: - continue - if tab_renderer.get('selected', False): - return get(tab_renderer, 'content', {}, types=(dict)) - print('Could not find tab with content') - return {} - -def traverse_standard_list(renderer): - renderer_list = multi_deep_get(renderer, ['contents'], ['items'], default=(), types=(list, tuple)) - continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation') - return renderer_list, continuation - -# these renderers contain one inside them -nested_renderer_dispatch = { - 'singleColumnBrowseResultsRenderer': traverse_browse_renderer, - 'twoColumnBrowseResultsRenderer': traverse_browse_renderer, - 'twoColumnSearchResultsRenderer': lambda renderer: get(renderer, 'primaryContents', {}, types=dict), -} - -# these renderers contain a list of renderers in side them -nested_renderer_list_dispatch = { - 'sectionListRenderer': traverse_standard_list, - 'itemSectionRenderer': traverse_standard_list, - 'gridRenderer': traverse_standard_list, - 'playlistVideoListRenderer': traverse_standard_list, - 'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[], types=(list, tuple)), None), -} - -def extract_items(response, item_types=item_types): - '''return items, ctoken''' - if 'continuationContents' in response: - # always has just the one [something]Continuation key, but do this just in case they add some tracking key or something - for key, renderer_continuation in get(response, 'continuationContents', {}, types=dict).items(): - if key.endswith('Continuation'): # e.g. 
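For reference while reading extract_items below: a minimal hand-built response (far shallower than a real one) and the result the two-mode traversal produces for it:

    fake_response = {'contents': {'twoColumnSearchResultsRenderer': {
        'primaryContents': {'sectionListRenderer': {'contents': [
            {'itemSectionRenderer': {'contents': [
                {'videoRenderer': {'videoId': 'dQw4w9WgXcQ'}},
            ]}},
        ]}}}}}
    items, ctoken = extract_items(fake_response)
    # items  -> [{'videoRenderer': {'videoId': 'dQw4w9WgXcQ'}}]
    # ctoken -> None (no nextContinuationData in this fragment)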
commentSectionContinuation, playlistVideoListContinuation - items = multi_deep_get(renderer_continuation, ['contents'], ['items'], default=[], types=(list, tuple)) - ctoken = deep_get(renderer_continuation, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str) - return items, ctoken - return [], None - elif 'contents' in response: - ctoken = None - items = [] - - iter_stack = collections.deque() - current_iter = iter(()) - - renderer = get(response, 'contents', {}, types=dict) - - while True: - # mode 1: dig into the current renderer - # Will stay in mode 1 (via continue) if a new renderer is found inside this one - # Otherwise, after finding that it is an item renderer, - # contains a list, or contains nothing, - # falls through into mode 2 to get a new renderer - if len(renderer) != 0: - key, value = list(renderer.items())[0] - - # has a list in it, add it to the iter stack - if key in nested_renderer_list_dispatch: - renderer_list, continuation = nested_renderer_list_dispatch[key](value) - if renderer_list: - iter_stack.append(current_iter) - current_iter = iter(renderer_list) - if continuation: - ctoken = continuation - - # new renderer nested inside this one - elif key in nested_renderer_dispatch: - renderer = nested_renderer_dispatch[key](value) - continue # back to mode 1 - - # the renderer is an item - elif key in item_types: - items.append(renderer) - - - # mode 2: get a new renderer by iterating. - # goes up the stack for an iterator if one has been exhausted - while current_iter is not None: - try: - renderer = current_iter.__next__() - break - except StopIteration: - try: - current_iter = iter_stack.pop() # go back up the stack - except IndexError: - return items, ctoken - - else: - return [], None - -def extract_channel_info(polymer_json, tab): - response, err = extract_response(polymer_json) - if err: - return {'error': err} - - try: - microformat = response['microformat']['microformatDataRenderer'] - - # channel doesn't exist or was terminated - # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org - except KeyError: - if 'alerts' in response and len(response['alerts']) > 0: - return {'error': ' '.join(alert['alertRenderer']['text']['simpleText'] for alert in response['alerts']) } - elif 'errors' in response['responseContext']: - for error in response['responseContext']['errors']['error']: - if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id': - return {'error': 'This channel does not exist'} - return {'error': 'Failure getting microformat'} - - info = {'error': None} - info['current_tab'] = tab - - - # stuff from microformat (info given by youtube for every page on channel) - info['short_description'] = microformat['description'] - info['channel_name'] = microformat['title'] - info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url'] - channel_url = microformat['urlCanonical'].rstrip('/') - channel_id = channel_url[channel_url.rfind('/')+1:] - info['channel_id'] = channel_id - info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id - - info['items'] = [] - - # empty channel - if 'contents' not in response and 'continuationContents' not in response: - return info - - - items, _ = extract_items(response) - if tab in ('videos', 'playlists', 'search'): - additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id} - info['items'] = [extract_item_info(renderer, additional_info) for renderer in items] - - elif tab == 
'about': - for item in items: - try: - channel_metadata = item['channelAboutFullMetadataRenderer'] - break - except KeyError: - pass - else: - info['error'] = 'Could not find channelAboutFullMetadataRenderer' - return info - - info['links'] = [] - for link_json in channel_metadata.get('primaryLinks', ()): - url = remove_redirect(link_json['navigationEndpoint']['urlEndpoint']['url']) - - text = extract_str(link_json['title']) - - info['links'].append( (text, url) ) - - - info['stats'] = [] - for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'): - try: - stat = channel_metadata[stat_name] - except KeyError: - continue - info['stats'].append(extract_str(stat)) - - if 'description' in channel_metadata: - info['description'] = extract_str(channel_metadata['description']) - else: - info['description'] = '' - - else: - raise NotImplementedError('Unknown or unsupported channel tab: ' + tab) - - return info - -def extract_search_info(polymer_json): - response, err = extract_response(polymer_json) - if err: - return {'error': err} - info = {'error': None} - info['estimated_results'] = int(response['estimatedResults']) - info['estimated_pages'] = ceil(info['estimated_results']/20) - - - results, _ = extract_items(response) - - - info['items'] = [] - info['corrections'] = {'type': None} - for renderer in results: - type = list(renderer.keys())[0] - if type == 'shelfRenderer': - continue - if type == 'didYouMeanRenderer': - renderer = renderer[type] - - info['corrections'] = { - 'type': 'did_you_mean', - 'corrected_query': renderer['correctedQueryEndpoint']['searchEndpoint']['query'], - 'corrected_query_text': renderer['correctedQuery']['runs'], - } - continue - if type == 'showingResultsForRenderer': - renderer = renderer[type] - - info['corrections'] = { - 'type': 'showing_results_for', - 'corrected_query_text': renderer['correctedQuery']['runs'], - 'original_query_text': renderer['originalQuery']['simpleText'], - } - continue - - i_info = extract_item_info(renderer) - if i_info.get('type') != 'unsupported': - info['items'].append(i_info) - - - return info - -def extract_playlist_metadata(polymer_json): - response, err = extract_response(polymer_json) - if err: - return {'error': err} - - metadata = {'error': None} - header = deep_get(response, 'header', 'playlistHeaderRenderer', default={}) - metadata['title'] = extract_str(header.get('title')) - - metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId') - first_id = re.search(r'([a-z_\-]{11})', deep_get(header, - 'thumbnail', 'thumbnails', 0, 'url', default='')) - if first_id: - conservative_update(metadata, 'first_video_id', first_id.group(1)) - if metadata['first_video_id'] is None: - metadata['thumbnail'] = None - else: - metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg' - - metadata['video_count'] = extract_int(header.get('numVideosText')) - metadata['description'] = extract_str(header.get('descriptionText'), default='') - metadata['author'] = extract_str(header.get('ownerText')) - metadata['author_id'] = multi_deep_get(header, - ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], - ['ownerEndpoint', 'browseEndpoint', 'browseId']) - if metadata['author_id']: - metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id'] - else: - metadata['author_url'] = None - metadata['view_count'] = extract_int(header.get('viewCountText')) - metadata['like_count'] = 
extract_int(header.get('likesCountWithoutLikeText')) - for stat in header.get('stats', ()): - text = extract_str(stat) - if 'videos' in text: - conservative_update(metadata, 'video_count', extract_int(text)) - elif 'views' in text: - conservative_update(metadata, 'view_count', extract_int(text)) - elif 'updated' in text: - metadata['time_published'] = extract_date(text) - - return metadata - -def extract_playlist_info(polymer_json): - response, err = extract_response(polymer_json) - if err: - return {'error': err} - info = {'error': None} - first_page = 'continuationContents' not in response - video_list, _ = extract_items(response) - - info['items'] = [extract_item_info(renderer) for renderer in video_list] - - if first_page: - info['metadata'] = extract_playlist_metadata(polymer_json) - - return info - -def ctoken_metadata(ctoken): - result = dict() - params = proto.parse(proto.b64_to_bytes(ctoken)) - result['video_id'] = proto.parse(params[2])[2].decode('ascii') - - offset_information = proto.parse(params[6]) - result['offset'] = offset_information.get(5, 0) - - result['is_replies'] = False - if (3 in offset_information) and (2 in proto.parse(offset_information[3])): - result['is_replies'] = True - result['sort'] = None - else: - try: - result['sort'] = proto.parse(offset_information[4])[6] - except KeyError: - result['sort'] = 0 - return result - -def extract_comments_info(polymer_json): - response, err = extract_response(polymer_json) - if err: - return {'error': err} - info = {'error': None} - - url = multi_deep_get(polymer_json, [1, 'url'], ['url']) - if url: - ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0] - metadata = ctoken_metadata(ctoken) - else: - metadata = {} - info['video_id'] = metadata.get('video_id') - info['offset'] = metadata.get('offset') - info['is_replies'] = metadata.get('is_replies') - info['sort'] = metadata.get('sort') - info['video_title'] = None - - comments, ctoken = extract_items(response) - info['comments'] = [] - info['ctoken'] = ctoken - for comment in comments: - comment_info = {} - - if 'commentThreadRenderer' in comment: # top level comments - conservative_update(info, 'is_replies', False) - comment_thread = comment['commentThreadRenderer'] - info['video_title'] = extract_str(comment_thread.get('commentTargetTitle')) - if 'replies' not in comment_thread: - comment_info['reply_count'] = 0 - else: - comment_info['reply_count'] = extract_int(deep_get(comment_thread, - 'replies', 'commentRepliesRenderer', 'moreText' - ), default=1) # With 1 reply, the text reads "View reply" - comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={}) - elif 'commentRenderer' in comment: # replies - comment_info['reply_count'] = 0 # replyCount, below, not present for replies even if the reply has further replies to it - conservative_update(info, 'is_replies', True) - comment_renderer = comment['commentRenderer'] - else: - comment_renderer = {} - - # These 3 are sometimes absent, likely because the channel was deleted - comment_info['author'] = extract_str(comment_renderer.get('authorText')) - comment_info['author_url'] = deep_get(comment_renderer, - 'authorEndpoint', 'commandMetadata', 'webCommandMetadata', 'url') - comment_info['author_id'] = deep_get(comment_renderer, - 'authorEndpoint', 'browseEndpoint', 'browseId') - - comment_info['author_avatar'] = deep_get(comment_renderer, - 'authorThumbnail', 'thumbnails', 0, 'url') - comment_info['id'] = comment_renderer.get('commentId') - comment_info['text'] = 
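The dict shape ctoken_metadata yields, assuming a valid comment-section continuation token (real tokens are long base64 strings and are omitted here; ctoken is a stand-in):

    meta = ctoken_metadata(ctoken)
    # e.g. {'video_id': 'dQw4w9WgXcQ', 'offset': 20, 'is_replies': False, 'sort': 0}
    # offset: how many comments have already been served;
    # sort: the integer YouTube encodes for the sort order (0 if absent)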
extract_formatted_text(comment_renderer.get('contentText')) - comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText')) - comment_info['like_count'] = comment_renderer.get('likeCount') - liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount')) - - info['comments'].append(comment_info) - - return info - -def check_missing_keys(object, *key_sequences): - for key_sequence in key_sequences: - _object = object - try: - for key in key_sequence: - _object = _object[key] - except (KeyError, IndexError, TypeError): - return 'Could not find ' + key - - return None - -def extract_metadata_row_info(video_renderer_info): - # extract category and music list - info = { - 'category': None, - 'music_list': [], - } - - current_song = {} - for row in deep_get(video_renderer_info, 'metadataRowContainer', 'metadataRowContainerRenderer', 'rows', default=[]): - row_title = extract_str(deep_get(row, 'metadataRowRenderer', 'title'), default='') - row_content = extract_str(deep_get(row, 'metadataRowRenderer', 'contents', 0)) - if row_title == 'Category': - info['category'] = row_content - elif row_title in ('Song', 'Music'): - if current_song: - info['music_list'].append(current_song) - current_song = {'title': row_content} - elif row_title == 'Artist': - current_song['artist'] = row_content - elif row_title == 'Album': - current_song['album'] = row_content - elif row_title == 'Writers': - current_song['writers'] = row_content - elif row_title.startswith('Licensed'): - current_song['licensor'] = row_content - if current_song: - info['music_list'].append(current_song) - - return info - -def extract_date(date_text): - if date_text is None: - return None - - date_text = date_text.replace(',', '').lower() - parts = date_text.split() - if len(parts) >= 3: - month, day, year = parts[-3:] - month = month_abbreviations.get(month[0:3]) # slicing in case they start writing out the full month name - if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None): - return year + '-' + month + '-' + day - -def extract_watch_info_mobile(top_level): - info = {} - microformat = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={}) - - family_safe = microformat.get('isFamilySafe') - if family_safe is None: - info['age_restricted'] = None - else: - info['age_restricted'] = not family_safe - info['allowed_countries'] = microformat.get('availableCountries', []) - info['time_published'] = microformat.get('publishDate') - - response = top_level.get('response', {}) - - # video info from metadata renderers - items, _ = extract_items(response, item_types={'slimVideoMetadataRenderer'}) - if items: - video_info = items[0]['slimVideoMetadataRenderer'] - else: - print('Failed to extract video metadata') - video_info = {} - - info.update(extract_metadata_row_info(video_info)) - info['description'] = extract_str(video_info.get('description'), recover_urls=True) - info['view_count'] = extract_int(extract_str(video_info.get('expandedSubtitle'))) - info['author'] = extract_str(deep_get(video_info, 'owner', 'slimOwnerRenderer', 'title')) - info['author_id'] = deep_get(video_info, 'owner', 'slimOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId') - info['title'] = extract_str(video_info.get('title')) - info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'), default='') - info['unlisted'] = False - for badge in video_info.get('badges', []): - if deep_get(badge, 'metadataBadgeRenderer', 
'label') == 'Unlisted': - info['unlisted'] = True - info['like_count'] = None - info['dislike_count'] = None - if not info['time_published']: - info['time_published'] = extract_date(extract_str(video_info.get('dateText', None))) - for button in video_info.get('buttons', ()): - button_renderer = button.get('slimMetadataToggleButtonRenderer', {}) - - # all the digits can be found in the accessibility data - count = extract_int(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText', 'accessibility', 'accessibilityData', 'label')) - - # this count doesn't have all the digits, it's like 53K for instance - dumb_count = extract_int(extract_str(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText'))) - - # the accessibility text will be "No likes" or "No dislikes" or something like that, but dumb count will be 0 - if dumb_count == 0: - count = 0 - - if 'isLike' in button_renderer: - info['like_count'] = count - elif 'isDislike' in button_renderer: - info['dislike_count'] = count - - # comment section info - items, _ = extract_items(response, item_types={'commentSectionRenderer'}) - if items: - comment_info = items[0]['commentSectionRenderer'] - comment_count_text = extract_str(deep_get(comment_info, 'header', 'commentSectionHeaderRenderer', 'countText')) - if comment_count_text == 'Comments': # just this with no number, means 0 comments - info['comment_count'] = 0 - else: - info['comment_count'] = extract_int(comment_count_text) - info['comments_disabled'] = False - else: # no comment section present means comments are disabled - info['comment_count'] = 0 - info['comments_disabled'] = True - - # check for limited state - items, _ = extract_items(response, item_types={'limitedStateMessageRenderer'}) - if items: - info['limited_state'] = True - else: - info['limited_state'] = False - - # related videos - related, _ = extract_items(response) - info['related_videos'] = [extract_item_info(renderer) for renderer in related] - - return info - -month_abbreviations = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'} -def extract_watch_info_desktop(top_level): - info = { - 'comment_count': None, - 'comments_disabled': None, - 'allowed_countries': None, - 'limited_state': None, - } - - video_info = {} - for renderer in deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'results', 'results', 'contents', default=()): - if renderer and list(renderer.keys())[0] in ('videoPrimaryInfoRenderer', 'videoSecondaryInfoRenderer'): - video_info.update(list(renderer.values())[0]) - - info.update(extract_metadata_row_info(video_info)) - info['description'] = extract_str(video_info.get('description', None), recover_urls=True) - info['time_published'] = extract_date(extract_str(video_info.get('dateText', None))) - - likes_dislikes = deep_get(video_info, 'sentimentBar', 'sentimentBarRenderer', 'tooltip', default='').split('/') - if len(likes_dislikes) == 2: - info['like_count'] = extract_int(likes_dislikes[0]) - info['dislike_count'] = extract_int(likes_dislikes[1]) - else: - info['like_count'] = None - info['dislike_count'] = None - - info['title'] = extract_str(video_info.get('title', None)) - info['author'] = extract_str(deep_get(video_info, 'owner', 'videoOwnerRenderer', 'title')) - info['author_id'] = deep_get(video_info, 'owner', 'videoOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId') - info['view_count'] = extract_int(extract_str(deep_get(video_info, 
'viewCount', 'videoViewCountRenderer', 'viewCount'))) - - related = deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results', default=[]) - info['related_videos'] = [extract_item_info(renderer) for renderer in related] - - return info - -def get_caption_url(info, language, format, automatic=False, translation_language=None): - '''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.''' - url = info['_captions_base_url'] - url += '&lang=' + language - url += '&fmt=' + format - if automatic: - url += '&kind=asr' - elif language in info['_manual_caption_language_names']: - url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='') - - if translation_language: - url += '&tlang=' + translation_language - return url - -def extract_formats(info, player_response): - streaming_data = player_response.get('streamingData', {}) - yt_formats = streaming_data.get('formats', []) + streaming_data.get('adaptiveFormats', []) - - info['formats'] = [] - - for yt_fmt in yt_formats: - fmt = {} - fmt['ext'] = None - fmt['audio_bitrate'] = None - fmt['acodec'] = None - fmt['vcodec'] = None - fmt['width'] = yt_fmt.get('width') - fmt['height'] = yt_fmt.get('height') - fmt['file_size'] = yt_fmt.get('contentLength') - fmt['audio_sample_rate'] = yt_fmt.get('audioSampleRate') - fmt['fps'] = yt_fmt.get('fps') - cipher = dict(urllib.parse.parse_qsl(yt_fmt.get('cipher', ''))) - if cipher: - fmt['url'] = cipher.get('url') - else: - fmt['url'] = yt_fmt.get('url') - fmt['s'] = cipher.get('s') - fmt['sp'] = cipher.get('sp') - fmt.update(_formats.get(str(yt_fmt.get('itag')), {})) - - info['formats'].append(fmt) - -def extract_playability_error(info, player_response, error_prefix=''): - if info['formats']: - info['playability_status'] = None - info['playability_error'] = None - return - - playability_status = deep_get(player_response, 'playabilityStatus', 'status', default=None) - info['playability_status'] = playability_status - - playability_reason = extract_str(multi_deep_get(player_response, - ['playabilityStatus', 'reason'], - ['playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'reason'], - default='Could not find playability error') - ) - - if playability_status not in (None, 'OK'): - info['playability_error'] = error_prefix + playability_reason - else: - info['playability_error'] = error_prefix + 'Unknown playability error' - -SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt') -def extract_watch_info(polymer_json): - info = {'playability_error': None, 'error': None} - - if isinstance(polymer_json, dict): - top_level = polymer_json - elif isinstance(polymer_json, (list, tuple)): - top_level = {} - for page_part in polymer_json: - if not isinstance(page_part, dict): - return {'error': 'Invalid page part'} - top_level.update(page_part) - else: - return {'error': 'Invalid top level polymer data'} - - error = check_missing_keys(top_level, - ['player', 'args'], - ['player', 'assets', 'js'], - ['playerResponse'], - ) - if error: - info['playability_error'] = error - - player_args = deep_get(top_level, 'player', 'args', default={}) - player_response = json.loads(player_args['player_response']) if 'player_response' in player_args else {} 
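A usage sketch for get_caption_url, with a stand-in info dict (in practice these fields are filled in by extract_watch_info below):

    info = {'_captions_base_url':
                'https://www.youtube.com/api/timedtext?v=xxxxxxxxxxx',  # placeholder
            '_manual_caption_language_names': {'en': 'English'}}
    get_caption_url(info, 'en', 'vtt')
    # -> ...timedtext?v=...&lang=en&fmt=vtt&name=English
    get_caption_url(info, 'en', 'vtt', automatic=True, translation_language='de')
    # -> ...timedtext?v=...&lang=en&fmt=vtt&kind=asr&tlang=de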
- - # captions - info['automatic_caption_languages'] = [] - info['manual_caption_languages'] = [] - info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url - info['translation_languages'] = [] - captions_info = player_response.get('captions', {}) - info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl')) - for caption_track in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()): - lang_code = caption_track.get('languageCode') - if not lang_code: - continue - if caption_track.get('kind') == 'asr': - info['automatic_caption_languages'].append(lang_code) - else: - info['manual_caption_languages'].append(lang_code) - base_url = caption_track.get('baseUrl', '') - lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0) - if lang_name: - info['_manual_caption_language_names'][lang_code] = lang_name - - for translation_lang_info in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'translationLanguages', default=()): - lang_code = translation_lang_info.get('languageCode') - if lang_code: - info['translation_languages'].append(lang_code) - if translation_lang_info.get('isTranslatable') == False: - print('WARNING: Found non-translatable caption language') - - # formats - extract_formats(info, player_response) - - # playability errors - extract_playability_error(info, player_response) - - # check age-restriction - info['age_restricted'] = (info['playability_status'] == 'LOGIN_REQUIRED' and info['playability_error'] and ' age' in info['playability_error']) - - # base_js (for decryption of signatures) - info['base_js'] = deep_get(top_level, 'player', 'assets', 'js') - if info['base_js']: - info['base_js'] = normalize_url(info['base_js']) - - mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={}) - if mobile: - info.update(extract_watch_info_mobile(top_level)) - else: - info.update(extract_watch_info_desktop(top_level)) - - # stuff from videoDetails. 
Use liberal_update to prioritize info from videoDetails over existing info - vd = deep_get(top_level, 'playerResponse', 'videoDetails', default={}) - liberal_update(info, 'title', extract_str(vd.get('title'))) - liberal_update(info, 'duration', extract_int(vd.get('lengthSeconds'))) - liberal_update(info, 'view_count', extract_int(vd.get('viewCount'))) - # videos with no description have a blank string - liberal_update(info, 'description', vd.get('shortDescription')) - liberal_update(info, 'id', vd.get('videoId')) - liberal_update(info, 'author', vd.get('author')) - liberal_update(info, 'author_id', vd.get('channelId')) - liberal_update(info, 'live', vd.get('isLiveContent')) - conservative_update(info, 'unlisted', not vd.get('isCrawlable', True)) #isCrawlable is false on limited state videos even if they aren't unlisted - liberal_update(info, 'tags', vd.get('keywords', [])) - - # fallback stuff from microformat - mf = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={}) - conservative_update(info, 'title', extract_str(mf.get('title'))) - conservative_update(info, 'duration', extract_int(mf.get('lengthSeconds'))) - # this gives the view count for limited state videos - conservative_update(info, 'view_count', extract_int(mf.get('viewCount'))) - conservative_update(info, 'description', extract_str(mf.get('description'), recover_urls=True)) - conservative_update(info, 'author', mf.get('ownerChannelName')) - conservative_update(info, 'author_id', mf.get('externalChannelId')) - liberal_update(info, 'unlisted', mf.get('isUnlisted')) - liberal_update(info, 'category', mf.get('category')) - liberal_update(info, 'time_published', mf.get('publishDate')) - liberal_update(info, 'time_uploaded', mf.get('uploadDate')) - - # other stuff - info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None - return info - -def update_with_age_restricted_info(info, video_info_page): - ERROR_PREFIX = 'Error bypassing age-restriction: ' - - video_info = urllib.parse.parse_qs(video_info_page) - player_response = deep_get(video_info, 'player_response', 0) - if player_response is None: - info['playability_error'] = ERROR_PREFIX + 'Could not find player_response in video_info_page' - return - try: - player_response = json.loads(player_response) - except json.decoder.JSONDecodeError: - traceback.print_exc() - info['playability_error'] = ERROR_PREFIX + 'Failed to parse json response' - return - - extract_formats(info, player_response) - extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX) diff --git a/youtube/yt_data_extract/__init__.py b/youtube/yt_data_extract/__init__.py new file mode 100644 index 0000000..f2a93a9 --- /dev/null +++ b/youtube/yt_data_extract/__init__.py @@ -0,0 +1,11 @@ +from .common import (get, multi_get, deep_get, multi_deep_get, + liberal_update, conservative_update, remove_redirect, normalize_url, + extract_str, extract_formatted_text, extract_int, extract_approx_int, + extract_date, extract_item_info, extract_items, extract_response, + prefix_urls, add_extra_html_info, parse_info_prepare_for_html) + +from .everything_else import (extract_channel_info, extract_search_info, + extract_playlist_metadata, extract_playlist_info, extract_comments_info) + +from .watch_extraction import (extract_watch_info, get_caption_url, + update_with_age_restricted_info) diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py new file mode 100644 index 0000000..5fa67bc --- /dev/null 
+++ b/youtube/yt_data_extract/common.py @@ -0,0 +1,455 @@ +from youtube import util + +import json +import re +import urllib.parse +import collections + +def get(object, key, default=None, types=()): + '''Like dict.get(), but returns default if the result doesn't match one of the types. + Also works for indexing lists.''' + try: + result = object[key] + except (TypeError, IndexError, KeyError): + return default + + if not types or isinstance(result, types): + return result + else: + return default + +def multi_get(object, *keys, default=None, types=()): + '''Like get, but try other keys if the first fails''' + for key in keys: + try: + result = object[key] + except (TypeError, IndexError, KeyError): + pass + else: + if not types or isinstance(result, types): + return result + else: + continue + return default + + +def deep_get(object, *keys, default=None, types=()): + '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. + Last argument is the default value to use in case of any IndexErrors or KeyErrors. + If types is given and the result doesn't match one of those types, default is returned''' + try: + for key in keys: + object = object[key] + except (TypeError, IndexError, KeyError): + return default + else: + if not types or isinstance(object, types): + return object + else: + return default + +def multi_deep_get(object, *key_sequences, default=None, types=()): + '''Like deep_get, but can try different key sequences in case one fails. + Return default if all of them fail. key_sequences is a list of lists''' + for key_sequence in key_sequences: + _object = object + try: + for key in key_sequence: + _object = _object[key] + except (TypeError, IndexError, KeyError): + pass + else: + if not types or isinstance(_object, types): + return _object + else: + continue + return default + +def liberal_update(obj, key, value): + '''Updates obj[key] with value as long as value is not None. + Ensures obj[key] will at least get a value of None, however''' + if (value is not None) or (key not in obj): + obj[key] = value + +def conservative_update(obj, key, value): + '''Only updates obj if it doesn't have key or obj[key] is None''' + if obj.get(key) is None: + obj[key] = value + +def remove_redirect(url): + if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # youtube puts these on external links to do tracking + query_string = url[url.find('?')+1: ] + return urllib.parse.parse_qs(query_string)['q'][0] + return url + +youtube_url_re = re.compile(r'^(?:(?:(?:https?:)?//)?(?:www\.)?youtube\.com)?(/.*)$') +def normalize_url(url): + if url is None: + return None + match = youtube_url_re.fullmatch(url) + if match is None: + raise Exception() + + return 'https://www.youtube.com' + match.group(1) + +def _recover_urls(runs): + for run in runs: + url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url') + text = run.get('text', '') + # second condition is necessary because youtube makes other things into urls, such as hashtags, which we want to keep as text + if url is not None and (text.startswith('http://') or text.startswith('https://')): + url = remove_redirect(url) + run['url'] = url + run['text'] = url # youtube truncates the url text, use actual url instead + +def extract_str(node, default=None, recover_urls=False): + '''default is the value returned if the extraction fails. 
If recover_urls is true, will attempt to fix Youtube's truncation of url text (most prominently seen in descriptions)''' + if isinstance(node, str): + return node + + try: + return node['simpleText'] + except (KeyError, TypeError): + pass + + if isinstance(node, dict) and 'runs' in node: + if recover_urls: + _recover_urls(node['runs']) + return ''.join(text_run.get('text', '') for text_run in node['runs']) + + return default + +def extract_formatted_text(node): + if not node: + return [] + if 'runs' in node: + _recover_urls(node['runs']) + return node['runs'] + elif 'simpleText' in node: + return [{'text': node['simpleText']}] + return [] + +def extract_int(string, default=None): + if isinstance(string, int): + return string + if not isinstance(string, str): + string = extract_str(string) + if not string: + return default + match = re.search(r'(\d+)', string.replace(',', '')) + if match is None: + return default + try: + return int(match.group(1)) + except ValueError: + return default + +def extract_approx_int(string): + '''e.g. "15M" from "15M subscribers"''' + if not isinstance(string, str): + string = extract_str(string) + if not string: + return None + match = re.search(r'(\d+[KMBTkmbt])', string.replace(',', '')) + if match is None: + return None + return match.group(1) + +def extract_date(date_text): + '''Input: "Mar 9, 2019". Output: "2019-3-9"''' + if date_text is None: + return None + + date_text = date_text.replace(',', '').lower() + parts = date_text.split() + if len(parts) >= 3: + month, day, year = parts[-3:] + month = month_abbreviations.get(month[0:3]) # slicing in case they start writing out the full month name + if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None): + return year + '-' + month + '-' + day + +def check_missing_keys(object, *key_sequences): + for key_sequence in key_sequences: + _object = object + try: + for key in key_sequence: + _object = _object[key] + except (KeyError, IndexError, TypeError): + return 'Could not find ' + key + + return None + +def prefix_urls(item): + try: + item['thumbnail'] = util.prefix_url(item['thumbnail']) + except KeyError: + pass + + try: + item['author_url'] = util.prefix_url(item['author_url']) + except KeyError: + pass + +def add_extra_html_info(item): + if item['type'] == 'video': + item['url'] = (util.URL_ORIGIN + '/watch?v=' + item['id']) if item.get('id') else None + + video_info = {} + for key in ('id', 'title', 'author', 'duration'): + try: + video_info[key] = item[key] + except KeyError: + video_info[key] = '' + + item['video_info'] = json.dumps(video_info) + + elif item['type'] == 'playlist': + item['url'] = (util.URL_ORIGIN + '/playlist?list=' + item['id']) if item.get('id') else None + elif item['type'] == 'channel': + item['url'] = (util.URL_ORIGIN + "/channel/" + item['id']) if item.get('id') else None + +def extract_item_info(item, additional_info={}): + if not item: + return {'error': 'No item given'} + + type = get(list(item.keys()), 0) + if not type: + return {'error': 'Could not find type'} + item = item[type] + + info = {'error': None} + if type in ('itemSectionRenderer', 'compactAutoplayRenderer'): + return extract_item_info(deep_get(item, 'contents', 0), additional_info) + + if type in ('movieRenderer', 'clarificationRenderer'): + info['type'] = 'unsupported' + return info + + info.update(additional_info) + + # type looks like e.g. 
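Worked examples for the parsing helpers above, traced from these implementations:

    extract_int('1,234 views')                    # -> 1234
    extract_approx_int('15M subscribers')         # -> '15M'
    extract_date('Dec 19, 2019')                  # -> '2019-12-19'
    extract_date('Streamed live on Mar 9, 2019')  # -> '2019-3-9' (last three tokens)
    extract_date('3 days ago')                    # -> None (relative dates unhandled)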
'compactVideoRenderer' or 'gridVideoRenderer' + # camelCase split, https://stackoverflow.com/a/37697078 + type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()] + if len(type_parts) < 2: + info['type'] = 'unsupported' + return + primary_type = type_parts[-2] + if primary_type == 'video': + info['type'] = 'video' + elif primary_type in ('playlist', 'radio', 'show'): + info['type'] = 'playlist' + elif primary_type == 'channel': + info['type'] = 'channel' + else: + info['type'] = 'unsupported' + + info['title'] = extract_str(item.get('title')) + info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText')) + info['author_id'] = extract_str(multi_deep_get(item, + ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], + ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], + ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'] + )) + info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None + info['description'] = extract_formatted_text(multi_get(item, 'descriptionSnippet', 'descriptionText')) + info['thumbnail'] = multi_deep_get(item, + ['thumbnail', 'thumbnails', 0, 'url'], # videos + ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists + ['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows + ) + + info['badges'] = [] + for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()): + badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label') + if badge: + info['badges'].append(badge) + + if primary_type in ('video', 'playlist'): + info['time_published'] = extract_str(item.get('publishedTimeText')) + + if primary_type == 'video': + info['id'] = item.get('videoId') + info['view_count'] = extract_int(item.get('viewCountText')) + + # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published + accessibility_label = deep_get(item, 'title', 'accessibility', 'accessibilityData', 'label', default='') + timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label) + if timestamp: + conservative_update(info, 'time_published', timestamp.group(1)) + view_count = re.search(r'(\d+) views', accessibility_label.replace(',', '')) + if view_count: + conservative_update(info, 'view_count', int(view_count.group(1))) + + if info['view_count']: + info['approx_view_count'] = '{:,}'.format(info['view_count']) + else: + info['approx_view_count'] = extract_approx_int(multi_get(item, 'shortViewCountText')) + info['duration'] = extract_str(item.get('lengthText')) + elif primary_type == 'playlist': + info['id'] = item.get('playlistId') + info['video_count'] = extract_int(item.get('videoCount')) + elif primary_type == 'channel': + info['id'] = item.get('channelId') + info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText')) + elif primary_type == 'show': + info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId') + + if primary_type in ('playlist', 'channel'): + conservative_update(info, 'video_count', extract_int(item.get('videoCountText'))) + + for overlay in item.get('thumbnailOverlays', []): + conservative_update(info, 'duration', extract_str(deep_get( + overlay, 'thumbnailOverlayTimeStatusRenderer', 'text' + ))) + # show renderers don't have videoCountText + conservative_update(info, 'video_count', extract_int(deep_get( + overlay, 'thumbnailOverlayBottomPanelRenderer', 
+
+def parse_info_prepare_for_html(renderer, additional_info={}):
+    item = extract_item_info(renderer, additional_info)
+    prefix_urls(item)
+    add_extra_html_info(item)
+
+    return item
+
+def extract_response(polymer_json):
+    '''return response, error'''
+    response = multi_deep_get(polymer_json, [1, 'response'], ['response'], default=None, types=dict)
+    if response is None:
+        return None, 'Failed to extract response'
+    else:
+        return response, None
+
+
+list_types = {
+    'sectionListRenderer',
+    'itemSectionRenderer',
+    'gridRenderer',
+    'playlistVideoListRenderer',
+}
+
+item_types = {
+    'movieRenderer',
+    'didYouMeanRenderer',
+    'showingResultsForRenderer',
+
+    'videoRenderer',
+    'compactVideoRenderer',
+    'compactAutoplayRenderer',
+    'gridVideoRenderer',
+    'playlistVideoRenderer',
+
+    'playlistRenderer',
+    'compactPlaylistRenderer',
+    'gridPlaylistRenderer',
+
+    'radioRenderer',
+    'compactRadioRenderer',
+    'gridRadioRenderer',
+
+    'showRenderer',
+    'compactShowRenderer',
+    'gridShowRenderer',
+
+
+    'channelRenderer',
+    'compactChannelRenderer',
+    'gridChannelRenderer',
+
+    'channelAboutFullMetadataRenderer',
+}
+
+def traverse_browse_renderer(renderer):
+    for tab in get(renderer, 'tabs', (), types=(list, tuple)):
+        tab_renderer = multi_deep_get(tab, ['tabRenderer'], ['expandableTabRenderer'], default=None, types=dict)
+        if tab_renderer is None:
+            continue
+        if tab_renderer.get('selected', False):
+            return get(tab_renderer, 'content', {}, types=dict)
+    print('Could not find tab with content')
+    return {}
+
+def traverse_standard_list(renderer):
+    renderer_list = multi_deep_get(renderer, ['contents'], ['items'], default=(), types=(list, tuple))
+    continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation')
+    return renderer_list, continuation
+
+# these renderers contain a single renderer nested inside them
+nested_renderer_dispatch = {
+    'singleColumnBrowseResultsRenderer': traverse_browse_renderer,
+    'twoColumnBrowseResultsRenderer': traverse_browse_renderer,
+    'twoColumnSearchResultsRenderer': lambda renderer: get(renderer, 'primaryContents', {}, types=dict),
+}
+
+# these renderers contain a list of renderers nested inside them
+nested_renderer_list_dispatch = {
+    'sectionListRenderer': traverse_standard_list,
+    'itemSectionRenderer': traverse_standard_list,
+    'gridRenderer': traverse_standard_list,
+    'playlistVideoListRenderer': traverse_standard_list,
+    'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[], types=(list, tuple)), None),
+}
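+
+# Editor's illustration (hypothetical, abridged) of the nesting these dispatch
+# tables handle; extract_items below walks structures like:
+#   {'twoColumnBrowseResultsRenderer': {'tabs': [{'tabRenderer': {
+#       'selected': True,
+#       'content': {'sectionListRenderer': {'contents': [
+#           {'itemSectionRenderer': {'contents': [{'videoRenderer': {...}}]}}]}}}}]}}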
+
+def extract_items(response, item_types=item_types):
+    '''return items, ctoken'''
+    if 'continuationContents' in response:
+        # always has just the one [something]Continuation key, but do this just in case they add some tracking key or something
+        for key, renderer_continuation in get(response, 'continuationContents', {}, types=dict).items():
+            if key.endswith('Continuation'): # e.g. commentSectionContinuation, playlistVideoListContinuation
+                items = multi_deep_get(renderer_continuation, ['contents'], ['items'], default=[], types=(list, tuple))
+                ctoken = deep_get(renderer_continuation, 'continuations', 0, 'nextContinuationData', 'continuation', default=None, types=str)
+                return items, ctoken
+        return [], None
+    elif 'contents' in response:
+        ctoken = None
+        items = []
+
+        iter_stack = collections.deque()
+        current_iter = iter(())
+
+        renderer = get(response, 'contents', {}, types=dict)
+
+        while True:
+            # mode 1: dig into the current renderer
+            # Will stay in mode 1 (via continue) if a new renderer is found inside this one
+            # Otherwise, after finding that it is an item renderer,
+            # contains a list, or contains nothing,
+            # falls through into mode 2 to get a new renderer
+            if len(renderer) != 0:
+                key, value = list(renderer.items())[0]
+
+                # has a list in it, add it to the iter stack
+                if key in nested_renderer_list_dispatch:
+                    renderer_list, continuation = nested_renderer_list_dispatch[key](value)
+                    if renderer_list:
+                        iter_stack.append(current_iter)
+                        current_iter = iter(renderer_list)
+                    if continuation:
+                        ctoken = continuation
+
+                # new renderer nested inside this one
+                elif key in nested_renderer_dispatch:
+                    renderer = nested_renderer_dispatch[key](value)
+                    continue # back to mode 1
+
+                # the renderer is an item
+                elif key in item_types:
+                    items.append(renderer)
+
+
+            # mode 2: get a new renderer by iterating.
+            # goes up the stack for an iterator if one has been exhausted
+            while current_iter is not None:
+                try:
+                    renderer = next(current_iter)
+                    break
+                except StopIteration:
+                    try:
+                        current_iter = iter_stack.pop() # go back up the stack
+                    except IndexError:
+                        return items, ctoken
+
+    else:
+        return [], None
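+
+# Editor's sketch of typical usage (hypothetical polymer_json):
+#   response, err = extract_response(polymer_json)
+#   items, ctoken = extract_items(response)
+#   # items  -> e.g. [{'videoRenderer': {...}}, {'playlistRenderer': {...}}]
+#   # ctoken -> continuation token for the next page, or None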
diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py
new file mode 100644
index 0000000..6277c8d
--- /dev/null
+++ b/youtube/yt_data_extract/everything_else.py
@@ -0,0 +1,273 @@
+from .common import (get, multi_get, deep_get, multi_deep_get,
+    liberal_update, conservative_update, remove_redirect, normalize_url,
+    extract_str, extract_formatted_text, extract_int, extract_approx_int,
+    extract_date, check_missing_keys, extract_item_info, extract_items,
+    extract_response)
+from youtube import proto
+
+import re
+import urllib
+from math import ceil
+
+def extract_channel_info(polymer_json, tab):
+    response, err = extract_response(polymer_json)
+    if err:
+        return {'error': err}
+
+    try:
+        microformat = response['microformat']['microformatDataRenderer']
+
+    # channel doesn't exist or was terminated
+    # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
+    except KeyError:
+        if 'alerts' in response and len(response['alerts']) > 0:
+            return {'error': ' '.join(alert['alertRenderer']['text']['simpleText'] for alert in response['alerts'])}
+        elif 'errors' in response['responseContext']:
+            for error in response['responseContext']['errors']['error']:
+                if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
+                    return {'error': 'This channel does not exist'}
+        return {'error': 'Failure getting microformat'}
+
+    info = {'error': None}
+    info['current_tab'] = tab
+
+
+    # stuff from microformat (info given by youtube for every page on channel)
+    info['short_description'] = microformat['description']
+    info['channel_name'] = microformat['title']
+    info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
+    channel_url = microformat['urlCanonical'].rstrip('/')
+    channel_id = channel_url[channel_url.rfind('/')+1:]
+    info['channel_id'] = channel_id
+    info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
+
+    info['items'] = []
+
+    # empty channel
+    if 'contents' not in response and 'continuationContents' not in response:
+        return info
+
+
+    items, _ = extract_items(response)
+    if tab in ('videos', 'playlists', 'search'):
+        additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
+        info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
+
+    elif tab == 'about':
+        for item in items:
+            try:
+                channel_metadata = item['channelAboutFullMetadataRenderer']
+                break
+            except KeyError:
+                pass
+        else:
+            info['error'] = 'Could not find channelAboutFullMetadataRenderer'
+            return info
+
+        info['links'] = []
+        for link_json in channel_metadata.get('primaryLinks', ()):
+            url = remove_redirect(link_json['navigationEndpoint']['urlEndpoint']['url'])
+
+            text = extract_str(link_json['title'])
+
+            info['links'].append((text, url))
+
+
+        info['stats'] = []
+        for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
+            try:
+                stat = channel_metadata[stat_name]
+            except KeyError:
+                continue
+            info['stats'].append(extract_str(stat))
+
+        if 'description' in channel_metadata:
+            info['description'] = extract_str(channel_metadata['description'])
+        else:
+            info['description'] = ''
+
+    else:
+        raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
+
+    return info
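+
+# Editor's sketch of the dict returned for tab='videos' (values made up):
+#   {'error': None, 'current_tab': 'videos', 'channel_name': 'Some Channel',
+#    'channel_id': 'UCxxxxxxxxxxxxxxxxxxxxxx',
+#    'channel_url': 'https://www.youtube.com/channel/UCxxxxxxxxxxxxxxxxxxxxxx',
+#    'items': [{'type': 'video', 'id': '...', 'title': '...', ...}, ...]}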
+
+def extract_search_info(polymer_json):
+    response, err = extract_response(polymer_json)
+    if err:
+        return {'error': err}
+    info = {'error': None}
+    info['estimated_results'] = int(response['estimatedResults'])
+    info['estimated_pages'] = ceil(info['estimated_results']/20)
+
+
+    results, _ = extract_items(response)
+
+
+    info['items'] = []
+    info['corrections'] = {'type': None}
+    for renderer in results:
+        type = list(renderer.keys())[0]
+        if type == 'shelfRenderer':
+            continue
+        if type == 'didYouMeanRenderer':
+            renderer = renderer[type]
+
+            info['corrections'] = {
+                'type': 'did_you_mean',
+                'corrected_query': renderer['correctedQueryEndpoint']['searchEndpoint']['query'],
+                'corrected_query_text': renderer['correctedQuery']['runs'],
+            }
+            continue
+        if type == 'showingResultsForRenderer':
+            renderer = renderer[type]
+
+            info['corrections'] = {
+                'type': 'showing_results_for',
+                'corrected_query_text': renderer['correctedQuery']['runs'],
+                'original_query_text': renderer['originalQuery']['simpleText'],
+            }
+            continue
+
+        i_info = extract_item_info(renderer)
+        if i_info.get('type') != 'unsupported':
+            info['items'].append(i_info)
+
+
+    return info
+
+def extract_playlist_metadata(polymer_json):
+    response, err = extract_response(polymer_json)
+    if err:
+        return {'error': err}
+
+    metadata = {'error': None}
+    header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
+    metadata['title'] = extract_str(header.get('title'))
+
+    metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
+    # video ids are 11 characters drawn from [A-Za-z0-9_-]
+    first_id = re.search(r'([A-Za-z0-9_\-]{11})', deep_get(header,
+        'thumbnail', 'thumbnails', 0, 'url', default=''))
+    if first_id:
+        conservative_update(metadata, 'first_video_id', first_id.group(1))
+    if metadata['first_video_id'] is None:
+        metadata['thumbnail'] = None
+    else:
+        metadata['thumbnail'] = 'https://i.ytimg.com/vi/' + metadata['first_video_id'] + '/mqdefault.jpg'
+
+    metadata['video_count'] = extract_int(header.get('numVideosText'))
+    metadata['description'] = extract_str(header.get('descriptionText'), default='')
+    metadata['author'] = extract_str(header.get('ownerText'))
+    metadata['author_id'] = multi_deep_get(header,
+        ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+        ['ownerEndpoint', 'browseEndpoint', 'browseId'])
+    if metadata['author_id']:
+        metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+    else:
+        metadata['author_url'] = None
+    metadata['view_count'] = extract_int(header.get('viewCountText'))
+    metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
+    for stat in header.get('stats', ()):
+        text = extract_str(stat)
+        if 'videos' in text:
+            conservative_update(metadata, 'video_count', extract_int(text))
+        elif 'views' in text:
+            conservative_update(metadata, 'view_count', extract_int(text))
+        elif 'updated' in text:
+            metadata['time_published'] = extract_date(text)
+
+    return metadata
+
+def extract_playlist_info(polymer_json):
+    response, err = extract_response(polymer_json)
+    if err:
+        return {'error': err}
+    info = {'error': None}
+    first_page = 'continuationContents' not in response
+    video_list, _ = extract_items(response)
+
+    info['items'] = [extract_item_info(renderer) for renderer in video_list]
+
+    if first_page:
+        info['metadata'] = extract_playlist_metadata(polymer_json)
+
+    return info
+
+def ctoken_metadata(ctoken):
+    result = dict()
+    params = proto.parse(proto.b64_to_bytes(ctoken))
+    result['video_id'] = proto.parse(params[2])[2].decode('ascii')
+
+    offset_information = proto.parse(params[6])
+    result['offset'] = offset_information.get(5, 0)
+
+    result['is_replies'] = False
+    if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
+        result['is_replies'] = True
+        result['sort'] = None
+    else:
+        try:
+            result['sort'] = proto.parse(offset_information[4])[6]
+        except KeyError:
+            result['sort'] = 0
+    return result
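+
+# Editor's illustration (hypothetical token; the decoded values are made up,
+# but the keys match what ctoken_metadata above returns):
+#   ctoken_metadata('<base64 comments continuation>')
+#   -> {'video_id': 'dQw4w9WgXcQ', 'offset': 20, 'is_replies': False, 'sort': 0}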
+
+def extract_comments_info(polymer_json):
+    response, err = extract_response(polymer_json)
+    if err:
+        return {'error': err}
+    info = {'error': None}
+
+    url = multi_deep_get(polymer_json, [1, 'url'], ['url'])
+    if url:
+        ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
+        metadata = ctoken_metadata(ctoken)
+    else:
+        metadata = {}
+    info['video_id'] = metadata.get('video_id')
+    info['offset'] = metadata.get('offset')
+    info['is_replies'] = metadata.get('is_replies')
+    info['sort'] = metadata.get('sort')
+    info['video_title'] = None
+
+    comments, ctoken = extract_items(response)
+    info['comments'] = []
+    info['ctoken'] = ctoken
+    for comment in comments:
+        comment_info = {}
+
+        if 'commentThreadRenderer' in comment: # top level comments
+            conservative_update(info, 'is_replies', False)
+            comment_thread = comment['commentThreadRenderer']
+            info['video_title'] = extract_str(comment_thread.get('commentTargetTitle'))
+            if 'replies' not in comment_thread:
+                comment_info['reply_count'] = 0
+            else:
+                comment_info['reply_count'] = extract_int(deep_get(comment_thread,
+                    'replies', 'commentRepliesRenderer', 'moreText'
+                ), default=1) # With 1 reply, the text reads "View reply"
+            comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
+        elif 'commentRenderer' in comment: # replies
+            comment_info['reply_count'] = 0 # replyCount, below, is not present for replies even if the reply has further replies to it
+            conservative_update(info, 'is_replies', True)
+            comment_renderer = comment['commentRenderer']
+        else:
+            comment_renderer = {}
+
+        # These 3 are sometimes absent, likely because the channel was deleted
+        comment_info['author'] = extract_str(comment_renderer.get('authorText'))
+        comment_info['author_url'] = deep_get(comment_renderer,
+            'authorEndpoint', 'commandMetadata', 'webCommandMetadata', 'url')
+        comment_info['author_id'] = deep_get(comment_renderer,
+            'authorEndpoint', 'browseEndpoint', 'browseId')
+
+        comment_info['author_avatar'] = deep_get(comment_renderer,
+            'authorThumbnail', 'thumbnails', 0, 'url')
+        comment_info['id'] = comment_renderer.get('commentId')
+        comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
+        comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
+        comment_info['like_count'] = comment_renderer.get('likeCount')
+        liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))
+
+        info['comments'].append(comment_info)
+
+    return info
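+
+# Editor's sketch of one entry in info['comments'] (values made up):
+#   {'author': 'Some User', 'author_id': 'UC...', 'id': 'Ugw...',
+#    'text': [{'text': 'Great video'}], 'time_published': '2 years ago',
+#    'like_count': 5, 'reply_count': 0, ...}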
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
new file mode 100644
index 0000000..1166344
--- /dev/null
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -0,0 +1,449 @@
+from .common import (get, multi_get, deep_get, multi_deep_get,
+    liberal_update, conservative_update, remove_redirect, normalize_url,
+    extract_str, extract_formatted_text, extract_int, extract_approx_int,
+    extract_date, check_missing_keys, extract_item_info, extract_items,
+    extract_response)
+
+import json
+import urllib.parse
+import traceback
+
+# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
+_formats = {
+    '5': {'ext': 'flv', 'width': 400, 'height': 240, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'},
+    '6': {'ext': 'flv', 'width': 450, 'height': 270, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'},
+    '13': {'ext': '3gp', 'acodec': 'aac', 'vcodec': 'mp4v'},
+    '17': {'ext': '3gp', 'width': 176, 'height': 144, 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'mp4v'},
+    '18': {'ext': 'mp4', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 96, 'vcodec': 'h264'},
+    '22': {'ext': 'mp4', 'width': 1280, 'height': 720, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+    '34': {'ext': 'flv', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    '35': {'ext': 'flv', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    # itag 36 videos are either 320x180 (BaW_jenozKc) or 320x240 (__2ABJjxzNo), audio_bitrate varies as well
+    '36': {'ext': '3gp', 'width': 320, 'acodec': 'aac', 'vcodec': 'mp4v'},
+    '37': {'ext': 'mp4', 'width': 1920, 'height': 1080, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+    '38': {'ext': 'mp4', 'width': 4096, 'height': 3072, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+    '43': {'ext': 'webm', 'width': 640, 'height': 360, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
+    '44': {'ext': 'webm', 'width': 854, 'height': 480, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
+    '45': {'ext': 'webm', 'width': 1280, 'height': 720, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+    '46': {'ext': 'webm', 'width': 1920, 'height': 1080, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+    '59': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    '78': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+
+
+    # 3D videos
+    '82': {'ext': 'mp4', 'height': 360, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    '83': {'ext': 'mp4', 'height': 480, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    '84': {'ext': 'mp4', 'height': 720, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+    '85': {'ext': 'mp4', 'height': 1080, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+    '100': {'ext': 'webm', 'height': 360, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
+    '101': {'ext': 'webm', 'height': 480, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+    '102': {'ext': 'webm', 'height': 720, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+
+    # Apple HTTP Live Streaming
+    '91': {'ext': 'mp4', 'height': 144, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
+    '92': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
+    '93': {'ext': 'mp4', 'height': 360, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    '94': {'ext': 'mp4', 'height': 480, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+    '95': {'ext': 'mp4', 'height': 720, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'},
+    '96': {'ext': 'mp4', 'height': 1080, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'},
+    '132': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
+    '151': {'ext': 'mp4', 'height': 72, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'h264'},
+
+    # DASH mp4 video
+    '133': {'ext': 'mp4', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '134': {'ext': 'mp4', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '135': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '136': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '137': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '138': {'ext': 'mp4', 'format_note': 'DASH video', 'vcodec': 'h264'}, # Height can vary (https://github.com/ytdl-org/youtube-dl/issues/4559)
+    '160': {'ext': 'mp4', 'height': 144, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '212': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '264': {'ext': 'mp4', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'h264'},
+    '298': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60},
+    '299': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60},
+    '266': {'ext': 'mp4', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'h264'},
+
+    # Dash mp4 audio
+    '139': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 48, 'container': 'm4a_dash'},
+    '140': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 128, 'container': 'm4a_dash'},
+    '141': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 256, 'container': 'm4a_dash'},
+    '256': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'},
+    '258': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'},
+    '325': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'dtse', 'container': 'm4a_dash'},
+    '328': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'ec-3', 'container': 'm4a_dash'},
+
+    # Dash webm
+    '167': {'ext': 'webm', 'height': 360, 'width': 640, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+    '168': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+    '169': {'ext': 'webm', 'height': 720, 'width': 1280, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+    '170': {'ext': 'webm', 'height': 1080, 'width': 1920, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+    '218': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+    '219': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+    '278': {'ext': 'webm', 'height': 144, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp9'},
+    '242': {'ext': 'webm', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '243': {'ext': 'webm', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '244': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '245': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '246': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '247': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '248': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '271': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    # itag 272 videos are either 3840x2160 (e.g. RtoitU2A-3E) or 7680x4320 (sLprVF6d7Ug)
+    '272': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '302': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+    '303': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+    '308': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+    '313': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+    '315': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+
+    # Dash webm audio
+    '171': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 128},
+    '172': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 256},
+
+    # Dash webm audio with opus inside
+    '249': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 50},
+    '250': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 70},
+    '251': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 160},
+
+    # RTMP (unnamed)
+    '_rtmp': {'protocol': 'rtmp'},
+
+    # av01 video only formats sometimes served with "unknown" codecs
+    '394': {'vcodec': 'av01.0.05M.08'},
+    '395': {'vcodec': 'av01.0.05M.08'},
+    '396': {'vcodec': 'av01.0.05M.08'},
+    '397': {'vcodec': 'av01.0.05M.08'},
+}
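+
+# Editor's note: extract_formats below merges these static entries into the
+# per-format data scraped from the page, e.g. (directly from the table above)
+#   _formats['251'] -> {'ext': 'webm', 'format_note': 'DASH audio',
+#                       'acodec': 'opus', 'audio_bitrate': 160}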
+
+def extract_metadata_row_info(video_renderer_info):
+    # extract category and music list
+    info = {
+        'category': None,
+        'music_list': [],
+    }
+
+    current_song = {}
+    for row in deep_get(video_renderer_info, 'metadataRowContainer', 'metadataRowContainerRenderer', 'rows', default=[]):
+        row_title = extract_str(deep_get(row, 'metadataRowRenderer', 'title'), default='')
+        row_content = extract_str(deep_get(row, 'metadataRowRenderer', 'contents', 0))
+        if row_title == 'Category':
+            info['category'] = row_content
+        elif row_title in ('Song', 'Music'):
+            if current_song:
+                info['music_list'].append(current_song)
+            current_song = {'title': row_content}
+        elif row_title == 'Artist':
+            current_song['artist'] = row_content
+        elif row_title == 'Album':
+            current_song['album'] = row_content
+        elif row_title == 'Writers':
+            current_song['writers'] = row_content
+        elif row_title.startswith('Licensed'):
+            current_song['licensor'] = row_content
+    if current_song:
+        info['music_list'].append(current_song)
+
+    return info
+
+def extract_watch_info_mobile(top_level):
+    info = {}
+    microformat = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
+
+    family_safe = microformat.get('isFamilySafe')
+    if family_safe is None:
+        info['age_restricted'] = None
+    else:
+        info['age_restricted'] = not family_safe
+    info['allowed_countries'] = microformat.get('availableCountries', [])
+    info['time_published'] = microformat.get('publishDate')
+
+    response = top_level.get('response', {})
+
+    # video info from metadata renderers
+    items, _ = extract_items(response, item_types={'slimVideoMetadataRenderer'})
+    if items:
+        video_info = items[0]['slimVideoMetadataRenderer']
+    else:
+        print('Failed to extract video metadata')
+        video_info = {}
+
+    info.update(extract_metadata_row_info(video_info))
+    info['description'] = extract_str(video_info.get('description'), recover_urls=True)
+    info['view_count'] = extract_int(extract_str(video_info.get('expandedSubtitle')))
+    info['author'] = extract_str(deep_get(video_info, 'owner', 'slimOwnerRenderer', 'title'))
+    info['author_id'] = deep_get(video_info, 'owner', 'slimOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
+    info['title'] = extract_str(video_info.get('title'))
+    info['live'] = 'watching' in extract_str(video_info.get('expandedSubtitle'), default='')
+    info['unlisted'] = False
+    for badge in video_info.get('badges', []):
+        if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
+            info['unlisted'] = True
+    info['like_count'] = None
+    info['dislike_count'] = None
+    if not info['time_published']:
+        info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
+    for button in video_info.get('buttons', ()):
+        button_renderer = button.get('slimMetadataToggleButtonRenderer', {})
+
+        # all the digits can be found in the accessibility data
+        count = extract_int(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText', 'accessibility', 'accessibilityData', 'label'))
+
+        # this count doesn't have all the digits, it's like 53K for instance
+        dumb_count = extract_int(extract_str(deep_get(button_renderer, 'button', 'toggleButtonRenderer', 'defaultText')))
+
+        # the accessibility text will be "No likes" or "No dislikes" or something like that, but dumb count will be 0
+        if dumb_count == 0:
+            count = 0
+
+        if 'isLike' in button_renderer:
+            info['like_count'] = count
+        elif 'isDislike' in button_renderer:
+            info['dislike_count'] = count
+
+    # comment section info
+    items, _ = extract_items(response, item_types={'commentSectionRenderer'})
+    if items:
+        comment_info = items[0]['commentSectionRenderer']
+        comment_count_text = extract_str(deep_get(comment_info, 'header', 'commentSectionHeaderRenderer', 'countText'))
+        if comment_count_text == 'Comments': # just this with no number, means 0 comments
+            info['comment_count'] = 0
+        else:
+            info['comment_count'] = extract_int(comment_count_text)
+        info['comments_disabled'] = False
+    else: # no comment section present means comments are disabled
+        info['comment_count'] = 0
+        info['comments_disabled'] = True
+
+    # check for limited state
+    items, _ = extract_items(response, item_types={'limitedStateMessageRenderer'})
+    if items:
+        info['limited_state'] = True
+    else:
+        info['limited_state'] = False
+
+    # related videos
+    related, _ = extract_items(response)
+    info['related_videos'] = [extract_item_info(renderer) for renderer in related]
+
+    return info
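+
+# Editor's sketch of selected keys in the dict returned above (values made up):
+#   {'title': '...', 'author': '...', 'view_count': 1563, 'like_count': 53154,
+#    'comment_count': 200, 'comments_disabled': False, 'limited_state': False,
+#    'related_videos': [...], ...}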
+
+month_abbreviations = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
+def extract_watch_info_desktop(top_level):
+    info = {
+        'comment_count': None,
+        'comments_disabled': None,
+        'allowed_countries': None,
+        'limited_state': None,
+    }
+
+    video_info = {}
+    for renderer in deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'results', 'results', 'contents', default=()):
+        if renderer and list(renderer.keys())[0] in ('videoPrimaryInfoRenderer', 'videoSecondaryInfoRenderer'):
+            video_info.update(list(renderer.values())[0])
+
+    info.update(extract_metadata_row_info(video_info))
+    info['description'] = extract_str(video_info.get('description', None), recover_urls=True)
+    info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
+
+    likes_dislikes = deep_get(video_info, 'sentimentBar', 'sentimentBarRenderer', 'tooltip', default='').split('/')
+    if len(likes_dislikes) == 2:
+        info['like_count'] = extract_int(likes_dislikes[0])
+        info['dislike_count'] = extract_int(likes_dislikes[1])
+    else:
+        info['like_count'] = None
+        info['dislike_count'] = None
+
+    info['title'] = extract_str(video_info.get('title', None))
+    info['author'] = extract_str(deep_get(video_info, 'owner', 'videoOwnerRenderer', 'title'))
+    info['author_id'] = deep_get(video_info, 'owner', 'videoOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
+    info['view_count'] = extract_int(extract_str(deep_get(video_info, 'viewCount', 'videoViewCountRenderer', 'viewCount')))
+
+    related = deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results', default=[])
+    info['related_videos'] = [extract_item_info(renderer) for renderer in related]
+
+    return info
+
+def get_caption_url(info, language, format, automatic=False, translation_language=None):
+    '''Gets the url for captions with the given language and format.
+    If automatic is True, get the automatic captions for that language.
+    If translation_language is given, translate the captions from
+    `language` to `translation_language`. If automatic is true and
+    translation_language is given, the automatic captions will be
+    translated.'''
+    url = info['_captions_base_url']
+    url += '&lang=' + language
+    url += '&fmt=' + format
+    if automatic:
+        url += '&kind=asr'
+    elif language in info['_manual_caption_language_names']:
+        url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
+
+    if translation_language:
+        url += '&tlang=' + translation_language
+    return url
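+
+# Editor's illustration (info as produced by extract_watch_info below):
+#   get_caption_url(info, 'en', 'vtt', automatic=True, translation_language='de')
+#   -> info['_captions_base_url'] + '&lang=en&fmt=vtt&kind=asr&tlang=de'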
+
+def extract_formats(info, player_response):
+    streaming_data = player_response.get('streamingData', {})
+    yt_formats = streaming_data.get('formats', []) + streaming_data.get('adaptiveFormats', [])
+
+    info['formats'] = []
+
+    for yt_fmt in yt_formats:
+        fmt = {}
+        fmt['ext'] = None
+        fmt['audio_bitrate'] = None
+        fmt['acodec'] = None
+        fmt['vcodec'] = None
+        fmt['width'] = yt_fmt.get('width')
+        fmt['height'] = yt_fmt.get('height')
+        fmt['file_size'] = yt_fmt.get('contentLength')
+        fmt['audio_sample_rate'] = yt_fmt.get('audioSampleRate')
+        fmt['fps'] = yt_fmt.get('fps')
+        cipher = dict(urllib.parse.parse_qsl(yt_fmt.get('cipher', '')))
+        if cipher:
+            fmt['url'] = cipher.get('url')
+        else:
+            fmt['url'] = yt_fmt.get('url')
+        fmt['s'] = cipher.get('s')
+        fmt['sp'] = cipher.get('sp')
+        fmt.update(_formats.get(str(yt_fmt.get('itag')), {}))
+
+        info['formats'].append(fmt)
+
+def extract_playability_error(info, player_response, error_prefix=''):
+    if info['formats']:
+        info['playability_status'] = None
+        info['playability_error'] = None
+        return
+
+    playability_status = deep_get(player_response, 'playabilityStatus', 'status', default=None)
+    info['playability_status'] = playability_status
+
+    playability_reason = extract_str(multi_deep_get(player_response,
+        ['playabilityStatus', 'reason'],
+        ['playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'reason'],
+        default='Could not find playability error')
+    )
+
+    if playability_status not in (None, 'OK'):
+        info['playability_error'] = error_prefix + playability_reason
+    else:
+        info['playability_error'] = error_prefix + 'Unknown playability error'
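+
+# Editor's note: for formats with obfuscated signatures, 'cipher' above is a
+# query string carrying 'url', 's' (the scrambled signature), and 'sp' (the
+# parameter name the decrypted signature should be attached to); decryption
+# itself happens elsewhere, using the base.js referenced by info['base_js'].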
+
+SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
+def extract_watch_info(polymer_json):
+    info = {'playability_error': None, 'error': None}
+
+    if isinstance(polymer_json, dict):
+        top_level = polymer_json
+    elif isinstance(polymer_json, (list, tuple)):
+        top_level = {}
+        for page_part in polymer_json:
+            if not isinstance(page_part, dict):
+                return {'error': 'Invalid page part'}
+            top_level.update(page_part)
+    else:
+        return {'error': 'Invalid top level polymer data'}
+
+    error = check_missing_keys(top_level,
+        ['player', 'args'],
+        ['player', 'assets', 'js'],
+        ['playerResponse'],
+    )
+    if error:
+        info['playability_error'] = error
+
+    player_args = deep_get(top_level, 'player', 'args', default={})
+    player_response = json.loads(player_args['player_response']) if 'player_response' in player_args else {}
+
+    # captions
+    info['automatic_caption_languages'] = []
+    info['manual_caption_languages'] = []
+    info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url
+    info['translation_languages'] = []
+    captions_info = player_response.get('captions', {})
+    info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
+    for caption_track in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
+        lang_code = caption_track.get('languageCode')
+        if not lang_code:
+            continue
+        if caption_track.get('kind') == 'asr':
+            info['automatic_caption_languages'].append(lang_code)
+        else:
+            info['manual_caption_languages'].append(lang_code)
+        base_url = caption_track.get('baseUrl', '')
+        lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
+        if lang_name:
+            info['_manual_caption_language_names'][lang_code] = lang_name
+
+    for translation_lang_info in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'translationLanguages', default=()):
+        lang_code = translation_lang_info.get('languageCode')
+        if lang_code:
+            info['translation_languages'].append(lang_code)
+        if translation_lang_info.get('isTranslatable') is False:
+            print('WARNING: Found non-translatable caption language')
+
+    # formats
+    extract_formats(info, player_response)
+
+    # playability errors
+    extract_playability_error(info, player_response)
+
+    # check age-restriction
+    info['age_restricted'] = (info['playability_status'] == 'LOGIN_REQUIRED' and info['playability_error'] and ' age' in info['playability_error'])
+
+    # base_js (for decryption of signatures)
+    info['base_js'] = deep_get(top_level, 'player', 'assets', 'js')
+    if info['base_js']:
+        info['base_js'] = normalize_url(info['base_js'])
+
+    mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={})
+    if mobile:
+        info.update(extract_watch_info_mobile(top_level))
+    else:
+        info.update(extract_watch_info_desktop(top_level))
+
+    # stuff from videoDetails. Use liberal_update to prioritize info from videoDetails over existing info
+    vd = deep_get(top_level, 'playerResponse', 'videoDetails', default={})
+    liberal_update(info, 'title', extract_str(vd.get('title')))
+    liberal_update(info, 'duration', extract_int(vd.get('lengthSeconds')))
+    liberal_update(info, 'view_count', extract_int(vd.get('viewCount')))
+    # videos with no description have a blank string
+    liberal_update(info, 'description', vd.get('shortDescription'))
+    liberal_update(info, 'id', vd.get('videoId'))
+    liberal_update(info, 'author', vd.get('author'))
+    liberal_update(info, 'author_id', vd.get('channelId'))
+    liberal_update(info, 'live', vd.get('isLiveContent'))
+    conservative_update(info, 'unlisted', not vd.get('isCrawlable', True)) # isCrawlable is false on limited state videos even if they aren't unlisted
+    liberal_update(info, 'tags', vd.get('keywords', []))
+
+    # fallback stuff from microformat
+    mf = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
+    conservative_update(info, 'title', extract_str(mf.get('title')))
+    conservative_update(info, 'duration', extract_int(mf.get('lengthSeconds')))
+    # this gives the view count for limited state videos
+    conservative_update(info, 'view_count', extract_int(mf.get('viewCount')))
+    conservative_update(info, 'description', extract_str(mf.get('description'), recover_urls=True))
+    conservative_update(info, 'author', mf.get('ownerChannelName'))
+    conservative_update(info, 'author_id', mf.get('externalChannelId'))
+    liberal_update(info, 'unlisted', mf.get('isUnlisted'))
+    liberal_update(info, 'category', mf.get('category'))
+    liberal_update(info, 'time_published', mf.get('publishDate'))
+    liberal_update(info, 'time_uploaded', mf.get('uploadDate'))
+
+    # other stuff
+    info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+    return info
+
+def update_with_age_restricted_info(info, video_info_page):
+    ERROR_PREFIX = 'Error bypassing age-restriction: '
+
+    video_info = urllib.parse.parse_qs(video_info_page)
+    player_response = deep_get(video_info, 'player_response', 0)
+    if player_response is None:
+        info['playability_error'] = ERROR_PREFIX + 'Could not find player_response in video_info_page'
+        return
+    try:
+        player_response = json.loads(player_response)
+    except json.decoder.JSONDecodeError:
+        traceback.print_exc()
+        info['playability_error'] = ERROR_PREFIX + 'Failed to parse json response'
+        return
+
+    extract_formats(info, player_response)
+    extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX)
-- 
cgit v1.2.3