diff options
| -rw-r--r-- | requirements.txt | 1 | ||||
| -rw-r--r-- | settings.py | 12 | ||||
| -rw-r--r-- | youtube/templates/watch.html | 40 | ||||
| -rw-r--r-- | youtube/util.py | 45 | ||||
| -rw-r--r-- | youtube/watch.py | 93 | ||||
| -rw-r--r-- | youtube/yt_data_extract/watch_extraction.py | 20 | ||||
| -rw-r--r-- | youtube/ytdlp_integration.py | 78 | ||||
| -rw-r--r-- | youtube/ytdlp_proxy.py | 99 | ||||
| -rw-r--r-- | youtube/ytdlp_service.py | 393 |
9 files changed, 84 insertions, 697 deletions
diff --git a/requirements.txt b/requirements.txt index eed3186..291bc74 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,4 @@ urllib3>=1.24.1 defusedxml>=0.5.0 cachetools>=4.0.0 stem>=1.8.0 -yt-dlp>=2026.01.01 requests>=2.25.0 diff --git a/settings.py b/settings.py index 27cfd7d..2aecc87 100644 --- a/settings.py +++ b/settings.py @@ -340,15 +340,6 @@ Archive: https://archive.ph/OZQbN''', 'hidden': True, }), - ('ytdlp_enabled', { - 'type': bool, - 'default': True, - 'comment': '''Enable yt-dlp integration for multi-language audio and subtitles''', - 'hidden': False, - 'label': 'Enable yt-dlp integration', - 'category': 'playback', - }), - ('settings_version', { 'type': int, 'default': 6, @@ -359,7 +350,8 @@ Archive: https://archive.ph/OZQbN''', program_directory = os.path.dirname(os.path.realpath(__file__)) acceptable_targets = SETTINGS_INFO.keys() | { - 'enable_comments', 'enable_related_videos', 'preferred_video_codec' + 'enable_comments', 'enable_related_videos', 'preferred_video_codec', + 'ytdlp_enabled', } diff --git a/youtube/templates/watch.html b/youtube/templates/watch.html index 7432bde..5ff31cb 100644 --- a/youtube/templates/watch.html +++ b/youtube/templates/watch.html @@ -86,15 +86,6 @@ {% endfor %} </select> - {% if audio_tracks and audio_tracks|length > 1 %} - <select id="audio-language-select" autocomplete="off" title="Audio language"> - {% for track in audio_tracks %} - <option value="{{ track.get('track_id', track['language']) }}" {{ 'selected' if loop.index0 == 0 else '' }}> - 🔊 {{ track['language_name'] }}{% if track.get('is_default') %} (Default){% endif %} - </option> - {% endfor %} - </select> - {% endif %} {% endif %} </div> <input class="v-checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox"> @@ -257,37 +248,6 @@ // @license-end </script> - <!-- Audio language selector handler --> - <script> - // @license magnet:?xt=urn:btih:0b31508aeb0634b347b8270c7bee4d411b5d4109&dn=agpl-3.0.txt AGPL-v3-or-Later - (function() { - 'use strict'; - const audioSelect = document.getElementById('audio-language-select'); - const qualitySelect = document.getElementById('quality-select'); - - if (audioSelect && qualitySelect) { - audioSelect.addEventListener('change', function() { - const selectedAudio = this.value; - const selectedQuality = qualitySelect.value; - - // Parse current quality selection - let qualityData; - try { - qualityData = JSON.parse(selectedQuality); - } catch(e) { - return; - } - - // Reload video with new audio language - const currentUrl = new URL(window.location.href); - currentUrl.searchParams.set('audio_lang', selectedAudio); - window.location.href = currentUrl.toString(); - }); - } - }()); - // @license-end - </script> - <script src="/youtube.com/static/js/common.js"></script> <script src="/youtube.com/static/js/transcript-table.js"></script> {% if settings.use_video_player == 2 %} diff --git a/youtube/util.py b/youtube/util.py index ae948ae..ebb5307 100644 --- a/youtube/util.py +++ b/youtube/util.py @@ -367,34 +367,25 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, response.getheader('Set-Cookie') or '') ip = ip.group(1) if ip else None - # If this is the last attempt, raise error + # Without Tor, no point retrying with same IP + if not use_tor or not settings.route_tor: + logger.warning('Rate limited (429). Enable Tor routing to retry with new IP.') + raise FetchError('429', reason=response.reason, ip=ip) + + # Tor: exhausted retries if attempt >= max_retries - 1: - if not use_tor or not settings.route_tor: - logger.warning(f'YouTube returned 429 but Tor is not enabled. Consider enabling Tor routing.') - raise FetchError('429', reason=response.reason, ip=ip) - else: - # Tor is enabled but we've exhausted retries - logger.error(f'YouTube blocked request - Tor exit node overutilized after {max_retries} retries. Exit IP: {ip}') - raise FetchError('429', reason=response.reason, ip=ip, - error_message='Tor exit node overutilized after multiple retries') - - # For Tor: get new identity immediately on 429 - if use_tor and settings.route_tor: - logger.info(f'YouTube blocked request - Tor exit node overutilized. Exit IP: {ip}. Getting new identity...') - - error = tor_manager.new_identity(start_time) - if error: - raise FetchError( - '429', reason=response.reason, ip=ip, - error_message='Automatic circuit change: ' + error) - else: - continue # retry with new identity - - # For non-Tor: exponential backoff - delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1) - logger.info(f'Rate limited (429). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...') - time.sleep(delay) - continue # retry + logger.error(f'Rate limited after {max_retries} retries. Exit IP: {ip}') + raise FetchError('429', reason=response.reason, ip=ip, + error_message='Tor exit node overutilized after multiple retries') + + # Tor: get new identity and retry + logger.info(f'Rate limited. Getting new Tor identity... (IP: {ip})') + error = tor_manager.new_identity(start_time) + if error: + raise FetchError( + '429', reason=response.reason, ip=ip, + error_message='Automatic circuit change: ' + error) + continue # retry with new identity # Check for client errors (400, 404) - don't retry these if response.status == 400: diff --git a/youtube/watch.py b/youtube/watch.py index b76a462..2fbc1fc 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -180,8 +180,34 @@ def make_caption_src(info, lang, auto=False, trans_lang=None): label += ' (Automatic)' if trans_lang: label += ' -> ' + trans_lang + + # Try to use Android caption URL directly (no PO Token needed) + caption_url = None + for track in info.get('_android_caption_tracks', []): + track_lang = track.get('languageCode', '') + track_kind = track.get('kind', '') + if track_lang == lang and ( + (auto and track_kind == 'asr') or + (not auto and track_kind != 'asr') + ): + caption_url = track.get('baseUrl') + break + + if caption_url: + # Add format + if '&fmt=' in caption_url: + caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url) + else: + caption_url += '&fmt=vtt' + if trans_lang: + caption_url += '&tlang=' + trans_lang + url = util.prefix_url(caption_url) + else: + # Fallback to old method + url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang)) + return { - 'url': util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang)), + 'url': url, 'label': label, 'srclang': trans_lang[0:2] if trans_lang else lang[0:2], 'on': False, @@ -387,6 +413,19 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None): info = tasks[0].value or {} player_response = tasks[1].value or {} + # Save android_vr caption tracks (no PO Token needed for these URLs) + if isinstance(player_response, str): + try: + pr_data = json.loads(player_response) + except Exception: + pr_data = {} + else: + pr_data = player_response or {} + android_caption_tracks = yt_data_extract.deep_get( + pr_data, 'captions', 'playerCaptionsTracklistRenderer', + 'captionTracks', default=[]) + info['_android_caption_tracks'] = android_caption_tracks + yt_data_extract.update_with_new_urls(info, player_response) # Fallback to 'ios' if no valid URLs are found @@ -696,30 +735,6 @@ def get_watch_page(video_id=None): pair_sources = source_info['pair_sources'] uni_idx, pair_idx = source_info['uni_idx'], source_info['pair_idx'] - # Extract audio tracks using yt-dlp for multi-language support - audio_tracks = [] - try: - from youtube import ytdlp_integration - logger.info(f'Extracting audio tracks for video: {video_id}') - ytdlp_info = ytdlp_integration.extract_video_info_ytdlp(video_id) - audio_tracks = ytdlp_info.get('audio_tracks', []) - - if audio_tracks: - logger.info(f'✓ Found {len(audio_tracks)} audio tracks:') - for i, track in enumerate(audio_tracks[:10], 1): # Log first 10 - logger.info(f' [{i}] {track["language_name"]} ({track["language"]}) - ' - f'bitrate: {track.get("audio_bitrate", "N/A")}k, ' - f'codec: {track.get("acodec", "N/A")}, ' - f'format_id: {track.get("format_id", "N/A")}') - if len(audio_tracks) > 10: - logger.info(f' ... and {len(audio_tracks) - 10} more') - else: - logger.warning(f'No audio tracks found for video {video_id}') - - except Exception as e: - logger.error(f'Failed to extract audio tracks: {e}', exc_info=True) - audio_tracks = [] - pair_quality = yt_data_extract.deep_get(pair_sources, pair_idx, 'quality') uni_quality = yt_data_extract.deep_get(uni_sources, uni_idx, 'quality') @@ -843,9 +858,7 @@ def get_watch_page(video_id=None): 'playlist': info['playlist'], 'related': info['related_videos'], 'playability_error': info['playability_error'], - 'audio_tracks': audio_tracks, }, - audio_tracks = audio_tracks, font_family = youtube.font_choices[settings.font], # for embed page **source_info, using_pair_sources = using_pair_sources, @@ -854,16 +867,13 @@ def get_watch_page(video_id=None): @yt_app.route('/api/<path:dummy>') def get_captions(dummy): + url = 'https://www.youtube.com' + request.full_path try: - result = util.fetch_url('https://www.youtube.com' + request.full_path) + result = util.fetch_url(url, headers=util.mobile_ua) result = result.replace(b"align:start position:0%", b"") - return result - except util.FetchError as e: - # Return empty captions gracefully instead of error page - logger.warning(f'Failed to fetch captions: {e}') - return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200) + return flask.Response(result, mimetype='text/vtt') except Exception as e: - logger.error(f'Unexpected error fetching captions: {e}') + logger.debug(f'Caption fetch failed: {e}') return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200) @@ -929,18 +939,3 @@ def get_transcript(caption_path): return flask.Response(result.encode('utf-8'), mimetype='text/plain;charset=UTF-8') - - -# ============================================================================ -# yt-dlp Integration Routes -# ============================================================================ - -@yt_app.route('/ytl-api/video-with-audio/<video_id>') -def proxy_video_with_audio(video_id): - """ - Proxy para servir video con audio específico usando yt-dlp - """ - from youtube import ytdlp_proxy - audio_lang = request.args.get('lang', 'en') - max_quality = int(request.args.get('quality', 720)) - return ytdlp_proxy.stream_video_with_audio(video_id, audio_lang, max_quality) diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py index e09e2d3..85c8100 100644 --- a/youtube/yt_data_extract/watch_extraction.py +++ b/youtube/yt_data_extract/watch_extraction.py @@ -628,6 +628,7 @@ def extract_watch_info(polymer_json): info['manual_caption_languages'] = [] info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url info['translation_languages'] = [] + info['_caption_track_urls'] = {} # lang_code -> full baseUrl from player response captions_info = player_response.get('captions', {}) info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl')) # Sometimes the above playerCaptionsRender is randomly missing @@ -658,6 +659,10 @@ def extract_watch_info(polymer_json): else: info['manual_caption_languages'].append(lang_code) base_url = caption_track.get('baseUrl', '') + # Store the full URL from the player response (includes valid tokens) + if base_url: + normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url + info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0) if lang_name: info['_manual_caption_language_names'][lang_code] = lang_name @@ -825,6 +830,21 @@ def captions_available(info): def get_caption_url(info, language, format, automatic=False, translation_language=None): '''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.''' + # Try to use the direct URL from the player response first (has valid tokens) + track_key = language + ('_asr' if automatic else '') + direct_url = info.get('_caption_track_urls', {}).get(track_key) + if direct_url: + url = direct_url + # Override format + if '&fmt=' in url: + url = re.sub(r'&fmt=[^&]*', '&fmt=' + format, url) + else: + url += '&fmt=' + format + if translation_language: + url += '&tlang=' + translation_language + return url + + # Fallback to base_url construction url = info['_captions_base_url'] if not url: return None diff --git a/youtube/ytdlp_integration.py b/youtube/ytdlp_integration.py deleted file mode 100644 index f520e64..0000000 --- a/youtube/ytdlp_integration.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python3 -""" -yt-dlp integration wrapper for backward compatibility. - -This module now uses the centralized ytdlp_service for all operations. -""" -import logging -from youtube.ytdlp_service import ( - extract_video_info, - get_language_name, - clear_cache, - get_cache_info, -) - -logger = logging.getLogger(__name__) - - -def extract_video_info_ytdlp(video_id): - """ - Extract video information using yt-dlp (with caching). - - This is a wrapper around ytdlp_service.extract_video_info() - for backward compatibility. - - Args: - video_id: YouTube video ID - - Returns: - Dictionary with audio_tracks, formats, title, duration - """ - logger.debug(f'Extracting video info (legacy API): {video_id}') - - info = extract_video_info(video_id) - - # Convert to legacy format for backward compatibility - return { - 'audio_tracks': info.get('audio_tracks', []), - 'all_audio_formats': info.get('formats', []), - 'formats': info.get('formats', []), - 'title': info.get('title', ''), - 'duration': info.get('duration', 0), - 'error': info.get('error'), - } - - -def get_audio_formats_for_language(video_id, language='en'): - """ - Get available audio formats for a specific language. - - Args: - video_id: YouTube video ID - language: Language code (default: 'en') - - Returns: - List of audio format dicts - """ - info = extract_video_info_ytdlp(video_id) - - if 'error' in info: - logger.warning(f'Cannot get audio formats: {info["error"]}') - return [] - - audio_formats = [] - for track in info.get('audio_tracks', []): - if track['language'] == language: - audio_formats.append(track) - - logger.debug(f'Found {len(audio_formats)} {language} audio formats') - return audio_formats - - -__all__ = [ - 'extract_video_info_ytdlp', - 'get_audio_formats_for_language', - 'get_language_name', - 'clear_cache', - 'get_cache_info', -] diff --git a/youtube/ytdlp_proxy.py b/youtube/ytdlp_proxy.py deleted file mode 100644 index 023e278..0000000 --- a/youtube/ytdlp_proxy.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 -""" -Proxy for serving videos with specific audio using yt-dlp. - -This module provides streaming functionality for unified formats -with specific audio languages. -""" -import logging -from flask import Response, request, stream_with_context -import urllib.request -import urllib.error -from youtube.ytdlp_service import find_best_unified_format - -logger = logging.getLogger(__name__) - - -def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quality: int = 720): - """ - Stream video with specific audio language. - - Args: - video_id: YouTube video ID - audio_language: Preferred audio language (default: 'en') - max_quality: Maximum video height (default: 720) - - Returns: - Flask Response with video stream, or 404 if not available - """ - logger.info(f'Stream request: {video_id} | audio={audio_language} | quality={max_quality}p') - - # Find best unified format - best_format = find_best_unified_format(video_id, audio_language, max_quality) - - if not best_format: - logger.info(f'No suitable unified format found, returning 404 to trigger fallback') - return Response('No suitable unified format available', status=404) - - url = best_format.get('url') - if not url: - logger.error('Format found but no URL available') - return Response('Format URL not available', status=500) - - logger.debug(f'Streaming from: {url[:80]}...') - - # Stream the video - try: - req = urllib.request.Request(url) - req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') - req.add_header('Accept', '*/*') - - # Add Range header if client requests it - if 'Range' in request.headers: - req.add_header('Range', request.headers['Range']) - logger.debug(f'Range request: {request.headers["Range"]}') - - resp = urllib.request.urlopen(req, timeout=60) - - def generate(): - """Generator for streaming video chunks.""" - try: - while True: - chunk = resp.read(65536) # 64KB chunks - if not chunk: - break - yield chunk - except Exception as e: - logger.error(f'Stream error: {e}') - raise - - # Build response headers - response_headers = { - 'Content-Type': resp.headers.get('Content-Type', 'video/mp4'), - 'Access-Control-Allow-Origin': '*', - } - - # Copy important headers - for header in ['Content-Length', 'Content-Range', 'Accept-Ranges']: - if header in resp.headers: - response_headers[header] = resp.headers[header] - - status_code = resp.getcode() - logger.info(f'Streaming started: {status_code}') - - return Response( - stream_with_context(generate()), - status=status_code, - headers=response_headers, - direct_passthrough=True - ) - - except urllib.error.HTTPError as e: - logger.error(f'HTTP error streaming: {e.code} {e.reason}') - return Response(f'Error: {e.code} {e.reason}', status=e.code) - except urllib.error.URLError as e: - logger.error(f'URL error streaming: {e.reason}') - return Response(f'Network error: {e.reason}', status=502) - except Exception as e: - logger.error(f'Streaming error: {e}', exc_info=True) - return Response(f'Error: {e}', status=500) diff --git a/youtube/ytdlp_service.py b/youtube/ytdlp_service.py deleted file mode 100644 index 994cec4..0000000 --- a/youtube/ytdlp_service.py +++ /dev/null @@ -1,393 +0,0 @@ -#!/usr/bin/env python3 -""" -Centralized yt-dlp integration with caching, logging, and error handling. - -This module provides a clean interface for yt-dlp functionality: -- Multi-language audio track extraction -- Subtitle extraction -- Age-restricted video support - -All yt-dlp usage should go through this module for consistency. -""" -import logging -from functools import lru_cache -from typing import Dict, List, Optional, Any -import yt_dlp -import settings - -logger = logging.getLogger(__name__) - -# Language name mapping -LANGUAGE_NAMES = { - 'en': 'English', - 'es': 'Español', - 'fr': 'Français', - 'de': 'Deutsch', - 'it': 'Italiano', - 'pt': 'Português', - 'ru': 'Русский', - 'ja': '日本語', - 'ko': '한국어', - 'zh': '中文', - 'ar': 'العربية', - 'hi': 'हिन्दी', - 'und': 'Unknown', - 'zxx': 'No linguistic content', -} - - -def get_language_name(lang_code: str) -> str: - """Convert ISO 639-1/2 language code to readable name.""" - if not lang_code: - return 'Unknown' - return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper()) - - -def _get_ytdlp_config() -> Dict[str, Any]: - """Get yt-dlp configuration from settings.""" - config = { - 'quiet': True, - 'no_warnings': True, - 'extract_flat': False, - 'format': 'best', - 'skip_download': True, - 'socket_timeout': 30, - 'extractor_retries': 3, - 'http_chunk_size': 10485760, # 10MB - } - - # Configure Tor proxy if enabled - if settings.route_tor: - config['proxy'] = 'socks5://127.0.0.1:9150' - logger.debug('Tor proxy enabled for yt-dlp') - - # Use cookies if available - import os - cookies_file = 'youtube_cookies.txt' - if os.path.exists(cookies_file): - config['cookiefile'] = cookies_file - logger.debug('Using cookies file for yt-dlp') - - return config - - -@lru_cache(maxsize=128) -def extract_video_info(video_id: str) -> Dict[str, Any]: - """ - Extract video information using yt-dlp with caching. - - Args: - video_id: YouTube video ID - - Returns: - Dictionary with video information including audio tracks - - Caching: - Results are cached to avoid repeated requests to YouTube. - Cache size is limited to prevent memory issues. - """ - # Check if yt-dlp is enabled - if not getattr(settings, 'ytdlp_enabled', True): - logger.debug('yt-dlp integration is disabled') - return {'error': 'yt-dlp disabled', 'audio_tracks': []} - - url = f'https://www.youtube.com/watch?v={video_id}' - ydl_opts = _get_ytdlp_config() - - try: - logger.debug(f'Extracting video info: {video_id}') - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info = ydl.extract_info(url, download=False) - - if not info: - logger.warning(f'No info returned for video: {video_id}') - return {'error': 'No info returned', 'audio_tracks': []} - - logger.info(f'Extracted {len(info.get("formats", []))} total formats') - - # Extract audio tracks grouped by language - audio_tracks = _extract_audio_tracks(info) - - return { - 'video_id': video_id, - 'title': info.get('title', ''), - 'duration': info.get('duration', 0), - 'audio_tracks': audio_tracks, - 'formats': info.get('formats', []), - 'subtitles': info.get('subtitles', {}), - 'automatic_captions': info.get('automatic_captions', {}), - } - - except yt_dlp.utils.DownloadError as e: - logger.error(f'yt-dlp download error for {video_id}: {e}') - return {'error': str(e), 'audio_tracks': []} - except Exception as e: - logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True) - return {'error': str(e), 'audio_tracks': []} - - -def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]: - """ - Extract audio tracks from video info, grouped by language. - - Returns a list of unique audio tracks (one per language), - keeping the highest quality for each language. - """ - audio_by_language = {} - all_formats = info.get('formats', []) - - logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks') - - for fmt in all_formats: - # Only audio-only formats - has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none' - has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none' - - if not has_audio or has_video: - continue - - # Extract language information - lang = ( - fmt.get('language') or - fmt.get('audio_language') or - fmt.get('lang') or - 'und' - ) - - # Get language name - lang_name = ( - fmt.get('language_name') or - fmt.get('lang_name') or - get_language_name(lang) - ) - - # Get bitrate - bitrate = fmt.get('abr') or fmt.get('tbr') or 0 - - # Create track info - track_info = { - 'language': lang, - 'language_name': lang_name, - 'format_id': str(fmt.get('format_id', '')), - 'itag': str(fmt.get('format_id', '')), - 'ext': fmt.get('ext'), - 'acodec': fmt.get('acodec'), - 'audio_bitrate': int(bitrate) if bitrate else 0, - 'audio_sample_rate': fmt.get('asr'), - 'url': fmt.get('url'), - 'filesize': fmt.get('filesize'), - } - - # Keep best quality per language - lang_key = lang.lower() - if lang_key not in audio_by_language: - audio_by_language[lang_key] = track_info - logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k') - else: - current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0) - if bitrate > current_bitrate: - logger.debug(f' Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k') - audio_by_language[lang_key] = track_info - - # Convert to list and sort - audio_tracks = list(audio_by_language.values()) - - # Sort: English first, then by bitrate (descending) - audio_tracks.sort( - key=lambda x: ( - 0 if x['language'] == 'en' else 1, - -x.get('audio_bitrate', 0) - ) - ) - - logger.info(f'Extracted {len(audio_tracks)} unique audio languages') - for track in audio_tracks[:5]: # Log first 5 - logger.info(f' → {track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k') - - return audio_tracks - - -def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]: - """ - Get subtitle URL for a specific language. - - Args: - video_id: YouTube video ID - lang: Language code (default: 'en') - - Returns: - URL to subtitle file, or None if not available - """ - info = extract_video_info(video_id) - - if 'error' in info: - logger.warning(f'Cannot get subtitles: {info["error"]}') - return None - - # Try manual subtitles first - subtitles = info.get('subtitles', {}) - if lang in subtitles: - for sub in subtitles[lang]: - if sub.get('ext') == 'vtt': - logger.debug(f'Found manual {lang} subtitle') - return sub.get('url') - - # Try automatic captions - auto_captions = info.get('automatic_captions', {}) - if lang in auto_captions: - for sub in auto_captions[lang]: - if sub.get('ext') == 'vtt': - logger.debug(f'Found automatic {lang} subtitle') - return sub.get('url') - - logger.debug(f'No {lang} subtitle found') - return None - - -def find_best_unified_format( - video_id: str, - audio_language: str = 'en', - max_quality: int = 720 -) -> Optional[Dict[str, Any]]: - """ - Find best unified (video+audio) format for specific language and quality. - - Args: - video_id: YouTube video ID - audio_language: Preferred audio language - max_quality: Maximum video height (e.g., 720, 1080) - - Returns: - Format dict if found, None otherwise - """ - info = extract_video_info(video_id) - - if 'error' in info or not info.get('formats'): - return None - - # Quality thresholds (minimum acceptable height as % of requested) - thresholds = { - 2160: 0.85, - 1440: 0.80, - 1080: 0.70, - 720: 0.70, - 480: 0.60, - 360: 0.50, - } - - # Get threshold for requested quality - threshold = 0.70 - for q, t in thresholds.items(): - if max_quality >= q: - threshold = t - break - - min_height = int(max_quality * threshold) - logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p') - - candidates = [] - audio_lang_lower = audio_language.lower() - - for fmt in info['formats']: - # Must have both video and audio - has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none' - has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none' - - if not (has_video and has_audio): - continue - - # Skip HLS/DASH formats - protocol = fmt.get('protocol', '') - format_id = str(fmt.get('format_id', '')) - - if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']): - continue - if format_id.startswith('9'): # HLS formats - continue - - height = fmt.get('height', 0) - if height < min_height: - continue - - # Language matching - lang = ( - fmt.get('language') or - fmt.get('audio_language') or - 'en' - ).lower() - - lang_match = ( - lang == audio_lang_lower or - lang.startswith(audio_lang_lower[:2]) or - audio_lang_lower.startswith(lang[:2]) - ) - - if not lang_match: - continue - - # Calculate score - score = 0 - - # Language match bonus - if lang == audio_lang_lower: - score += 10000 - elif lang.startswith(audio_lang_lower[:2]): - score += 8000 - else: - score += 5000 - - # Quality score - quality_diff = abs(height - max_quality) - if height >= max_quality: - score += 3000 - quality_diff - else: - score += 2000 - quality_diff - - # Protocol preference - if protocol in ('https', 'http'): - score += 500 - - # Format preference - if fmt.get('ext') == 'mp4': - score += 100 - - candidates.append({ - 'format': fmt, - 'score': score, - 'height': height, - 'lang': lang, - }) - - if not candidates: - logger.debug(f'No unified format found for {max_quality}p + {audio_language}') - return None - - # Sort by score and return best - candidates.sort(key=lambda x: x['score'], reverse=True) - best = candidates[0] - - logger.info( - f'Selected unified format: {best["format"].get("format_id")} | ' - f'{best["lang"]} | {best["height"]}p | score={best["score"]}' - ) - - return best['format'] - - -def clear_cache(): - """Clear the video info cache.""" - extract_video_info.cache_clear() - logger.info('yt-dlp cache cleared') - - -def get_cache_info() -> Dict[str, Any]: - """Get cache statistics.""" - cache_info = extract_video_info.cache_info() - return { - 'hits': cache_info.hits, - 'misses': cache_info.misses, - 'size': cache_info.currsize, - 'maxsize': cache_info.maxsize, - 'hit_rate': cache_info.hits / (cache_info.hits + cache_info.misses) if (cache_info.hits + cache_info.misses) > 0 else 0, - } |
