aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/watch.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/watch.py')
-rw-r--r--youtube/watch.py1427
1 files changed, 1271 insertions, 156 deletions
diff --git a/youtube/watch.py b/youtube/watch.py
index 41c90e4..ec446f4 100644
--- a/youtube/watch.py
+++ b/youtube/watch.py
@@ -1,120 +1,325 @@
+import json
+import logging
+import math
+import os
+import re
+import traceback
+import urllib
+from math import ceil
+from types import SimpleNamespace
+from urllib.parse import parse_qs, urlencode
+
+import flask
+import gevent
+import urllib3.exceptions
+from flask import request
+
+import youtube
from youtube import yt_app
from youtube import util, comments, local_playlist, yt_data_extract
+from youtube import watch_formats
import settings
-from flask import request
-import flask
+# Backward compatibility aliases
+codec_name = watch_formats.codec_name
+video_quality_string = watch_formats.video_quality_string
+short_video_quality_string = watch_formats.short_video_quality_string
+audio_quality_string = watch_formats.audio_quality_string
+format_bytes = watch_formats.format_bytes
-from youtube_dl.YoutubeDL import YoutubeDL
-from youtube_dl.extractor.youtube import YoutubeError
-import json
-import html
-import gevent
-import os
+logger = logging.getLogger(__name__)
-def get_related_items(info):
- results = []
- for item in info['related_vids']:
- if 'list' in item: # playlist:
- result = watch_page_related_playlist_info(item)
- else:
- result = watch_page_related_video_info(item)
- yt_data_extract.prefix_urls(result)
- yt_data_extract.add_extra_html_info(result)
- results.append(result)
- return results
-
-
-# json of related items retrieved directly from the watch page has different names for everything
-# converts these to standard names
-def watch_page_related_video_info(item):
- result = {key: item[key] for key in ('id', 'title', 'author')}
- result['duration'] = util.seconds_to_timestamp(item['length_seconds'])
- try:
- result['views'] = item['short_view_count_text']
- except KeyError:
- result['views'] = ''
- result['thumbnail'] = util.get_thumbnail_url(item['id'])
- result['type'] = 'video'
- return result
-
-def watch_page_related_playlist_info(item):
+try:
+ with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'r') as f:
+ decrypt_cache = json.loads(f.read())['decrypt_cache']
+except FileNotFoundError:
+ decrypt_cache = {}
+
+
+# codec_name imported from watch_formats
+
+
+def get_video_sources(info, target_resolution):
+ '''return dict with organized sources'''
+ audio_by_track = {}
+ video_only_sources = {}
+ uni_sources = []
+ pair_sources = []
+
+ for fmt in info['formats']:
+ if not all(fmt[attr] for attr in ('ext', 'url', 'itag')):
+ continue
+ if fmt['acodec'] and fmt['vcodec']:
+ if fmt.get('audio_track_is_default', True) is False:
+ continue
+ source = {'type': f"video/{fmt['ext']}",
+ 'quality_string': short_video_quality_string(fmt)}
+ source['quality_string'] += ' (integrated)'
+ source.update(fmt)
+ uni_sources.append(source)
+ continue
+ if not (fmt['init_range'] and fmt['index_range']):
+ # Allow HLS-backed audio tracks (served locally, no init/index needed)
+ url_value = fmt.get('url', '')
+ if (not url_value.startswith('http://127.')
+ and '/ytl-api/' not in url_value):
+ continue
+ # Mark as HLS for frontend
+ fmt['is_hls'] = True
+ if fmt['acodec'] and not fmt['vcodec'] and (fmt['audio_bitrate'] or fmt['bitrate']):
+ if fmt['bitrate']:
+ fmt['audio_bitrate'] = int(fmt['bitrate']/1000)
+ source = {'type': f"audio/{fmt['ext']}",
+ 'quality_string': audio_quality_string(fmt)}
+ source.update(fmt)
+ source['mime_codec'] = f"{source['type']}; codecs=\"{source['acodec']}\""
+ tid = fmt.get('audio_track_id') or 'default'
+ if tid not in audio_by_track:
+ audio_by_track[tid] = {
+ 'name': fmt.get('audio_track_name') or 'Default',
+ 'is_default': fmt.get('audio_track_is_default', True),
+ 'sources': [],
+ }
+ audio_by_track[tid]['sources'].append(source)
+ elif all(fmt[attr] for attr in ('vcodec', 'quality', 'width', 'fps', 'file_size')):
+ if codec_name(fmt['vcodec']) == 'unknown':
+ continue
+ source = {'type': f"video/{fmt['ext']}",
+ 'quality_string': short_video_quality_string(fmt)}
+ source.update(fmt)
+ source['mime_codec'] = f"{source['type']}; codecs=\"{source['vcodec']}\""
+ quality = f"{fmt['quality']}p{fmt['fps']}"
+ video_only_sources.setdefault(quality, []).append(source)
+
+ audio_tracks = []
+ default_track_id = 'default'
+ for tid, ti in audio_by_track.items():
+ audio_tracks.append({'id': tid, 'name': ti['name'], 'is_default': ti['is_default']})
+ if ti['is_default']:
+ default_track_id = tid
+ audio_tracks.sort(key=lambda t: (not t['is_default'], t['name']))
+
+ default_audio = audio_by_track.get(default_track_id, {}).get('sources', [])
+ default_audio.sort(key=lambda s: s['audio_bitrate'])
+ uni_sources.sort(key=lambda src: src['quality'])
+ webm_audios = [a for a in default_audio if a['ext'] == 'webm']
+ mp4_audios = [a for a in default_audio if a['ext'] == 'mp4']
+
+ for quality_string, sources in video_only_sources.items():
+ # choose an audio source to go with it
+ # 0.5 is semiarbitrary empirical constant to spread audio sources
+ # between 144p and 1080p. Use something better eventually.
+ quality, fps = map(int, quality_string.split('p'))
+ target_audio_bitrate = quality*fps/30*0.5
+ pair_info = {
+ 'quality_string': quality_string,
+ 'quality': quality,
+ 'height': sources[0]['height'],
+ 'width': sources[0]['width'],
+ 'fps': fps,
+ 'videos': sources,
+ 'audios': [],
+ }
+ for audio_choices in (webm_audios, mp4_audios):
+ if not audio_choices:
+ continue
+ closest_audio_source = audio_choices[0]
+ best_err = target_audio_bitrate - audio_choices[0]['audio_bitrate']
+ best_err = abs(best_err)
+ for audio_source in audio_choices[1:]:
+ err = abs(audio_source['audio_bitrate'] - target_audio_bitrate)
+ # once err gets worse we have passed the closest one
+ if err > best_err:
+ break
+ best_err = err
+ closest_audio_source = audio_source
+ pair_info['audios'].append(closest_audio_source)
+
+ if not pair_info['audios']:
+ continue
+
+ def video_rank(src):
+ ''' Sort by settings preference. Use file size as tiebreaker '''
+ setting_name = f'codec_rank_{codec_name(src["vcodec"])}'
+ return (settings.current_settings_dict[setting_name],
+ src['file_size'])
+ pair_info['videos'].sort(key=video_rank)
+
+ pair_sources.append(pair_info)
+
+ pair_sources.sort(key=lambda src: src['quality'])
+
+ uni_idx = 0 if uni_sources else None
+ for i, source in enumerate(uni_sources):
+ if source['quality'] > target_resolution:
+ break
+ uni_idx = i
+
+ pair_idx = 0 if pair_sources else None
+ for i, pair_info in enumerate(pair_sources):
+ if pair_info['quality'] > target_resolution:
+ break
+ pair_idx = i
+
+ audio_track_sources = {}
+ for tid, ti in audio_by_track.items():
+ srcs = ti['sources']
+ srcs.sort(key=lambda s: s.get('audio_bitrate', 0))
+ audio_track_sources[tid] = srcs
+
return {
- 'size': item['playlist_length'] if item['playlist_length'] != "0" else "50+",
- 'title': item['playlist_title'],
- 'id': item['list'],
- 'first_video_id': item['video_id'],
- 'thumbnail': util.get_thumbnail_url(item['video_id']),
- 'type': 'playlist',
+ 'uni_sources': uni_sources,
+ 'uni_idx': uni_idx,
+ 'pair_sources': pair_sources,
+ 'pair_idx': pair_idx,
+ 'audio_tracks': audio_tracks,
+ 'audio_track_sources': audio_track_sources,
}
-def get_video_sources(info):
- video_sources = []
- if not settings.theater_mode:
- max_resolution = 360
+
+def make_caption_src(info, lang, auto=False, trans_lang=None):
+ label = lang
+ if auto:
+ label += ' (Automatic)'
+ if trans_lang:
+ label += f' -> {trans_lang}'
+
+ # Try to use Android caption URL directly (no PO Token needed)
+ caption_url = None
+ for track in info.get('_android_caption_tracks', []):
+ track_lang = track.get('languageCode', '')
+ track_kind = track.get('kind', '')
+ if track_lang == lang and (
+ (auto and track_kind == 'asr') or
+ (not auto and track_kind != 'asr')
+ ):
+ caption_url = track.get('baseUrl')
+ break
+
+ if caption_url:
+ # Add format
+ if '&fmt=' in caption_url:
+ caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url)
+ else:
+ caption_url += '&fmt=vtt'
+ if trans_lang:
+ caption_url += f'&tlang={trans_lang}'
+ url = util.prefix_url(caption_url)
else:
- max_resolution = settings.default_resolution
-
- for format in info['formats']:
- if format['acodec'] != 'none' and format['vcodec'] != 'none' and format['height'] <= max_resolution:
- video_sources.append({
- 'src': format['url'],
- 'type': 'video/' + format['ext'],
- 'height': format['height'],
- 'width': format['width'],
- })
+ # Fallback to old method
+ url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang))
+
+ return {
+ 'url': url,
+ 'label': label,
+ 'srclang': trans_lang[0:2] if trans_lang else lang[0:2],
+ 'on': False,
+ }
- #### order the videos sources so the preferred resolution is first ###
- video_sources.sort(key=lambda source: source['height'], reverse=True)
+def lang_in(lang, sequence):
+ '''Tests if the language is in sequence, with e.g. en and en-US considered the same'''
+ if lang is None:
+ return False
+ lang = lang[0:2]
+ return lang in (item[0:2] for item in sequence)
+
+
+def lang_eq(lang1, lang2):
+ '''Tests if two iso 639-1 codes are equal, with en and en-US considered the same.
+ Just because the codes are equal does not mean the dialects are mutually intelligible, but this will have to do for now without a complex language model'''
+ if lang1 is None or lang2 is None:
+ return False
+ return lang1[0:2] == lang2[0:2]
+
+
+def equiv_lang_in(lang, sequence):
+ '''Extracts a language in sequence which is equivalent to lang.
+ e.g. if lang is en, extracts en-GB from sequence.
+ Necessary because if only a specific variant like en-GB is available, can't ask YouTube for simply en. Need to get the available variant.'''
+ lang = lang[0:2]
+ for item in sequence:
+ if item[0:2] == lang:
+ return item
+ return None
- return video_sources
def get_subtitle_sources(info):
+ '''Returns these sources, ordered from least to most intelligible:
+ native_video_lang (Automatic)
+ foreign_langs (Manual)
+ native_video_lang (Automatic) -> pref_lang
+ foreign_langs (Manual) -> pref_lang
+ native_video_lang (Manual) -> pref_lang
+ pref_lang (Automatic)
+ pref_lang (Manual)'''
sources = []
- default_found = False
- default = None
- for language, formats in info['subtitles'].items():
- for format in formats:
- if format['ext'] == 'vtt':
- source = {
- 'url': '/' + format['url'],
- 'label': language,
- 'srclang': language,
-
- # set as on by default if this is the preferred language and a default-on subtitles mode is in settings
- 'on': language == settings.subtitles_language and settings.subtitles_mode > 0,
- }
+ if not yt_data_extract.captions_available(info):
+ return []
+ pref_lang = settings.subtitles_language
+ native_video_lang = None
+ if info['automatic_caption_languages']:
+ native_video_lang = info['automatic_caption_languages'][0]
- if language == settings.subtitles_language:
- default_found = True
- default = source
- else:
- sources.append(source)
- break
+ highest_fidelity_is_manual = False
- # Put it at the end to avoid browser bug when there are too many languages
+ # Sources are added in very specific order outlined above
+ # More intelligible sources are put further down to avoid browser bug when there are too many languages
# (in firefox, it is impossible to select a language near the top of the list because it is cut off)
- if default_found:
- sources.append(default)
- try:
- formats = info['automatic_captions'][settings.subtitles_language]
- except KeyError:
- pass
- else:
- for format in formats:
- if format['ext'] == 'vtt':
- sources.append({
- 'url': '/' + format['url'],
- 'label': settings.subtitles_language + ' - Automatic',
- 'srclang': settings.subtitles_language,
+ # native_video_lang (Automatic)
+ if native_video_lang and not lang_eq(native_video_lang, pref_lang):
+ sources.append(make_caption_src(info, native_video_lang, auto=True))
- # set as on by default if this is the preferred language and a default-on subtitles mode is in settings
- 'on': settings.subtitles_mode == 2 and not default_found,
+ # foreign_langs (Manual)
+ for lang in info['manual_caption_languages']:
+ if not lang_eq(lang, pref_lang):
+ sources.append(make_caption_src(info, lang))
- })
+ if (lang_in(pref_lang, info['translation_languages'])
+ and not lang_in(pref_lang, info['automatic_caption_languages'])
+ and not lang_in(pref_lang, info['manual_caption_languages'])):
+ # native_video_lang (Automatic) -> pref_lang
+ if native_video_lang and not lang_eq(pref_lang, native_video_lang):
+ sources.append(make_caption_src(info, native_video_lang, auto=True, trans_lang=pref_lang))
+
+ # foreign_langs (Manual) -> pref_lang
+ for lang in info['manual_caption_languages']:
+ if not lang_eq(lang, native_video_lang) and not lang_eq(lang, pref_lang):
+ sources.append(make_caption_src(info, lang, trans_lang=pref_lang))
+
+ # native_video_lang (Manual) -> pref_lang
+ if lang_in(native_video_lang, info['manual_caption_languages']):
+ sources.append(make_caption_src(info, native_video_lang, trans_lang=pref_lang))
+
+ # pref_lang (Automatic)
+ if lang_in(pref_lang, info['automatic_caption_languages']):
+ sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['automatic_caption_languages']), auto=True))
+
+ # pref_lang (Manual)
+ if lang_in(pref_lang, info['manual_caption_languages']):
+ sources.append(make_caption_src(info, equiv_lang_in(pref_lang, info['manual_caption_languages'])))
+ highest_fidelity_is_manual = True
+
+ if sources and sources[-1]['srclang'] == pref_lang:
+ # set as on by default since it's manual a default-on subtitles mode is in settings
+ if highest_fidelity_is_manual and settings.subtitles_mode > 0:
+ sources[-1]['on'] = True
+ # set as on by default since settings indicate to set it as such even if it's not manual
+ elif settings.subtitles_mode == 2:
+ sources[-1]['on'] = True
+
+ if len(sources) == 0:
+ # Invariant: with no caption sources there should be no languages
+ # either. Don't rely on `assert` which is stripped under `python -O`.
+ if (len(info['automatic_caption_languages']) != 0
+ or len(info['manual_caption_languages']) != 0):
+ logger.warning(
+ 'Unexpected state: no subtitle sources but %d auto / %d manual languages',
+ len(info['automatic_caption_languages']),
+ len(info['manual_caption_languages']),
+ )
return sources
@@ -135,115 +340,1025 @@ def get_ordered_music_list_attributes(music_list):
return ordered_attributes
-def extract_info(downloader, *args, **kwargs):
+def save_decrypt_cache():
+ os.makedirs(settings.data_dir, exist_ok=True)
+ f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
+
+ f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True))
+ f.close()
+
+
+def decrypt_signatures(info, video_id):
+ '''return error string, or False if no errors'''
+ if not yt_data_extract.requires_decryption(info):
+ return False
+ if not info['player_name']:
+ return 'Could not find player name'
+
+ player_name = info['player_name']
+ if player_name in decrypt_cache:
+ print(f'Using cached decryption function for: {player_name}')
+ info['decryption_function'] = decrypt_cache[player_name]
+ else:
+ base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text=f'Fetched player {player_name}')
+ base_js = base_js.decode('utf-8')
+ err = yt_data_extract.extract_decryption_function(info, base_js)
+ if err:
+ return err
+ decrypt_cache[player_name] = info['decryption_function']
+ save_decrypt_cache()
+ err = yt_data_extract.decrypt_signatures(info)
+ return err
+
+
+def _add_to_error(info, key, additional_message):
+ if key in info and info[key]:
+ info[key] += additional_message
+ else:
+ info[key] = additional_message
+
+
+def fetch_player_response(client, video_id):
+ return util.call_youtube_api(client, 'player', {
+ 'videoId': video_id,
+ })
+
+
+def fetch_watch_page_info(video_id, playlist_id, index):
+ # bpctr=9999999999 will bypass are-you-sure dialogs for controversial
+ # videos
+ url = f'https://m.youtube.com/embed/{video_id}?bpctr=9999999999'
+ if playlist_id:
+ url += f'&list={playlist_id}'
+ if index:
+ url += f'&index={index}'
+
+ headers = (
+ ('Accept', '*/*'),
+ ('Accept-Language', 'en-US,en;q=0.5'),
+ ('X-YouTube-Client-Name', '2'),
+ ('X-YouTube-Client-Version', '2.20180830'),
+ ) + util.mobile_ua
+
+ watch_page = util.fetch_url(url, headers=headers,
+ debug_name='watch')
+ watch_page = watch_page.decode('utf-8')
+ return yt_data_extract.extract_watch_info_from_html(watch_page)
+
+
+def extract_info(video_id, use_invidious, playlist_id=None, index=None):
+ primary_client = 'android_vr'
+ fallback_client = 'ios'
+ last_resort_client = 'tv_embedded'
+
+ tasks = (
+ # Get video metadata from here
+ gevent.spawn(fetch_watch_page_info, video_id, playlist_id, index),
+ gevent.spawn(fetch_player_response, primary_client, video_id)
+ )
+ gevent.joinall(tasks)
+ util.check_gevent_exceptions(*tasks)
+
+ info = tasks[0].value or {}
+ player_response = tasks[1].value or {}
+
+ # Save android_vr caption tracks (no PO Token needed for these URLs)
+ if isinstance(player_response, str):
+ try:
+ pr_data = json.loads(player_response)
+ except Exception:
+ pr_data = {}
+ else:
+ pr_data = player_response or {}
+ android_caption_tracks = yt_data_extract.deep_get(
+ pr_data, 'captions', 'playerCaptionsTracklistRenderer',
+ 'captionTracks', default=[])
+ info['_android_caption_tracks'] = android_caption_tracks
+
+ # Save streamingData for multi-audio extraction
+ pr_streaming_data = pr_data.get('streamingData', {})
+ info['_streamingData'] = pr_streaming_data
+
+ yt_data_extract.update_with_new_urls(info, player_response)
+
+ # HLS manifest - try multiple clients in case one is blocked
+ info['hls_manifest_url'] = None
+ info['hls_audio_tracks'] = {}
+ hls_data = None
+ hls_client_used = None
+ for hls_client in ('ios', 'android'):
+ try:
+ resp = fetch_player_response(hls_client, video_id) or {}
+ hls_data = json.loads(resp) if isinstance(resp, str) else resp
+ hls_manifest_url = (hls_data.get('streamingData') or {}).get('hlsManifestUrl', '')
+ if hls_manifest_url:
+ hls_client_used = hls_client
+ break
+ except Exception as e:
+ print(f'HLS fetch with {hls_client} failed: {e}')
+
+ if hls_manifest_url:
+ info['hls_manifest_url'] = hls_manifest_url
+ import re as _re
+ from urllib.parse import urljoin
+ hls_manifest = util.fetch_url(hls_manifest_url,
+ headers=(('User-Agent', 'Mozilla/5.0'),),
+ debug_name='hls_manifest').decode('utf-8')
+
+ # Parse EXT-X-MEDIA audio tracks from HLS manifest
+ for line in hls_manifest.split('\n'):
+ if '#EXT-X-MEDIA' not in line or 'TYPE=AUDIO' not in line:
+ continue
+ name_m = _re.search(r'NAME="([^"]+)"', line)
+ lang_m = _re.search(r'LANGUAGE="([^"]+)"', line)
+ default_m = _re.search(r'DEFAULT=(YES|NO)', line)
+ group_m = _re.search(r'GROUP-ID="([^"]+)"', line)
+ uri_m = _re.search(r'URI="([^"]+)"', line)
+ if not uri_m or not lang_m:
+ continue
+ lang = lang_m.group(1)
+ is_default = default_m and default_m.group(1) == 'YES'
+ group = group_m.group(1) if group_m else '0'
+ key = lang
+ absolute_hls_url = urljoin(hls_manifest_url, uri_m.group(1))
+ if key not in info['hls_audio_tracks'] or group > info['hls_audio_tracks'][key].get('group', '0'):
+ info['hls_audio_tracks'][key] = {
+ 'name': name_m.group(1) if name_m else lang,
+ 'lang': lang,
+ 'hls_url': absolute_hls_url,
+ 'group': group,
+ 'is_default': is_default,
+ }
+
+ # Register HLS audio tracks for proxy access
+ added = 0
+ for lang, track in info['hls_audio_tracks'].items():
+ ck = f"{video_id}_{lang}"
+ from youtube.hls_cache import register_track
+ register_track(ck, track['hls_url'],
+ video_id=video_id, track_id=lang)
+
+ fmt = {
+ 'audio_track_id': lang,
+ 'audio_track_name': track['name'],
+ 'audio_track_is_default': track['is_default'],
+ 'itag': f'hls_{lang}',
+ 'ext': 'mp4',
+ 'audio_bitrate': 128,
+ 'bitrate': 128000,
+ 'acodec': 'mp4a.40.2',
+ 'vcodec': None,
+ 'width': None,
+ 'height': None,
+ 'file_size': None,
+ 'audio_sample_rate': 44100,
+ 'duration_ms': None,
+ 'fps': None,
+ 'init_range': {'start': 0, 'end': 0},
+ 'index_range': {'start': 0, 'end': 0},
+ 'url': f'/ytl-api/audio-track?id={urllib.parse.quote(ck)}',
+ 's': None,
+ 'sp': None,
+ 'quality': None,
+ 'type': 'audio/mp4',
+ 'quality_string': track['name'],
+ 'mime_codec': 'audio/mp4; codecs="mp4a.40.2"',
+ 'is_hls': True,
+ }
+ info['formats'].append(fmt)
+ added += 1
+
+ if added:
+ print(f"Added {added} HLS audio tracks (via {hls_client_used})")
+ else:
+ print("No HLS manifest available from any client")
+ info['hls_manifest_url'] = None
+ info['hls_audio_tracks'] = {}
+ info['hls_unavailable'] = True
+
+ # Register HLS manifest for proxying
+ if info['hls_manifest_url']:
+ ck = f"{video_id}_video"
+ from youtube.hls_cache import register_track
+ register_track(ck, info['hls_manifest_url'], video_id=video_id, track_id='video')
+ # Use proxy URL instead of direct Google Video URL
+ info['hls_manifest_url'] = f'/ytl-api/hls-manifest?id={urllib.parse.quote(ck)}'
+
+ # Fallback to 'ios' if no valid URLs are found
+ if not info.get('formats') or info.get('player_urls_missing'):
+ print(f"No URLs found in '{primary_client}', attempting with '{fallback_client}'.")
+ try:
+ player_response = fetch_player_response(fallback_client, video_id) or {}
+ yt_data_extract.update_with_new_urls(info, player_response)
+ except util.FetchError as e:
+ print(f"Fallback '{fallback_client}' failed: {e}")
+
+ # Final attempt with 'tv_embedded' if there are still no URLs
+ if not info.get('formats') or info.get('player_urls_missing'):
+ print(f"No URLs found in '{fallback_client}', attempting with '{last_resort_client}'")
+ try:
+ player_response = fetch_player_response(last_resort_client, video_id) or {}
+ yt_data_extract.update_with_new_urls(info, player_response)
+ except util.FetchError as e:
+ print(f"Fallback '{last_resort_client}' failed: {e}")
+
+ # signature decryption
+ if info.get('formats'):
+ decryption_error = decrypt_signatures(info, video_id)
+ if decryption_error:
+ info['playability_error'] = f'Error decrypting url signatures: {decryption_error}'
+
+ # check if urls ready (non-live format) in former livestream
+ # urls not ready if all of them have no filesize
+ if info['was_live']:
+ info['urls_ready'] = False
+ for fmt in info['formats']:
+ if fmt['file_size'] is not None:
+ info['urls_ready'] = True
+ else:
+ info['urls_ready'] = True
+
+ # livestream urls
+ # sometimes only the livestream urls work soon after the livestream is over
+ info['hls_formats'] = []
+ if info.get('hls_manifest_url') and (info.get('live') or not info.get('formats') or not info['urls_ready']):
+ try:
+ manifest = util.fetch_url(info['hls_manifest_url'],
+ debug_name='hls_manifest.m3u8',
+ report_text='Fetched hls manifest'
+ ).decode('utf-8')
+ info['hls_formats'], err = yt_data_extract.extract_hls_formats(manifest)
+ if not err:
+ info['playability_error'] = None
+ for fmt in info['hls_formats']:
+ fmt['video_quality'] = video_quality_string(fmt)
+ except Exception as e:
+ print(f"Error obteniendo HLS manifest: {e}")
+ info['hls_formats'] = []
+
+ # check for 403. Unnecessary for tor video routing b/c ip address is same
+ info['invidious_used'] = False
+ info['invidious_reload_button'] = False
+ info['tor_bypass_used'] = False
+ if (settings.route_tor == 1
+ and info['formats'] and info['formats'][0]['url']):
+ try:
+ response = util.head(info['formats'][0]['url'],
+ report_text='Checked for URL access')
+ except urllib3.exceptions.HTTPError:
+ print('Error while checking for URL access:\n')
+ traceback.print_exc()
+ return info
+
+ if response.status == 403:
+ print('Access denied (403) for video urls.')
+ print('Routing video through Tor')
+ info['tor_bypass_used'] = True
+ for fmt in info['formats']:
+ fmt['url'] += '&use_tor=1'
+ elif 300 <= response.status < 400:
+ print('Error: exceeded max redirects while checking video URL')
+ return info
+
+
+# video_quality_string imported from watch_formats
+# short_video_quality_string imported from watch_formats
+# audio_quality_string imported from watch_formats
+# format_bytes imported from watch_formats
+ suffix = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'][exponent]
+ converted = float(bytes) / float(1024 ** exponent)
+ return '%.2f%s' % (converted, suffix)
+
+
+@yt_app.route('/ytl-api/audio-track-proxy')
+def audio_track_proxy():
+ """Proxy for DASH audio tracks to avoid throttling."""
+ audio_url = request.args.get('url', '')
+
+ if not audio_url:
+ flask.abort(400, 'Missing URL')
+
try:
- return downloader.extract_info(*args, **kwargs)
- except YoutubeError as e:
- return str(e)
+ headers = (
+ ('User-Agent', 'Mozilla/5.0'),
+ ('Accept', '*/*'),
+ )
+ content = util.fetch_url(audio_url, headers=headers,
+ debug_name='audio_dash', report_text=None)
+ return flask.Response(content, mimetype='audio/mp4',
+ headers={'Access-Control-Allow-Origin': '*',
+ 'Cache-Control': 'max-age=3600'})
+ except Exception as e:
+ flask.abort(502, f'Audio fetch failed: {e}')
+
+
+@yt_app.route('/ytl-api/audio-track')
+def get_audio_track():
+ """Proxy HLS audio/video: playlist or individual segment."""
+ from youtube.hls_cache import get_hls_url
+
+ cache_key = request.args.get('id', '')
+ seg_url = request.args.get('seg', '')
+ playlist_url = request.args.get('url', '')
+
+ # Handle playlist/manifest URL (used for audio track playlists)
+ if playlist_url:
+ # Unwrap if double-proxied
+ if '/ytl-api/audio-track' in playlist_url:
+ import urllib.parse as _up
+ parsed = _up.parse_qs(_up.urlparse(playlist_url).query)
+ if 'url' in parsed:
+ playlist_url = parsed['url'][0]
+
+ try:
+ playlist = util.fetch_url(playlist_url,
+ headers=(('User-Agent', 'Mozilla/5.0'),),
+ debug_name='audio_playlist').decode('utf-8')
+
+ # Rewrite segment URLs
+ import re as _re
+ from urllib.parse import urljoin
+ base_url = request.url_root.rstrip('/')
+ playlist_base = playlist_url.rsplit('/', 1)[0] + '/'
+
+ playlist_lines = []
+ for line in playlist.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('#'):
+ playlist_lines.append(line)
+ continue
+
+ # Resolve and proxy segment URL
+ seg = line if line.startswith('http') else urljoin(playlist_base, line)
+ # Always use &seg= parameter, never &url= for segments
+ playlist_lines.append(
+ f'{base_url}/ytl-api/audio-track?id='
+ f'{urllib.parse.quote(cache_key)}'
+ f'&seg={urllib.parse.quote(seg, safe="")}'
+ )
+
+ playlist = '\n'.join(playlist_lines)
+
+ return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl',
+ headers={'Access-Control-Allow-Origin': '*'})
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ flask.abort(502, f'Playlist fetch failed: {e}')
+
+ # Handle individual segment or nested playlist
+ if seg_url:
+ # Check if seg_url is already a proxied URL
+ if '/ytl-api/audio-track' in seg_url:
+ import urllib.parse as _up
+ parsed = _up.parse_qs(_up.urlparse(seg_url).query)
+ if 'seg' in parsed:
+ seg_url = parsed['seg'][0]
+ elif 'url' in parsed:
+ seg_url = parsed['url'][0]
+
+ # Check if this is a nested playlist (m3u8) that needs rewriting
+ # Playlists END with .m3u8 (optionally followed by query params)
+ # Segments may contain /index.m3u8/ in their path but end with .ts or similar
+ url_path = urllib.parse.urlparse(seg_url).path
+
+ # Only treat as playlist if path ends with .m3u8
+ # Don't use 'in' check because segments can have /index.m3u8/ in their path
+ is_playlist = url_path.endswith('.m3u8')
+
+ if is_playlist:
+ # This is a variant playlist - fetch and rewrite it
+ try:
+ raw_content = util.fetch_url(seg_url,
+ headers=(('User-Agent', 'Mozilla/5.0'),),
+ debug_name='nested_playlist')
+
+ # Check if this is actually binary data (segment) misidentified as playlist
+ try:
+ playlist = raw_content.decode('utf-8')
+ except UnicodeDecodeError:
+ is_playlist = False # Fall through to segment handler
+
+ if is_playlist:
+ # Rewrite segment URLs in this playlist
+ from urllib.parse import urljoin
+ import re as _re
+ base_url = request.url_root.rstrip('/')
+ playlist_base = seg_url.rsplit('/', 1)[0] + '/'
+
+ def proxy_url(url):
+ """Rewrite a single URL to go through the proxy"""
+ if not url or url.startswith('/ytl-api/'):
+ return url
+ if not url.startswith('http://') and not url.startswith('https://'):
+ url = urljoin(playlist_base, url)
+ return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
+
+ playlist_lines = []
+ for line in playlist.split('\n'):
+ line = line.strip()
+ if not line:
+ playlist_lines.append(line)
+ continue
+
+ # Handle tags with URI attributes (EXT-X-MAP, EXT-X-KEY, etc.)
+ if line.startswith('#') and 'URI=' in line:
+ def rewrite_uri_attr(match):
+ uri = match.group(1)
+ return f'URI="{proxy_url(uri)}"'
+ line = _re.sub(r'URI="([^"]+)"', rewrite_uri_attr, line)
+ playlist_lines.append(line)
+ elif line.startswith('#'):
+ # Other tags pass through unchanged
+ playlist_lines.append(line)
+ else:
+ # This is a segment URL line
+ seg = line if line.startswith('http') else urljoin(playlist_base, line)
+ playlist_lines.append(proxy_url(seg))
+
+ playlist = '\n'.join(playlist_lines)
+
+ return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl',
+ headers={'Access-Control-Allow-Origin': '*'})
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ flask.abort(502, f'Nested playlist fetch failed: {e}')
+
+ # This is an actual segment - fetch and serve it
+ try:
+ headers_dict = {
+ 'User-Agent': 'Mozilla/5.0',
+ 'Accept': '*/*',
+ }
+
+ # Determine content type based on URL
+ # HLS segments are usually MPEG-TS (.ts) but can be MP4 (.mp4, .m4s)
+ if '.mp4' in seg_url or '.m4s' in seg_url or seg_url.lower().endswith('.mp4'):
+ content_type = 'video/mp4'
+ elif '.webm' in seg_url or seg_url.lower().endswith('.webm'):
+ content_type = 'video/webm'
+ else:
+ # Default to MPEG-TS for HLS
+ content_type = 'video/mp2t'
+ response, cleanup_func = util.fetch_url_response(
+ seg_url, headers=tuple(headers_dict.items()),
+ timeout=30, use_tor=settings.route_tor)
+ def generate():
+ try:
+ while True:
+ chunk = response.read(64 * 1024) # 64 KB chunks
+ if not chunk:
+ break
+ yield chunk
+ finally:
+ cleanup_func(response)
+ return flask.Response(
+ flask.stream_with_context(generate()),
+ mimetype=content_type,
+ headers={
+ 'Access-Control-Allow-Origin': '*',
+ 'Access-Control-Allow-Methods': 'GET, OPTIONS',
+ 'Access-Control-Allow-Headers': 'Range, Content-Type',
+ 'Cache-Control': 'max-age=3600',
+ 'Content-Type': content_type,
+ })
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ flask.abort(502, f'Segment fetch failed: {e}')
+
+ # Legacy: Proxy the HLS playlist for audio tracks (using get_hls_url)
+ hls_url = get_hls_url(cache_key)
+ if not hls_url:
+ flask.abort(404, 'Audio track not found')
+
+ try:
+ playlist = util.fetch_url(hls_url,
+ headers=(('User-Agent', 'Mozilla/5.0'),),
+ debug_name='audio_hls_playlist').decode('utf-8')
+
+ # Rewrite segment URLs to go through our proxy endpoint
+ import re as _re
+ from urllib.parse import urljoin
+ hls_base_url = hls_url.rsplit('/', 1)[0] + '/'
+
+ def make_proxy_url(segment_url):
+ if segment_url.startswith('/ytl-api/audio-track'):
+ return segment_url
+ base_url = request.url_root.rstrip('/')
+ return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(segment_url)}'
+
+ playlist_lines = []
+ for line in playlist.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('#'):
+ playlist_lines.append(line)
+ continue
+
+ if line.startswith('http://') or line.startswith('https://'):
+ segment_url = line
+ else:
+ segment_url = urljoin(hls_base_url, line)
+
+ playlist_lines.append(make_proxy_url(segment_url))
+
+ playlist = '\n'.join(playlist_lines)
+
+ return flask.Response(playlist, mimetype='application/vnd.apple.mpegurl',
+ headers={'Access-Control-Allow-Origin': '*'})
+ except Exception as e:
+ flask.abort(502, f'Playlist fetch failed: {e}')
+
+
+@yt_app.route('/ytl-api/hls-manifest')
+def get_hls_manifest():
+ """Proxy HLS video manifest, rewriting ALL URLs including audio tracks."""
+ from youtube.hls_cache import get_hls_url
+
+ cache_key = request.args.get('id', '')
+ is_audio = '_audio_' in cache_key or cache_key.endswith('_audio')
+ print(f'[hls-manifest] Request: id={cache_key[:40] if cache_key else ""}... (audio={is_audio})')
+
+ hls_url = get_hls_url(cache_key)
+ print(f'[hls-manifest] HLS URL: {hls_url[:80] if hls_url else None}...')
+ if not hls_url:
+ flask.abort(404, 'HLS manifest not found')
+
+ try:
+ print('[hls-manifest] Fetching HLS manifest...')
+ manifest = util.fetch_url(hls_url,
+ headers=(('User-Agent', 'Mozilla/5.0'),),
+ debug_name='hls_manifest').decode('utf-8')
+ print(f'[hls-manifest] Successfully fetched manifest ({len(manifest)} bytes)')
+
+ # Rewrite all URLs in the manifest to go through our proxy
+ import re as _re
+ from urllib.parse import urljoin
+
+ # Get the base URL for resolving relative URLs
+ hls_base_url = hls_url.rsplit('/', 1)[0] + '/'
+ base_url = request.url_root.rstrip('/')
+
+ # Rewrite URLs - handle both segment URLs and audio track URIs
+ def rewrite_url(url, is_audio_track=False):
+ if not url or url.startswith('/ytl-api/'):
+ return url
+
+ # Resolve relative URLs
+ if not url.startswith('http://') and not url.startswith('https://'):
+ url = urljoin(hls_base_url, url)
+
+ if is_audio_track:
+ # Audio track playlist - proxy through audio-track endpoint
+ return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&url={urllib.parse.quote(url, safe="")}'
+ else:
+ # Video segment or variant playlist - proxy through audio-track endpoint
+ return f'{base_url}/ytl-api/audio-track?id={urllib.parse.quote(cache_key)}&seg={urllib.parse.quote(url, safe="")}'
+
+ # Parse and rewrite the manifest
+ manifest_lines = []
+ rewritten_count = 0
+ for line in manifest.split('\n'):
+ line = line.strip()
+ if not line:
+ manifest_lines.append(line)
+ continue
+
+ # Handle EXT-X-MEDIA tags with URI (audio tracks)
+ if line.startswith('#EXT-X-MEDIA:') and 'URI=' in line:
+ # Extract and rewrite the URI attribute
+ def rewrite_media_uri(match):
+ nonlocal rewritten_count
+ uri = match.group(1)
+ rewritten_count += 1
+ return f'URI="{rewrite_url(uri, is_audio_track=True)}"'
+ line = _re.sub(r'URI="([^"]+)"', rewrite_media_uri, line)
+ manifest_lines.append(line)
+ elif line.startswith('#'):
+ # Other tags pass through
+ manifest_lines.append(line)
+ else:
+ # This is a URL (segment or variant playlist)
+ if line.startswith('http://') or line.startswith('https://'):
+ url = line
+ else:
+ url = urljoin(hls_base_url, line)
+ rewritten_count += 1
+ manifest_lines.append(rewrite_url(url))
+ manifest = '\n'.join(manifest_lines)
+ print(f'[hls-manifest] Rewrote manifest with {len(manifest_lines)} lines, {rewritten_count} URLs rewritten')
+
+ return flask.Response(manifest, mimetype='application/vnd.apple.mpegurl',
+ headers={
+ 'Access-Control-Allow-Origin': '*',
+ 'Access-Control-Allow-Methods': 'GET, OPTIONS',
+ 'Access-Control-Allow-Headers': 'Range, Content-Type',
+ 'Cache-Control': 'no-cache',
+ 'Content-Type': 'application/vnd.apple.mpegurl',
+ })
+ except Exception as e:
+ print(f'[hls-manifest] Error: {e}')
+ import traceback
+ traceback.print_exc()
+ flask.abort(502, f'Manifest fetch failed: {e}')
+
+
+@yt_app.route('/ytl-api/storyboard.vtt')
+def get_storyboard_vtt():
+ """
+ See:
+ https://github.com/iv-org/invidious/blob/9a8b81fcbe49ff8d88f197b7f731d6bf79fc8087/src/invidious.cr#L3603
+ https://github.com/iv-org/invidious/blob/3bb7fbb2f119790ee6675076b31cd990f75f64bb/src/invidious/videos.cr#L623
+ """
+
+ spec_url = request.args.get('spec_url')
+ url, *boards = spec_url.split('|')
+ base_url, q = url.split('?')
+ q = parse_qs(q) # for url query
+
+ storyboard = None
+ wanted_height = 90
+
+ for i, board in enumerate(boards):
+ *t, _, sigh = board.split("#")
+ width, height, count, width_cnt, height_cnt, interval = map(int, t)
+ if height != wanted_height:
+ continue
+ q['sigh'] = [sigh]
+ url = f"{base_url}?{urlencode(q, doseq=True)}"
+ storyboard = SimpleNamespace(
+ url = url.replace("$L", str(i)).replace("$N", "M$M"),
+ width = width,
+ height = height,
+ interval = interval,
+ width_cnt = width_cnt,
+ height_cnt = height_cnt,
+ storyboard_count = ceil(count / (width_cnt * height_cnt))
+ )
+
+ if not storyboard:
+ flask.abort(404)
+
+ def to_ts(ms):
+ s, ms = divmod(ms, 1000)
+ h, s = divmod(s, 3600)
+ m, s = divmod(s, 60)
+ return f"{h:02}:{m:02}:{s:02}.{ms:03}"
+
+ r = "WEBVTT" # result
+ ts = 0 # current timestamp
+
+ for i in range(storyboard.storyboard_count):
+ url = f'/{storyboard.url.replace("$M", str(i))}'
+ interval = storyboard.interval
+ w, h = storyboard.width, storyboard.height
+ w_cnt, h_cnt = storyboard.width_cnt, storyboard.height_cnt
+
+ for j in range(h_cnt):
+ for k in range(w_cnt):
+ r += f"{to_ts(ts)} --> {to_ts(ts+interval)}\n"
+ r += f"{url}#xywh={w * k},{h * j},{w},{h}\n\n"
+ ts += interval
+
+ return flask.Response(r, mimetype='text/vtt')
+
+
+time_table = {'h': 3600, 'm': 60, 's': 1}
@yt_app.route('/watch')
-def get_watch_page():
- video_id = request.args['v']
+@yt_app.route('/embed')
+@yt_app.route('/embed/<video_id>')
+@yt_app.route('/shorts')
+@yt_app.route('/shorts/<video_id>')
+def get_watch_page(video_id=None):
+ video_id = request.args.get('v') or video_id
+ if not video_id:
+ return flask.render_template('error.html', error_message='Missing video id'), 404
if len(video_id) < 11:
- flask.abort(404)
- flask.abort(flask.Response('Incomplete video id (too short): ' + video_id))
+ return flask.render_template('error.html', error_message=f'Incomplete video id (too short): {video_id}'), 404
+
+ time_start_str = request.args.get('t', '0s')
+ time_start = 0
+ if re.fullmatch(r'(\d+(h|m|s))+', time_start_str):
+ for match in re.finditer(r'(\d+)(h|m|s)', time_start_str):
+ time_start += int(match.group(1))*time_table[match.group(2)]
+ elif re.fullmatch(r'\d+', time_start_str):
+ time_start = int(time_start_str)
lc = request.args.get('lc', '')
- if settings.route_tor:
- proxy = 'socks5://127.0.0.1:9150/'
+ playlist_id = request.args.get('list')
+ index = request.args.get('index')
+ use_invidious = bool(int(request.args.get('use_invidious', '1')))
+ if request.path.startswith('/embed') and settings.embed_page_mode:
+ tasks = (
+ gevent.spawn((lambda: {})),
+ gevent.spawn(extract_info, video_id, use_invidious,
+ playlist_id=playlist_id, index=index),
+ )
else:
- proxy = ''
- yt_dl_downloader = YoutubeDL(params={'youtube_include_dash_manifest':False, 'proxy':proxy})
- tasks = (
- gevent.spawn(comments.video_comments, video_id, int(settings.default_comment_sorting), lc=lc ),
- gevent.spawn(extract_info, yt_dl_downloader, "https://www.youtube.com/watch?v=" + video_id, download=False)
- )
+ tasks = (
+ gevent.spawn(comments.video_comments, video_id,
+ int(settings.default_comment_sorting), lc=lc),
+ gevent.spawn(extract_info, video_id, use_invidious,
+ playlist_id=playlist_id, index=index),
+ )
gevent.joinall(tasks)
+ util.check_gevent_exceptions(tasks[1])
comments_info, info = tasks[0].value, tasks[1].value
- if isinstance(info, str): # youtube error
- return flask.render_template('error.html', error_message = info)
+ if info['error']:
+ return flask.render_template('error.html', error_message=info['error'])
video_info = {
- "duration": util.seconds_to_timestamp(info["duration"]),
- "id": info['id'],
- "title": info['title'],
- "author": info['uploader'],
+ 'duration': util.seconds_to_timestamp(info['duration'] or 0),
+ 'id': info['id'],
+ 'title': info['title'],
+ 'author': info['author'],
+ 'author_id': info['author_id'],
}
- upload_year = info["upload_date"][0:4]
- upload_month = info["upload_date"][4:6]
- upload_day = info["upload_date"][6:8]
- upload_date = upload_month + "/" + upload_day + "/" + upload_year
-
- if settings.related_videos_mode:
- related_videos = get_related_items(info)
- else:
- related_videos = []
-
-
- if settings.gather_googlevideo_domains:
- with open(os.path.join(settings.data_dir, 'googlevideo-domains.txt'), 'a+', encoding='utf-8') as f:
- url = info['formats'][0]['url']
- subdomain = url[0:url.find(".googlevideo.com")]
- f.write(subdomain + "\n")
+ # prefix urls, and other post-processing not handled by yt_data_extract
+ for item in info['related_videos']:
+ # Only set thumbnail if YouTube didn't provide one
+ if not item.get('thumbnail'):
+ if item.get('type') == 'playlist' and item.get('first_video_id'):
+ item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['first_video_id'])
+ elif item.get('type') == 'video' and item.get('id'):
+ item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
+ util.prefix_urls(item)
+ util.add_extra_html_info(item)
+ for song in info['music_list']:
+ song['url'] = util.prefix_url(song['url'])
+ if info['playlist']:
+ playlist_id = info['playlist']['id']
+ for item in info['playlist']['items']:
+ # Only set thumbnail if YouTube didn't provide one
+ if not item.get('thumbnail') and item.get('type') == 'video' and item.get('id'):
+ item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
+ util.prefix_urls(item)
+ util.add_extra_html_info(item)
+ if playlist_id:
+ item['url'] += f'&list={playlist_id}'
+ if item['index']:
+ item['url'] += f'&index={item["index"]}'
+ info['playlist']['author_url'] = util.prefix_url(
+ info['playlist']['author_url'])
+ if settings.img_prefix:
+ # Don't prefix hls_formats for now because the urls inside the manifest
+ # would need to be prefixed as well.
+ for fmt in info['formats']:
+ fmt['url'] = util.prefix_url(fmt['url'])
+ # Add video title to end of url path so it has a filename other than just
+ # "videoplayback" when downloaded
+ title = urllib.parse.quote(util.to_valid_filename(info['title'] or ''))
+ for fmt in info['formats']:
+ filename = title
+ ext = fmt.get('ext')
+ if ext:
+ filename += f'.{ext}'
+ fmt['url'] = fmt['url'].replace(
+ '/videoplayback',
+ f'/videoplayback/name/{filename}')
download_formats = []
- for format in info['formats']:
+ for format in (info['formats'] + info['hls_formats']):
+ if format['acodec'] and format['vcodec']:
+ codecs_string = f"{format['acodec']}, {format['vcodec']}"
+ else:
+ codecs_string = format['acodec'] or format['vcodec'] or '?'
download_formats.append({
'url': format['url'],
- 'ext': format['ext'],
- 'resolution': yt_dl_downloader.format_resolution(format),
- 'note': yt_dl_downloader._format_note(format),
+ 'ext': format['ext'] or '?',
+ 'audio_quality': audio_quality_string(format),
+ 'video_quality': video_quality_string(format),
+ 'file_size': format_bytes(format['file_size']),
+ 'codecs': codecs_string,
})
- video_sources = get_video_sources(info)
- video_height = video_sources[0]['height']
+ if (settings.route_tor == 2) or info['tor_bypass_used']:
+ target_resolution = 240
+ else:
+ res = settings.default_resolution
+ target_resolution = 1080 if res == 'auto' else int(res)
+
+ # Get video sources for no-JS fallback and DASH (av-merge) fallback
+ video_sources = get_video_sources(info, target_resolution)
+ uni_sources = video_sources['uni_sources']
+ pair_sources = video_sources['pair_sources']
+ pair_idx = video_sources['pair_idx']
+
+ # Build audio tracks list from HLS
+ audio_tracks = []
+ hls_audio_tracks = info.get('hls_audio_tracks', {})
+ hls_manifest_url = info.get('hls_manifest_url')
+ if hls_audio_tracks:
+ # Prefer "original" audio track
+ original_lang = None
+ for lang, track in hls_audio_tracks.items():
+ if 'original' in (track.get('name') or '').lower():
+ original_lang = lang
+ break
+
+ # Add tracks, preferring original as default
+ for lang, track in hls_audio_tracks.items():
+ is_default = (lang == original_lang) if original_lang else track['is_default']
+ if is_default:
+ audio_tracks.insert(0, {
+ 'id': lang,
+ 'name': track['name'],
+ 'is_default': True,
+ })
+ else:
+ audio_tracks.append({
+ 'id': lang,
+ 'name': track['name'],
+ 'is_default': False,
+ })
+ else:
+ # Fallback: single default audio track
+ audio_tracks = [{'id': 'default', 'name': 'Default', 'is_default': True}]
+
+ # Get video dimensions
+ video_height = info.get('height') or 360
+ video_width = info.get('width') or 640
+
+
# 1 second per pixel, or the actual video width
- theater_video_target_width = max(640, info['duration'], video_sources[0]['width'])
+ theater_video_target_width = max(640, info['duration'] or 0, video_width)
+
+ # Check for false determination of disabled comments, which comes from
+ # the watch page. But if we got comments in the separate request for those,
+ # then the determination is wrong.
+ if info['comments_disabled'] and comments_info.get('comments'):
+ info['comments_disabled'] = False
+ print('Warning: False determination that comments are disabled')
+ print('Comment count:', info['comment_count'])
+ info['comment_count'] = None # hack to make it obvious there's a bug
+
+ # captions and transcript
+ subtitle_sources = get_subtitle_sources(info)
+ other_downloads = []
+ for source in subtitle_sources:
+ best_caption_parse = urllib.parse.urlparse(
+ source['url'].lstrip('/'))
+ transcript_url = f'{util.URL_ORIGIN}/watch/transcript{best_caption_parse.path}?{best_caption_parse.query}'
+ other_downloads.append({
+ 'label': f'Video Transcript: {source["label"]}',
+ 'ext': 'txt',
+ 'url': transcript_url
+ })
- return flask.render_template('watch.html',
+ if request.path.startswith('/embed') and settings.embed_page_mode:
+ template_name = 'embed.html'
+ else:
+ template_name = 'watch.html'
+ return flask.render_template(template_name,
header_playlist_names = local_playlist.get_playlist_names(),
- uploader_channel_url = '/' + info['uploader_url'],
- upload_date = upload_date,
- views = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
- likes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
- dislikes = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
+ uploader_channel_url = f'/{info["author_url"]}' if info['author_url'] else '',
+ time_published = info['time_published'],
+ view_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("view_count", None)),
+ like_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("like_count", None)),
+ dislike_count = (lambda x: '{:,}'.format(x) if x is not None else "")(info.get("dislike_count", None)),
download_formats = download_formats,
+ other_downloads = other_downloads,
video_info = json.dumps(video_info),
- video_sources = video_sources,
- subtitle_sources = get_subtitle_sources(info),
- related = related_videos,
+ hls_formats = info['hls_formats'],
+ hls_manifest_url = hls_manifest_url,
+ audio_tracks = audio_tracks,
+ subtitle_sources = subtitle_sources,
+ uni_sources = uni_sources,
+ pair_sources = pair_sources,
+ pair_idx = pair_idx,
+ hls_unavailable = info.get('hls_unavailable', False),
+ playback_mode = settings.playback_mode,
+ related = info['related_videos'],
+ playlist = info['playlist'],
music_list = info['music_list'],
music_attributes = get_ordered_music_list_attributes(info['music_list']),
comments_info = comments_info,
-
- theater_mode = settings.theater_mode,
- related_videos_mode = settings.related_videos_mode,
- comments_mode = settings.comments_mode,
+ comment_count = info['comment_count'],
+ comments_disabled = info['comments_disabled'],
video_height = video_height,
+ video_width = video_width,
theater_video_target_width = theater_video_target_width,
title = info['title'],
- uploader = info['uploader'],
+ uploader = info['author'],
description = info['description'],
unlisted = info['unlisted'],
+ limited_state = info['limited_state'],
+ age_restricted = info['age_restricted'],
+ live = info['live'],
+ playability_error = info['playability_error'],
+
+ allowed_countries = info['allowed_countries'],
+ ip_address = info['ip_address'] if settings.route_tor else None,
+ invidious_used = info['invidious_used'],
+ invidious_reload_button = info['invidious_reload_button'],
+ video_url = f'{util.URL_ORIGIN}/watch?v={video_id}',
+ video_id = video_id,
+ storyboard_url = (f'{util.URL_ORIGIN}/ytl-api/storyboard.vtt?'
+ f'{urlencode([("spec_url", info["storyboard_spec_url"])])}'
+ if info['storyboard_spec_url'] else None),
+
+ js_data = {
+ 'video_id': info['id'],
+ 'video_duration': info['duration'],
+ 'settings': settings.current_settings_dict,
+ 'has_manual_captions': any(s.get('on') for s in subtitle_sources),
+ 'audio_tracks': audio_tracks,
+ 'hls_manifest_url': hls_manifest_url,
+ 'time_start': time_start,
+ 'playlist': info['playlist'],
+ 'related': info['related_videos'],
+ 'playability_error': info['playability_error'],
+ 'hls_unavailable': info.get('hls_unavailable', False),
+ 'pair_sources': pair_sources,
+ 'pair_idx': pair_idx,
+ 'uni_sources': uni_sources,
+ 'uni_idx': video_sources['uni_idx'],
+ 'using_pair_sources': bool(pair_sources),
+ },
+ font_family = youtube.font_choices[settings.font], # for embed page
)
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
- result = util.fetch_url('https://www.youtube.com' + request.full_path)
- result = result.replace(b"align:start position:0%", b"")
- return result
+ url = f'https://www.youtube.com{request.full_path}'
+ try:
+ result = util.fetch_url(url, headers=util.mobile_ua)
+ result = result.replace(b"align:start position:0%", b"")
+ return flask.Response(result, mimetype='text/vtt')
+ except Exception as e:
+ logger.debug(f'Caption fetch failed: {e}')
+ return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
+
+times_reg = re.compile(r'^\d\d:\d\d:\d\d\.\d\d\d --> \d\d:\d\d:\d\d\.\d\d\d.*$')
+inner_timestamp_removal_reg = re.compile(r'<[^>]+>')
+@yt_app.route('/watch/transcript/<path:caption_path>')
+def get_transcript(caption_path):
+ try:
+ captions = util.fetch_url(f'https://www.youtube.com/{caption_path}?{request.environ["QUERY_STRING"]}').decode('utf-8')
+ except util.FetchError as e:
+ msg = f'Error retrieving captions: {e}\n\nThe caption url may have expired.'
+ print(msg)
+ return flask.Response(
+ msg,
+ status=e.code,
+ mimetype='text/plain;charset=UTF-8')
+
+ lines = captions.splitlines()
+ segments = []
+
+ # skip captions file header
+ i = 0
+ while lines[i] != '':
+ i += 1
+ current_segment = None
+ while i < len(lines):
+ line = lines[i]
+ if line == '':
+ if ((current_segment is not None)
+ and (current_segment['begin'] is not None)):
+ segments.append(current_segment)
+ current_segment = {
+ 'begin': None,
+ 'end': None,
+ 'lines': [],
+ }
+ elif times_reg.fullmatch(line.rstrip()):
+ current_segment['begin'], current_segment['end'] = line.split(' --> ')
+ else:
+ current_segment['lines'].append(
+ inner_timestamp_removal_reg.sub('', line))
+ i += 1
+
+ # if automatic captions, but not translated
+ if request.args.get('kind') == 'asr' and not request.args.get('tlang'):
+ # Automatic captions repeat content. The new segment is displayed
+ # on the bottom row; the old one is displayed on the top row.
+ # So grab the bottom row only
+ for seg in segments:
+ seg['text'] = seg['lines'][1]
+ else:
+ for seg in segments:
+ seg['text'] = ' '.join(map(str.rstrip, seg['lines']))
+ result = ''
+ for seg in segments:
+ if seg['text'] != ' ':
+ result += f"{seg['begin']} {seg['text']}\r\n"
+ return flask.Response(result.encode('utf-8'),
+ mimetype='text/plain;charset=UTF-8')