diff options
Diffstat (limited to 'youtube/watch.py')
| -rw-r--r-- | youtube/watch.py | 1427 |
1 files changed, 1271 insertions, 156 deletions
diff --git a/youtube/watch.py b/youtube/watch.py index 41c90e4..ec446f4 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -1,120 +1,325 @@ +import json +import logging +import math +import os +import re +import traceback +import urllib +from math import ceil +from types import SimpleNamespace +from urllib.parse import parse_qs, urlencode + +import flask +import gevent +import urllib3.exceptions +from flask import request + +import youtube from youtube import yt_app from youtube import util, comments, local_playlist, yt_data_extract +from youtube import watch_formats import settings -from flask import request -import flask +# Backward compatibility aliases +codec_name = watch_formats.codec_name +video_quality_string = watch_formats.video_quality_string +short_video_quality_string = watch_formats.short_video_quality_string +audio_quality_string = watch_formats.audio_quality_string +format_bytes = watch_formats.format_bytes -from youtube_dl.YoutubeDL import YoutubeDL -from youtube_dl.extractor.youtube import YoutubeError -import json -import html -import gevent -import os +logger = logging.getLogger(__name__) -def get_related_items(info): - results = [] - for item in info['related_vids']: - if 'list' in item: # playlist: - result = watch_page_related_playlist_info(item) - else: - result = watch_page_related_video_info(item) - yt_data_extract.prefix_urls(result) - yt_data_extract.add_extra_html_info(result) - results.append(result) - return results - - -# json of related items retrieved directly from the watch page has different names for everything -# converts these to standard names -def watch_page_related_video_info(item): - result = {key: item[key] for key in ('id', 'title', 'author')} - result['duration'] = util.seconds_to_timestamp(item['length_seconds']) - try: - result['views'] = item['short_view_count_text'] - except KeyError: - result['views'] = '' - result['thumbnail'] = util.get_thumbnail_url(item['id']) - result['type'] = 'video' - return result - -def watch_page_related_playlist_info(item): +try: + with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'r') as f: + decrypt_cache = json.loads(f.read())['decrypt_cache'] +except FileNotFoundError: + decrypt_cache = {} + + +# codec_name imported from watch_formats + + +def get_video_sources(info, target_resolution): + '''return dict with organized sources''' + audio_by_track = {} + video_only_sources = {} + uni_sources = [] + pair_sources = [] + + for fmt in info['formats']: + if not all(fmt[attr] for attr in ('ext', 'url', 'itag')): + continue + if fmt['acodec'] and fmt['vcodec']: + if fmt.get('audio_track_is_default', True) is False: + continue + source = {'type': f"video/{fmt['ext']}", + 'quality_string': short_video_quality_string(fmt)} + source['quality_string'] += ' (integrated)' + source.update(fmt) + uni_sources.append(source) + continue + if not (fmt['init_range'] and fmt['index_range']): + # Allow HLS-backed audio tracks (served locally, no init/index needed) + url_value = fmt.get('url', '') + if (not url_value.startswith('http://127.') + and '/ytl-api/' not in url_value): + continue + # Mark as HLS for frontend + fmt['is_hls'] = True + if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']): + if fmt['bitrate']: + fmt['audio_bitrate'] = int(fmt['bitrate']/1000) + source = {'type': f"audio/{fmt['ext']}", + 'quality_string': audio_quality_string(fmt)} + source.update(fmt) + source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\"" + tid = fmt.get('audio_track_id') or 'default' + if tid not in audio_by_track: + audio_by_track[tid] = { + 'name': fmt.get('audio_track_name') or 'Default', + 'is_default': fmt.get('audio_track_is_default', True), + 'sources': [], + } + audio_by_track[tid]['sources'].append(source) + elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')): + if codec_name(fmt['vcodec']) == 'unknown': + continue + source = {'type': f"video/{fmt['ext']}", + 'quality_string': short_video_quality_string(fmt)} + source.update(fmt) + source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\"" + quality = f"{fmt['quality']}p{fmt['fps']}" + video_only_sources.setdefault(quality, []).append(source) + + audio_tracks = [] + default_track_id = 'default' + for tid, ti in audio_by_track.items(): + audio_tracks.append({'id': tid, 'name': ti['name'], 'is_default': ti['is_default']}) + if ti['is_default']: + default_track_id = tid + audio_tracks.sort(key=lambda t: (not t['is_default'], t['name'])) + + default_audio = audio_by_track.get(default_track_id, {}).get('sources', []) + default_audio.sort(key=lambda s: s['audio_bitrate']) + uni_sources.sort(key=lambda src: src['quality']) + webm_audios = [a for a in default_audio if a['ext'] == 'webm'] + mp4_audios = [a for a in default_audio if a['ext'] == 'mp4'] + + for quality_string, sources in video_only_sources.items(): + # choose an audio source to go with it + # 0.5 is semiarbitrary empirical constant to spread audio sources + # between 144p and 1080p. Use something better eventually. + quality, fps = map(int, quality_string.split('p')) + target_audio_bitrate = quality*fps/30*0.5 + pair_info = { + 'quality_string': quality_string, + 'quality': quality, + 'height': sources[0]['height'], + 'width': sources[0]['width'], + 'fps': fps, + 'videos': sources, + 'audios': [], + } + for audio_choices in (webm_audios, mp4_audios): + if not audio_choices: + continue + closest_audio_source = audio_choices[0] + best_err = target_audio_bitrate - audio_choices[0]['audio_bitrate'] + best_err = abs(best_err) + for audio_source in audio_choices[1:]: + err = abs(audio_source['audio_bitrate'] - target_audio_bitrate) + # once err gets worse we have passed the closest one + if err > best_err: + break + best_err = err + closest_audio_source = audio_source + pair_info['audios'].append(closest_audio_source) + + if not pair_info['audios']: + continue + + def video_rank(src): + ''' Sort by settings preference. Use file size as tiebreaker ''' + setting_name = f'codec_rank_{codec_name(src["vcodec"])}' + return (settings.current_settings_dict[setting_name], + src['file_size']) + pair_info['videos'].sort(key=video_rank) + + pair_sources.append(pair_info) + + pair_sources.sort(key=lambda src: src['quality']) + + uni_idx = 0 if uni_sources else None + for i, source in enumerate(uni_sources): + if source['quality'] > target_resolution: + break + uni_idx = i + + pair_idx = 0 if pair_sources else None + for i, pair_info in enumerate(pair_sources): + if pair_info['quality'] > target_resolution: + break + pair_idx = i + + audio_track_sources = {} + for tid, ti in audio_by_track.items(): + srcs = ti['sources'] + srcs.sort(key=lambda s: s.get('audio_bitrate', 0)) + audio_track_sources[tid] = srcs + return { - 'size': item['playlist_length'] if item['playlist_length'] != "0" else "50+", - 'title': item['playlist_title'], - 'id': item['list'], - 'first_video_id': item['video_id'], - 'thumbnail': util.get_thumbnail_url(item['video_id']), - 'type': 'playlist', + 'uni_sources': uni_sources, + 'uni_idx': uni_idx, + 'pair_sources': pair_sources, + 'pair_idx': pair_idx, + 'audio_tracks': audio_tracks, + 'audio_track_sources': audio_track_sources, } -def get_video_sources(info): - video_sources = [] - if not settings.theater_mode: - max_resolution = 360 + +def make_caption_src(info, lang, auto=False, trans_lang=None): + label = lang + if auto: + label += ' (Automatic)' + if trans_lang: + label += f' -> {trans_lang}' + + # Try to use Android caption URL directly (no PO Token needed) + caption_url = None + for track in info.get('_android_caption_tracks', []): + track_lang = track.get('languageCode', '') + track_kind = track.get('kind', '') + if track_lang == lang and ( + (auto and track_kind == 'asr') or + (not auto and track_kind != 'asr') + ): + caption_url = track.get('baseUrl') + break + + if caption_url: + # Add format + if '&fmt=' in caption_url: + caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url) + else: + caption_url += '&fmt=vtt' + if trans_lang: + caption_url += f'&tlang={trans_lang}' + url = util.prefix_url(caption_url) else: - max_resolution = settings.default_resolution - - for format in info['formats']: - if format['acodec'] != 'none' and format['vcodec'] != 'none' and format['height'] <= max_resolution: - video_sources.append({ - 'src': format['url'], - 'type': 'video/' + format['ext'], - 'height': format['height'], - 'width': format['width'], - }) + # Fallback to old method + url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang)) + + return { + 'url': url, + 'label': label, + 'srclang': trans_lang[0:2] if trans_lang else lang[0:2], + 'on': False, + } - #### order the videos sources so the preferred resolution is first ### - video_sources.sort(key=lambda source: source['height'], reverse=True) +def lang_in(lang, sequence): + '''Tests if the language is in sequence, with e.g. en and en-US considered the same''' + if lang is None: + return False + lang = lang[0:2] + return lang in (item[0:2] for item in sequence) + + +def lang_eq(lang1, lang2): + '''Tests if two iso 639-1 codes are equal, with en and en-US considered the same. + Just because the codes are equal does not mean the dialects are mutually intelligible, but this will have to do for now without a complex language model''' + if lang1 is None or lang2 is None: + return False + return lang1[0:2] == lang2[0:2] + + +def equiv_lang_in(lang, sequence): + '''Extracts a language in sequence which is equivalent to lang. + e.g. if lang is en, extracts en-GB from sequence. + Necessary because if only a specific variant like en-GB is available, can't ask YouTube for simply en. Need to get the available variant.''' + lang = lang[0:2] + for item in sequence: + if item[0:2] == lang: + return item + return None - return video_sources def get_subtitle_sources(info): + '''Returns these sources, ordered from least to most intelligible: + native_video_lang (Automatic) + foreign_langs (Manual) + native_video_lang (Automatic) -> pref_lang + foreign_langs (Manual) -> pref_lang + native_video_lang (Manual) -> pref_lang + pref_lang (Automatic) + pref_lang (Manual)''' sources = [] - default_found = False - default = None - for language, formats in info['subtitles'].items(): - for format in formats: - if format['ext'] == 'vtt': - source = { - 'url': '/' + format['url'], - 'label': language, - 'srclang': language, - - # set as on by default if this is the preferred language and a default-on subtitles mode is in settings - 'on': language == settings.subtitles_language and settings.subtitles_mode > 0, - } + if not yt_data_extract.captions_available(info): + return [] + pref_lang = settings.subtitles_language + native_video_lang = None + if info['automatic_caption_languages']: + native_video_lang = info['automatic_caption_languages'][0] - if language == settings.subtitles_language: - default_found = True - default = source - else: - sources.append(source) - break + highest_fidelity_is_manual = False - # Put it at the end to avoid browser bug when there are too many languages + # Sources are added in very specific order outlined above + # More intelligible sources are put further down to avoid browser bug when there are too many languages # (in firefox, it is impossible to select a language near the top of the list because it is cut off) - if default_found: - sources.append(default) - try: - formats = info['automatic_captions'][settings.subtitles_language] - except KeyError: - pass - else: - for format in formats: - if format['ext'] == 'vtt': - sources.append({ - 'url': '/' + format['url'], - 'label': settings.subtitles_language + ' - Automatic', - 'srclang': settings.subtitles_language, + # native_video_lang (Automatic) + if native_video_lang and not lang_eq(native_video_lang, pref_lang): + sources.append(make_caption_src(info, native_video_lang, auto=True)) - # set as on by default if this is the preferred language and a default-on subtitles mode is in settings - 'on': settings.subtitles_mode == 2 and not default_found, + # foreign_langs (Manual) + for lang in info['manual_caption_languages']: + if not lang_eq(lang, pref_lang): + sources.append(make_caption_src(info, lang)) - }) + if (lang_in(pref_lang, info['translation_languages']) + and not lang_in(pref_lang, info['automatic_caption_languages']) + and not lang_in(pref_lang, info['manual_caption_languages'])): + # native_video_lang (Automatic) -> pref_lang + if native_video_lang and not lang_eq(pref_lang, native_video_lang): + sources.append(make_caption_src(info, native_video_lang, auto=True, trans_lang=pref_lang)) + + # foreign_langs (Manual) -> pref_lang + for lang in info['manual_caption_languages']: + if not lang_eq(lang, native_video_lang) and not lang_eq(lang, pref_lang): + sources.append(make_caption_src(info, lang, trans_lang=pref_lang)) + + # native_video_lang (Manual) -> pref_lang + if lang_in(native_video_lang, info['manual_caption_languages']): + sources.append(make_caption_src(info, native_video_lang, trans_lang=pref_lang)) + + # pref_lang (Automatic) + if lang_in(pref_lang, info['automatic_caption_languages']): + sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['automatic_caption_languages']), auto=True)) + + # pref_lang (Manual) + if lang_in(pref_lang, info['manual_caption_languages']): + sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['manual_caption_languages']))) + highest_fidelity_is_manual = True + + if sources and sources[-1]['srclang'] == pref_lang: + # set as on by default since it's manual a default-on subtitles mode is in settings + if highest_fidelity_is_manual and settings.subtitles_mode > 0: + sources[-1]['on'] = True + # set as on by default since settings indicate to set it as such even if it's not manual + elif settings.subtitles_mode == 2: + sources[-1]['on'] = True + + if len(sources) == 0: + # Invariant: with no caption sources there should be no languages + # either. Don't rely on `assert` which is stripped under `python -O`. + if (len(info['automatic_caption_languages']) != 0 + or len(info['manual_caption_languages']) != 0): + logger.warning( + 'Unexpected state: no subtitle sources but %d auto / %d manual languages', + len(info['automatic_caption_languages']), + len(info['manual_caption_languages']), + ) return sources @@ -135,115 +340,1025 @@ def get_ordered_music_list_attributes(music_list): return ordered_attributes -def extract_info(downloader, *args, **kwargs): +def save_decrypt_cache(): + os.makedirs(settings.data_dir, exist_ok=True) + f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w') + + f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True)) + f.close() + + +def decrypt_signatures(info, video_id): + '''return error string, or False if no errors''' + if not yt_data_extract.requires_decryption(info): + return False + if not info['player_name']: + return 'Could not find player name' + + player_name = info['player_name'] + if player_name in decrypt_cache: + print(f'Using cached decryption function for: {player_name}') + info['decryption_function'] = decrypt_cache[player_name] + else: + base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}') + base_js = base_js.decode('utf-8') + err = yt_data_extract.extract_decryption_function(info, base_js) + if err: + return err + decrypt_cache[player_name] = info['decryption_function'] + save_decrypt_cache() + err = yt_data_extract.decrypt_signatures(info) + return err + + +def _add_to_error(info, key, additional_message): + if key in info and info[key]: + info[key] += additional_message + else: + info[key] = additional_message + + +def fetch_player_response(client, video_id): + return util.call_youtube_api(client, 'player', { + 'videoId': video_id, + }) + + +def fetch_watch_page_info(video_id, playlist_id, index): + # bpctr=9999999999 will bypass are-you-sure dialogs for controversial + # videos + url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999' + if playlist_id: + url += f'&list={playlist_id}' + if index: + url += f'&index={index}' + + headers = ( + ('Accept', '*/*'), + ('Accept-Language', 'en-US,en;q=0.5'), + ('X-YouTube-Client-Name', '2'), + ('X-YouTube-Client-Version', '2.20180830'), + ) + util.mobile_ua + + watch_page = util.fetch_url(url, headers=headers, + debug_name='watch') + watch_page = watch_page.decode('utf-8') + return yt_data_extract.extract_watch_info_from_html(watch_page) + + +def extract_info(video_id, use_invidious, playlist_id=None, index=None): + primary_client = 'android_vr' + fallback_client = 'ios' + last_resort_client = 'tv_embedded' + + tasks = ( + # Get video metadata from here + gevent.spawn(fetch_watch_page_info, video_id, playlist_id, index), + gevent.spawn(fetch_player_response, primary_client, video_id) + ) + gevent.joinall(tasks) + util.check_gevent_exceptions(*tasks) + + info = tasks[0].value or {} + player_response = tasks[1].value or {} + + # Save android_vr caption tracks (no PO Token needed for these URLs) + if isinstance(player_response, str): + try: + pr_data = json.loads(player_response) + except Exception: + pr_data = {} + else: + pr_data = player_response or {} + android_caption_tracks = yt_data_extract.deep_get( + pr_data, 'captions', 'playerCaptionsTracklistRenderer', + 'captionTracks', default=[]) + info['_android_caption_tracks'] = android_caption_tracks + + # Save streamingData for multi-audio extraction + pr_streaming_data = pr_data.get('streamingData', {}) + info['_streamingData'] = pr_streaming_data + + yt_data_extract.update_with_new_urls(info, player_response) + + # HLS manifest - try multiple clients in case one is blocked + info['hls_manifest_url'] = None + info['hls_audio_tracks'] = {} + hls_data = None + hls_client_used = None + for hls_client in ('ios', 'android'): + try: + resp = fetch_player_response(hls_client, video_id) or {} + hls_data = json.loads(resp) if isinstance(resp, str) else resp + hls_manifest_url = (hls_data.get('streamingData') or {}).get('hlsManifestUrl', '') + if hls_manifest_url: + hls_client_used = hls_client + break + except Exception as e: + print(f'HLS fetch with {hls_client} failed: {e}') + + if hls_manifest_url: + info['hls_manifest_url'] = hls_manifest_url + import re as _re + from urllib.parse import urljoin + hls_manifest = util.fetch_url(hls_manifest_url, + headers=(('User-Agent', 'Mozilla/5.0'),), + debug_name='hls_manifest').decode('utf-8') + + # Parse EXT-X-MEDIA audio tracks from HLS manifest + for line in hls_manifest.split('\n'): + if '#EXT-X-MEDIA' not in line or 'TYPE=AUDIO' not in line: + continue + name_m = _re.search(r'NAME="([^"]+)"', line) + lang_m = _re.search(r'LANGUAGE="([^"]+)"', line) + default_m = _re.search(r'DEFAULT=(YES|NO)', line) + group_m = _re.search(r'GROUP-ID="([^"]+)"', line) + uri_m = _re.search(r'URI="([^"]+)"', line) + if not uri_m or not lang_m: + continue + lang = lang_m.group(1) + is_default = default_m and default_m.group(1) == 'YES' + group = group_m.group(1) if group_m else '0' + key = lang + absolute_hls_url = urljoin(hls_manifest_url, uri_m.group(1)) + if key not in info['hls_audio_tracks'] or group > info['hls_audio_tracks'][key].get('group', '0'): + info['hls_audio_tracks'][key] = { + 'name': name_m.group(1) if name_m else lang, + 'lang': lang, + 'hls_url': absolute_hls_url, + 'group': group, + 'is_default': is_default, + } + + # Register HLS audio tracks for proxy access + added = 0 + for lang, track in info['hls_audio_tracks'].items(): + ck = f"{video_id}_{lang}" + from youtube.hls_cache import register_track + register_track(ck, track['hls_url'], + video_id=video_id, track_id=lang) + + fmt = { + 'audio_track_id': lang, + 'audio_track_name': track['name'], + 'audio_track_is_default': track['is_default'], + 'itag': f'hls_{lang}', + 'ext': 'mp4', + 'audio_bitrate': 128, + 'bitrate': 128000, + 'acodec': 'mp4a.40.2', + 'vcodec': None, + 'width': None, + 'height': None, + 'file_size': None, + 'audio_sample_rate': 44100, + 'duration_ms': None, + 'fps': None, + 'init_range': {'start': 0, 'end': 0}, + 'index_range': {'start': 0, 'end': 0}, + 'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}', + 's': None, + 'sp': None, + 'quality': None, + 'type': 'audio/mp4', + 'quality_string': track['name'], + 'mime_codec': 'audio/mp4; codecs="mp4a.40.2"', + 'is_hls': True, + } + info['formats'].append(fmt) + added += 1 + + if added: + print(f"Added {added} HLS audio tracks (via {hls_client_used})") + else: + print("No HLS manifest available from any client") + info['hls_manifest_url'] = None + info['hls_audio_tracks'] = {} + info['hls_unavailable'] = True + + # Register HLS manifest for proxying + if info['hls_manifest_url']: + ck = f"{video_id}_video" + from youtube.hls_cache import register_track + register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video') + # Use proxy URL instead of direct Google Video URL + info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}' + + # Fallback to 'ios' if no valid URLs are found + if not info.get('formats') or info.get('player_urls_missing'): + print(f"No URLs found in '{primary_client}', attempting with '{fallback_client}'.") + try: + player_response = fetch_player_response(fallback_client, video_id) or {} + yt_data_extract.update_with_new_urls(info, player_response) + except util.FetchError as e: + print(f"Fallback '{fallback_client}' failed: {e}") + + # Final attempt with 'tv_embedded' if there are still no URLs + if not info.get('formats') or info.get('player_urls_missing'): + print(f"No URLs found in '{fallback_client}', attempting with '{last_resort_client}'") + try: + player_response = fetch_player_response(last_resort_client, video_id) or {} + yt_data_extract.update_with_new_urls(info, player_response) + except util.FetchError as e: + print(f"Fallback '{last_resort_client}' failed: {e}") + + # signature decryption + if info.get('formats'): + decryption_error = decrypt_signatures(info, video_id) + if decryption_error: + info['playability_error'] = f'Error decrypting url signatures: {decryption_error}' + + # check if urls ready (non-live format) in former livestream + # urls not ready if all of them have no filesize + if info['was_live']: + info['urls_ready'] = False + for fmt in info['formats']: + if fmt['file_size'] is not None: + info['urls_ready'] = True + else: + info['urls_ready'] = True + + # livestream urls + # sometimes only the livestream urls work soon after the livestream is over + info['hls_formats'] = [] + if info.get('hls_manifest_url') and (info.get('live') or not info.get('formats') or not info['urls_ready']): + try: + manifest = util.fetch_url(info['hls_manifest_url'], + debug_name='hls_manifest.m3u8', + report_text='Fetched hls manifest' + ).decode('utf-8') + info['hls_formats'], err = yt_data_extract.extract_hls_formats(manifest) + if not err: + info['playability_error'] = None + for fmt in info['hls_formats']: + fmt['video_quality'] = video_quality_string(fmt) + except Exception as e: + print(f"Error obteniendo HLS manifest: {e}") + info['hls_formats'] = [] + + # check for 403. Unnecessary for tor video routing b/c ip address is same + info['invidious_used'] = False + info['invidious_reload_button'] = False + info['tor_bypass_used'] = False + if (settings.route_tor == 1 + and info['formats'] and info['formats'][0]['url']): + try: + response = util.head(info['formats'][0]['url'], + report_text='Checked for URL access') + except urllib3.exceptions.HTTPError: + print('Error while checking for URL access:\n') + traceback.print_exc() + return info + + if response.status == 403: + print('Access denied (403) for video urls.') + print('Routing video through Tor') + info['tor_bypass_used'] = True + for fmt in info['formats']: + fmt['url'] += '&use_tor=1' + elif 300 <= response.status < 400: + print('Error: exceeded max redirects while checking video URL') + return info + + +# video_quality_string imported from watch_formats +# short_video_quality_string imported from watch_formats +# audio_quality_string imported from watch_formats +# format_bytes imported from watch_formats + suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent] + converted = float(bytes) / float(1024 ** exponent) + return '%.2f%s' % (converted, suffix) + + +@yt_app.route('/ytl-api/audio-track-proxy') +def audio_track_proxy(): + """Proxy for DASH audio tracks to avoid throttling.""" + audio_url = request.args.get('url', '') + + if not audio_url: + flask.abort(400, 'Missing URL') + try: - return downloader.extract_info(*args, **kwargs) - except YoutubeError as e: - return str(e) + headers = ( + ('User-Agent', 'Mozilla/5.0'), + ('Accept', '*/*'), + ) + content = util.fetch_url(audio_url, headers=headers, + debug_name='audio_dash', report_text=None) + return flask.Response(content, mimetype='audio/mp4', + headers={'Access-Control-Allow-Origin': '*', + 'Cache-Control': 'max-age=3600'}) + except Exception as e: + flask.abort(502, f'Audio fetch failed: {e}') + + +@yt_app.route('/ytl-api/audio-track') +def get_audio_track(): + """Proxy HLS audio/video: playlist or individual segment.""" + from youtube.hls_cache import get_hls_url + + cache_key = request.args.get('id', '') + seg_url = request.args.get('seg', '') + playlist_url = request.args.get('url', '') + + # Handle playlist/manifest URL (used for audio track playlists) + if playlist_url: + # Unwrap if double-proxied + if '/ytl-api/audio-track' in playlist_url: + import urllib.parse as _up + parsed = _up.parse_qs(_up.urlparse(playlist_url).query) + if 'url' in parsed: + playlist_url = parsed['url'][0] + + try: + playlist = util.fetch_url(playlist_url, + headers=(('User-Agent', 'Mozilla/5.0'),), + debug_name='audio_playlist').decode('utf-8') + + # Rewrite segment URLs + import re as _re + from urllib.parse import urljoin + base_url = request.url_root.rstrip('/') + playlist_base = playlist_url.rsplit('/', 1)[0] + '/' + + playlist_lines = [] + for line in playlist.split('\n'): + line = line.strip() + if not line or line.startswith('#'): + playlist_lines.append(line) + continue + + # Resolve and proxy segment URL + seg = line if line.startswith('http') else urljoin(playlist_base, line) + # Always use &seg= parameter, never &url= for segments + playlist_lines.append( + f'{base_url}/ytl-api/audio-track?id=' + f'{urllib.parse.quote(cache_key)}' + f'&seg={urllib.parse.quote(seg, safe="")}' + ) + + playlist = '\n'.join(playlist_lines) + + return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl', + headers={'Access-Control-Allow-Origin': '*'}) + except Exception as e: + import traceback + traceback.print_exc() + flask.abort(502, f'Playlist fetch failed: {e}') + + # Handle individual segment or nested playlist + if seg_url: + # Check if seg_url is already a proxied URL + if '/ytl-api/audio-track' in seg_url: + import urllib.parse as _up + parsed = _up.parse_qs(_up.urlparse(seg_url).query) + if 'seg' in parsed: + seg_url = parsed['seg'][0] + elif 'url' in parsed: + seg_url = parsed['url'][0] + + # Check if this is a nested playlist (m3u8) that needs rewriting + # Playlists END with .m3u8 (optionally followed by query params) + # Segments may contain /index.m3u8/ in their path but end with .ts or similar + url_path = urllib.parse.urlparse(seg_url).path + + # Only treat as playlist if path ends with .m3u8 + # Don't use 'in' check because segments can have /index.m3u8/ in their path + is_playlist = url_path.endswith('.m3u8') + + if is_playlist: + # This is a variant playlist - fetch and rewrite it + try: + raw_content = util.fetch_url(seg_url, + headers=(('User-Agent', 'Mozilla/5.0'),), + debug_name='nested_playlist') + + # Check if this is actually binary data (segment) misidentified as playlist + try: + playlist = raw_content.decode('utf-8') + except UnicodeDecodeError: + is_playlist = False # Fall through to segment handler + + if is_playlist: + # Rewrite segment URLs in this playlist + from urllib.parse import urljoin + import re as _re + base_url = request.url_root.rstrip('/') + playlist_base = seg_url.rsplit('/', 1)[0] + '/' + + def proxy_url(url): + """Rewrite a single URL to go through the proxy""" + if not url or url.startswith('/ytl-api/'): + return url + if not url.startswith('http://') and not url.startswith('https://'): + url = urljoin(playlist_base, url) + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}' + + playlist_lines = [] + for line in playlist.split('\n'): + line = line.strip() + if not line: + playlist_lines.append(line) + continue + + # Handle tags with URI attributes (EXT-X-MAP, EXT-X-KEY, etc.) + if line.startswith('#') and 'URI=' in line: + def rewrite_uri_attr(match): + uri = match.group(1) + return f'URI="{proxy_url(uri)}"' + line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line) + playlist_lines.append(line) + elif line.startswith('#'): + # Other tags pass through unchanged + playlist_lines.append(line) + else: + # This is a segment URL line + seg = line if line.startswith('http') else urljoin(playlist_base, line) + playlist_lines.append(proxy_url(seg)) + + playlist = '\n'.join(playlist_lines) + + return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl', + headers={'Access-Control-Allow-Origin': '*'}) + except Exception as e: + import traceback + traceback.print_exc() + flask.abort(502, f'Nested playlist fetch failed: {e}') + + # This is an actual segment - fetch and serve it + try: + headers_dict = { + 'User-Agent': 'Mozilla/5.0', + 'Accept': '*/*', + } + + # Determine content type based on URL + # HLS segments are usually MPEG-TS (.ts) but can be MP4 (.mp4, .m4s) + if '.mp4' in seg_url or '.m4s' in seg_url or seg_url.lower().endswith('.mp4'): + content_type = 'video/mp4' + elif '.webm' in seg_url or seg_url.lower().endswith('.webm'): + content_type = 'video/webm' + else: + # Default to MPEG-TS for HLS + content_type = 'video/mp2t' + response, cleanup_func = util.fetch_url_response( + seg_url, headers=tuple(headers_dict.items()), + timeout=30, use_tor=settings.route_tor) + def generate(): + try: + while True: + chunk = response.read(64 * 1024) # 64 KB chunks + if not chunk: + break + yield chunk + finally: + cleanup_func(response) + return flask.Response( + flask.stream_with_context(generate()), + mimetype=content_type, + headers={ + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Range, Content-Type', + 'Cache-Control': 'max-age=3600', + 'Content-Type': content_type, + }) + except Exception as e: + import traceback + traceback.print_exc() + flask.abort(502, f'Segment fetch failed: {e}') + + # Legacy: Proxy the HLS playlist for audio tracks (using get_hls_url) + hls_url = get_hls_url(cache_key) + if not hls_url: + flask.abort(404, 'Audio track not found') + + try: + playlist = util.fetch_url(hls_url, + headers=(('User-Agent', 'Mozilla/5.0'),), + debug_name='audio_hls_playlist').decode('utf-8') + + # Rewrite segment URLs to go through our proxy endpoint + import re as _re + from urllib.parse import urljoin + hls_base_url = hls_url.rsplit('/', 1)[0] + '/' + + def make_proxy_url(segment_url): + if segment_url.startswith('/ytl-api/audio-track'): + return segment_url + base_url = request.url_root.rstrip('/') + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}' + + playlist_lines = [] + for line in playlist.split('\n'): + line = line.strip() + if not line or line.startswith('#'): + playlist_lines.append(line) + continue + + if line.startswith('http://') or line.startswith('https://'): + segment_url = line + else: + segment_url = urljoin(hls_base_url, line) + + playlist_lines.append(make_proxy_url(segment_url)) + + playlist = '\n'.join(playlist_lines) + + return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl', + headers={'Access-Control-Allow-Origin': '*'}) + except Exception as e: + flask.abort(502, f'Playlist fetch failed: {e}') + + +@yt_app.route('/ytl-api/hls-manifest') +def get_hls_manifest(): + """Proxy HLS video manifest, rewriting ALL URLs including audio tracks.""" + from youtube.hls_cache import get_hls_url + + cache_key = request.args.get('id', '') + is_audio = '_audio_' in cache_key or cache_key.endswith('_audio') + print(f'[hls-manifest] Request: id={cache_key[:40] if cache_key else ""}... (audio={is_audio})') + + hls_url = get_hls_url(cache_key) + print(f'[hls-manifest] HLS URL: {hls_url[:80] if hls_url else None}...') + if not hls_url: + flask.abort(404, 'HLS manifest not found') + + try: + print('[hls-manifest] Fetching HLS manifest...') + manifest = util.fetch_url(hls_url, + headers=(('User-Agent', 'Mozilla/5.0'),), + debug_name='hls_manifest').decode('utf-8') + print(f'[hls-manifest] Successfully fetched manifest ({len(manifest)} bytes)') + + # Rewrite all URLs in the manifest to go through our proxy + import re as _re + from urllib.parse import urljoin + + # Get the base URL for resolving relative URLs + hls_base_url = hls_url.rsplit('/', 1)[0] + '/' + base_url = request.url_root.rstrip('/') + + # Rewrite URLs - handle both segment URLs and audio track URIs + def rewrite_url(url, is_audio_track=False): + if not url or url.startswith('/ytl-api/'): + return url + + # Resolve relative URLs + if not url.startswith('http://') and not url.startswith('https://'): + url = urljoin(hls_base_url, url) + + if is_audio_track: + # Audio track playlist - proxy through audio-track endpoint + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}' + else: + # Video segment or variant playlist - proxy through audio-track endpoint + return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}' + + # Parse and rewrite the manifest + manifest_lines = [] + rewritten_count = 0 + for line in manifest.split('\n'): + line = line.strip() + if not line: + manifest_lines.append(line) + continue + + # Handle EXT-X-MEDIA tags with URI (audio tracks) + if line.startswith('#EXT-X-MEDIA:') and 'URI=' in line: + # Extract and rewrite the URI attribute + def rewrite_media_uri(match): + nonlocal rewritten_count + uri = match.group(1) + rewritten_count += 1 + return f'URI="{rewrite_url(uri, is_audio_track=True)}"' + line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line) + manifest_lines.append(line) + elif line.startswith('#'): + # Other tags pass through + manifest_lines.append(line) + else: + # This is a URL (segment or variant playlist) + if line.startswith('http://') or line.startswith('https://'): + url = line + else: + url = urljoin(hls_base_url, line) + rewritten_count += 1 + manifest_lines.append(rewrite_url(url)) + manifest = '\n'.join(manifest_lines) + print(f'[hls-manifest] Rewrote manifest with {len(manifest_lines)} lines, {rewritten_count} URLs rewritten') + + return flask.Response(manifest, mimetype='application/vnd.apple.mpegurl', + headers={ + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Range, Content-Type', + 'Cache-Control': 'no-cache', + 'Content-Type': 'application/vnd.apple.mpegurl', + }) + except Exception as e: + print(f'[hls-manifest] Error: {e}') + import traceback + traceback.print_exc() + flask.abort(502, f'Manifest fetch failed: {e}') + + +@yt_app.route('/ytl-api/storyboard.vtt') +def get_storyboard_vtt(): + """ + See: + https://github.com/iv-org/invidious/blob/9a8b81fcbe49ff8d88f197b7f731d6bf79fc8087/src/invidious.cr#L3603 + https://github.com/iv-org/invidious/blob/3bb7fbb2f119790ee6675076b31cd990f75f64bb/src/invidious/videos.cr#L623 + """ + + spec_url = request.args.get('spec_url') + url, *boards = spec_url.split('|') + base_url, q = url.split('?') + q = parse_qs(q) # for url query + + storyboard = None + wanted_height = 90 + + for i, board in enumerate(boards): + *t, _, sigh = board.split("#") + width, height, count, width_cnt, height_cnt, interval = map(int, t) + if height != wanted_height: + continue + q['sigh'] = [sigh] + url = f"{base_url}?{urlencode(q, doseq=True)}" + storyboard = SimpleNamespace( + url = url.replace("$L", str(i)).replace("$N", "M$M"), + width = width, + height = height, + interval = interval, + width_cnt = width_cnt, + height_cnt = height_cnt, + storyboard_count = ceil(count / (width_cnt * height_cnt)) + ) + + if not storyboard: + flask.abort(404) + + def to_ts(ms): + s, ms = divmod(ms, 1000) + h, s = divmod(s, 3600) + m, s = divmod(s, 60) + return f"{h:02}:{m:02}:{s:02}.{ms:03}" + + r = "WEBVTT" # result + ts = 0 # current timestamp + + for i in range(storyboard.storyboard_count): + url = f'/{storyboard.url.replace("$M", str(i))}' + interval = storyboard.interval + w, h = storyboard.width, storyboard.height + w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt + + for j in range(h_cnt): + for k in range(w_cnt): + r += f"{to_ts(ts)} --> {to_ts(ts+interval)}\n" + r += f"{url}#xywh={w * k},{h * j},{w},{h}\n\n" + ts += interval + + return flask.Response(r, mimetype='text/vtt') + + +time_table = {'h': 3600, 'm': 60, 's': 1} @yt_app.route('/watch') -def get_watch_page(): - video_id = request.args['v'] +@yt_app.route('/embed') +@yt_app.route('/embed/<video_id>') +@yt_app.route('/shorts') +@yt_app.route('/shorts/<video_id>') +def get_watch_page(video_id=None): + video_id = request.args.get('v') or video_id + if not video_id: + return flask.render_template('error.html', error_message='Missing video id'), 404 if len(video_id) < 11: - flask.abort(404) - flask.abort(flask.Response('Incomplete video id (too short): ' + video_id)) + return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404 + + time_start_str = request.args.get('t', '0s') + time_start = 0 + if re.fullmatch(r'(\d+(h|m|s))+', time_start_str): + for match in re.finditer(r'(\d+)(h|m|s)', time_start_str): + time_start += int(match.group(1))*time_table[match.group(2)] + elif re.fullmatch(r'\d+', time_start_str): + time_start = int(time_start_str) lc = request.args.get('lc', '') - if settings.route_tor: - proxy = 'socks5://127.0.0.1:9150/' + playlist_id = request.args.get('list') + index = request.args.get('index') + use_invidious = bool(int(request.args.get('use_invidious', '1'))) + if request.path.startswith('/embed') and settings.embed_page_mode: + tasks = ( + gevent.spawn((lambda: {})), + gevent.spawn(extract_info, video_id, use_invidious, + playlist_id=playlist_id, index=index), + ) else: - proxy = '' - yt_dl_downloader = YoutubeDL(params={'youtube_include_dash_manifest':False, 'proxy':proxy}) - tasks = ( - gevent.spawn(comments.video_comments, video_id, int(settings.default_comment_sorting), lc=lc ), - gevent.spawn(extract_info, yt_dl_downloader, "https://www.youtube.com/watch?v=" + video_id, download=False) - ) + tasks = ( + gevent.spawn(comments.video_comments, video_id, + int(settings.default_comment_sorting), lc=lc), + gevent.spawn(extract_info, video_id, use_invidious, + playlist_id=playlist_id, index=index), + ) gevent.joinall(tasks) + util.check_gevent_exceptions(tasks[1]) comments_info, info = tasks[0].value, tasks[1].value - if isinstance(info, str): # youtube error - return flask.render_template('error.html', error_message = info) + if info['error']: + return flask.render_template('error.html', error_message=info['error']) video_info = { - "duration": util.seconds_to_timestamp(info["duration"]), - "id": info['id'], - "title": info['title'], - "author": info['uploader'], + 'duration': util.seconds_to_timestamp(info['duration'] or 0), + 'id': info['id'], + 'title': info['title'], + 'author': info['author'], + 'author_id': info['author_id'], } - upload_year = info["upload_date"][0:4] - upload_month = info["upload_date"][4:6] - upload_day = info["upload_date"][6:8] - upload_date = upload_month + "/" + upload_day + "/" + upload_year - - if settings.related_videos_mode: - related_videos = get_related_items(info) - else: - related_videos = [] - - - if settings.gather_googlevideo_domains: - with open(os.path.join(settings.data_dir, 'googlevideo-domains.txt'), 'a+', encoding='utf-8') as f: - url = info['formats'][0]['url'] - subdomain = url[0:url.find(".googlevideo.com")] - f.write(subdomain + "\n") + # prefix urls, and other post-processing not handled by yt_data_extract + for item in info['related_videos']: + # Only set thumbnail if YouTube didn't provide one + if not item.get('thumbnail'): + if item.get('type') == 'playlist' and item.get('first_video_id'): + item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['first_video_id']) + elif item.get('type') == 'video' and item.get('id'): + item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id']) + util.prefix_urls(item) + util.add_extra_html_info(item) + for song in info['music_list']: + song['url'] = util.prefix_url(song['url']) + if info['playlist']: + playlist_id = info['playlist']['id'] + for item in info['playlist']['items']: + # Only set thumbnail if YouTube didn't provide one + if not item.get('thumbnail') and item.get('type') == 'video' and item.get('id'): + item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id']) + util.prefix_urls(item) + util.add_extra_html_info(item) + if playlist_id: + item['url'] += f'&list={playlist_id}' + if item['index']: + item['url'] += f'&index={item["index"]}' + info['playlist']['author_url'] = util.prefix_url( + info['playlist']['author_url']) + if settings.img_prefix: + # Don't prefix hls_formats for now because the urls inside the manifest + # would need to be prefixed as well. + for fmt in info['formats']: + fmt['url'] = util.prefix_url(fmt['url']) + # Add video title to end of url path so it has a filename other than just + # "videoplayback" when downloaded + title = urllib.parse.quote(util.to_valid_filename(info['title'] or '')) + for fmt in info['formats']: + filename = title + ext = fmt.get('ext') + if ext: + filename += f'.{ext}' + fmt['url'] = fmt['url'].replace( + '/videoplayback', + f'/videoplayback/name/{filename}') download_formats = [] - for format in info['formats']: + for format in (info['formats'] + info['hls_formats']): + if format['acodec'] and format['vcodec']: + codecs_string = f"{format['acodec']}, {format['vcodec']}" + else: + codecs_string = format['acodec'] or format['vcodec'] or '?' download_formats.append({ 'url': format['url'], - 'ext': format['ext'], - 'resolution': yt_dl_downloader.format_resolution(format), - 'note': yt_dl_downloader._format_note(format), + 'ext': format['ext'] or '?', + 'audio_quality': audio_quality_string(format), + 'video_quality': video_quality_string(format), + 'file_size': format_bytes(format['file_size']), + 'codecs': codecs_string, }) - video_sources = get_video_sources(info) - video_height = video_sources[0]['height'] + if (settings.route_tor == 2) or info['tor_bypass_used']: + target_resolution = 240 + else: + res = settings.default_resolution + target_resolution = 1080 if res == 'auto' else int(res) + + # Get video sources for no-JS fallback and DASH (av-merge) fallback + video_sources = get_video_sources(info, target_resolution) + uni_sources = video_sources['uni_sources'] + pair_sources = video_sources['pair_sources'] + pair_idx = video_sources['pair_idx'] + + # Build audio tracks list from HLS + audio_tracks = [] + hls_audio_tracks = info.get('hls_audio_tracks', {}) + hls_manifest_url = info.get('hls_manifest_url') + if hls_audio_tracks: + # Prefer "original" audio track + original_lang = None + for lang, track in hls_audio_tracks.items(): + if 'original' in (track.get('name') or '').lower(): + original_lang = lang + break + + # Add tracks, preferring original as default + for lang, track in hls_audio_tracks.items(): + is_default = (lang == original_lang) if original_lang else track['is_default'] + if is_default: + audio_tracks.insert(0, { + 'id': lang, + 'name': track['name'], + 'is_default': True, + }) + else: + audio_tracks.append({ + 'id': lang, + 'name': track['name'], + 'is_default': False, + }) + else: + # Fallback: single default audio track + audio_tracks = [{'id': 'default', 'name': 'Default', 'is_default': True}] + + # Get video dimensions + video_height = info.get('height') or 360 + video_width = info.get('width') or 640 + + # 1 second per pixel, or the actual video width - theater_video_target_width = max(640, info['duration'], video_sources[0]['width']) + theater_video_target_width = max(640, info['duration'] or 0, video_width) + + # Check for false determination of disabled comments, which comes from + # the watch page. But if we got comments in the separate request for those, + # then the determination is wrong. + if info['comments_disabled'] and comments_info.get('comments'): + info['comments_disabled'] = False + print('Warning: False determination that comments are disabled') + print('Comment count:', info['comment_count']) + info['comment_count'] = None # hack to make it obvious there's a bug + + # captions and transcript + subtitle_sources = get_subtitle_sources(info) + other_downloads = [] + for source in subtitle_sources: + best_caption_parse = urllib.parse.urlparse( + source['url'].lstrip('/')) + transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}' + other_downloads.append({ + 'label': f'Video Transcript: {source["label"]}', + 'ext': 'txt', + 'url': transcript_url + }) - return flask.render_template('watch.html', + if request.path.startswith('/embed') and settings.embed_page_mode: + template_name = 'embed.html' + else: + template_name = 'watch.html' + return flask.render_template(template_name, header_playlist_names = local_playlist.get_playlist_names(), - uploader_channel_url = '/' + info['uploader_url'], - upload_date = upload_date, - views = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)), - likes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)), - dislikes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)), + uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '', + time_published = info['time_published'], + view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)), + like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)), + dislike_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)), download_formats = download_formats, + other_downloads = other_downloads, video_info = json.dumps(video_info), - video_sources = video_sources, - subtitle_sources = get_subtitle_sources(info), - related = related_videos, + hls_formats = info['hls_formats'], + hls_manifest_url = hls_manifest_url, + audio_tracks = audio_tracks, + subtitle_sources = subtitle_sources, + uni_sources = uni_sources, + pair_sources = pair_sources, + pair_idx = pair_idx, + hls_unavailable = info.get('hls_unavailable', False), + playback_mode = settings.playback_mode, + related = info['related_videos'], + playlist = info['playlist'], music_list = info['music_list'], music_attributes = get_ordered_music_list_attributes(info['music_list']), comments_info = comments_info, - - theater_mode = settings.theater_mode, - related_videos_mode = settings.related_videos_mode, - comments_mode = settings.comments_mode, + comment_count = info['comment_count'], + comments_disabled = info['comments_disabled'], video_height = video_height, + video_width = video_width, theater_video_target_width = theater_video_target_width, title = info['title'], - uploader = info['uploader'], + uploader = info['author'], description = info['description'], unlisted = info['unlisted'], + limited_state = info['limited_state'], + age_restricted = info['age_restricted'], + live = info['live'], + playability_error = info['playability_error'], + + allowed_countries = info['allowed_countries'], + ip_address = info['ip_address'] if settings.route_tor else None, + invidious_used = info['invidious_used'], + invidious_reload_button = info['invidious_reload_button'], + video_url = f'{util.URL_ORIGIN}/watch?v={video_id}', + video_id = video_id, + storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?' + f'{urlencode([("spec_url", info["storyboard_spec_url"])])}' + if info['storyboard_spec_url'] else None), + + js_data = { + 'video_id': info['id'], + 'video_duration': info['duration'], + 'settings': settings.current_settings_dict, + 'has_manual_captions': any(s.get('on') for s in subtitle_sources), + 'audio_tracks': audio_tracks, + 'hls_manifest_url': hls_manifest_url, + 'time_start': time_start, + 'playlist': info['playlist'], + 'related': info['related_videos'], + 'playability_error': info['playability_error'], + 'hls_unavailable': info.get('hls_unavailable', False), + 'pair_sources': pair_sources, + 'pair_idx': pair_idx, + 'uni_sources': uni_sources, + 'uni_idx': video_sources['uni_idx'], + 'using_pair_sources': bool(pair_sources), + }, + font_family = youtube.font_choices[settings.font], # for embed page ) @yt_app.route('/api/<path:dummy>') def get_captions(dummy): - result = util.fetch_url('https://www.youtube.com' + request.full_path) - result = result.replace(b"align:start position:0%", b"") - return result + url = f'https://www.youtube.com{request.full_path}' + try: + result = util.fetch_url(url, headers=util.mobile_ua) + result = result.replace(b"align:start position:0%", b"") + return flask.Response(result, mimetype='text/vtt') + except Exception as e: + logger.debug(f'Caption fetch failed: {e}') + return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200) + +times_reg = re.compile(r'^\d\d:\d\d:\d\d\.\d\d\d --> \d\d:\d\d:\d\d\.\d\d\d.*$') +inner_timestamp_removal_reg = re.compile(r'<[^>]+>') +@yt_app.route('/watch/transcript/<path:caption_path>') +def get_transcript(caption_path): + try: + captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8') + except util.FetchError as e: + msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.' + print(msg) + return flask.Response( + msg, + status=e.code, + mimetype='text/plain;charset=UTF-8') + + lines = captions.splitlines() + segments = [] + + # skip captions file header + i = 0 + while lines[i] != '': + i += 1 + current_segment = None + while i < len(lines): + line = lines[i] + if line == '': + if ((current_segment is not None) + and (current_segment['begin'] is not None)): + segments.append(current_segment) + current_segment = { + 'begin': None, + 'end': None, + 'lines': [], + } + elif times_reg.fullmatch(line.rstrip()): + current_segment['begin'], current_segment['end'] = line.split(' --> ') + else: + current_segment['lines'].append( + inner_timestamp_removal_reg.sub('', line)) + i += 1 + + # if automatic captions, but not translated + if request.args.get('kind') == 'asr' and not request.args.get('tlang'): + # Automatic captions repeat content. The new segment is displayed + # on the bottom row; the old one is displayed on the top row. + # So grab the bottom row only + for seg in segments: + seg['text'] = seg['lines'][1] + else: + for seg in segments: + seg['text'] = ' '.join(map(str.rstrip, seg['lines'])) + result = '' + for seg in segments: + if seg['text'] != ' ': + result += f"{seg['begin']} {seg['text']}\r\n" + return flask.Response(result.encode('utf-8'), + mimetype='text/plain;charset=UTF-8') |
