aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--requirements.txt1
-rw-r--r--settings.py12
-rw-r--r--youtube/templates/watch.html40
-rw-r--r--youtube/util.py45
-rw-r--r--youtube/watch.py93
-rw-r--r--youtube/yt_data_extract/watch_extraction.py20
-rw-r--r--youtube/ytdlp_integration.py78
-rw-r--r--youtube/ytdlp_proxy.py99
-rw-r--r--youtube/ytdlp_service.py393
9 files changed, 84 insertions, 697 deletions
diff --git a/requirements.txt b/requirements.txt
index eed3186..291bc74 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,5 +8,4 @@ urllib3>=1.24.1
defusedxml>=0.5.0
cachetools>=4.0.0
stem>=1.8.0
-yt-dlp>=2026.01.01
requests>=2.25.0
diff --git a/settings.py b/settings.py
index 27cfd7d..2aecc87 100644
--- a/settings.py
+++ b/settings.py
@@ -340,15 +340,6 @@ Archive: https://archive.ph/OZQbN''',
'hidden': True,
}),
- ('ytdlp_enabled', {
- 'type': bool,
- 'default': True,
- 'comment': '''Enable yt-dlp integration for multi-language audio and subtitles''',
- 'hidden': False,
- 'label': 'Enable yt-dlp integration',
- 'category': 'playback',
- }),
-
('settings_version', {
'type': int,
'default': 6,
@@ -359,7 +350,8 @@ Archive: https://archive.ph/OZQbN''',
program_directory = os.path.dirname(os.path.realpath(__file__))
acceptable_targets = SETTINGS_INFO.keys() | {
- 'enable_comments', 'enable_related_videos', 'preferred_video_codec'
+ 'enable_comments', 'enable_related_videos', 'preferred_video_codec',
+ 'ytdlp_enabled',
}
diff --git a/youtube/templates/watch.html b/youtube/templates/watch.html
index 7432bde..5ff31cb 100644
--- a/youtube/templates/watch.html
+++ b/youtube/templates/watch.html
@@ -86,15 +86,6 @@
{% endfor %}
</select>
- {% if audio_tracks and audio_tracks|length > 1 %}
- <select id="audio-language-select" autocomplete="off" title="Audio language">
- {% for track in audio_tracks %}
- <option value="{{ track.get('track_id', track['language']) }}" {{ 'selected' if loop.index0 == 0 else '' }}>
- 🔊 {{ track['language_name'] }}{% if track.get('is_default') %} (Default){% endif %}
- </option>
- {% endfor %}
- </select>
- {% endif %}
{% endif %}
</div>
<input class="v-checkbox" name="video_info_list" value="{{ video_info }}" form="playlist-edit" type="checkbox">
@@ -257,37 +248,6 @@
// @license-end
</script>
- <!-- Audio language selector handler -->
- <script>
- // @license magnet:?xt=urn:btih:0b31508aeb0634b347b8270c7bee4d411b5d4109&dn=agpl-3.0.txt AGPL-v3-or-Later
- (function() {
- 'use strict';
- const audioSelect = document.getElementById('audio-language-select');
- const qualitySelect = document.getElementById('quality-select');
-
- if (audioSelect && qualitySelect) {
- audioSelect.addEventListener('change', function() {
- const selectedAudio = this.value;
- const selectedQuality = qualitySelect.value;
-
- // Parse current quality selection
- let qualityData;
- try {
- qualityData = JSON.parse(selectedQuality);
- } catch(e) {
- return;
- }
-
- // Reload video with new audio language
- const currentUrl = new URL(window.location.href);
- currentUrl.searchParams.set('audio_lang', selectedAudio);
- window.location.href = currentUrl.toString();
- });
- }
- }());
- // @license-end
- </script>
-
<script src="/youtube.com/static/js/common.js"></script>
<script src="/youtube.com/static/js/transcript-table.js"></script>
{% if settings.use_video_player == 2 %}
diff --git a/youtube/util.py b/youtube/util.py
index ae948ae..ebb5307 100644
--- a/youtube/util.py
+++ b/youtube/util.py
@@ -367,34 +367,25 @@ def fetch_url(url, headers=(), timeout=15, report_text=None, data=None,
response.getheader('Set-Cookie') or '')
ip = ip.group(1) if ip else None
- # If this is the last attempt, raise error
+ # Without Tor, no point retrying with same IP
+ if not use_tor or not settings.route_tor:
+ logger.warning('Rate limited (429). Enable Tor routing to retry with new IP.')
+ raise FetchError('429', reason=response.reason, ip=ip)
+
+ # Tor: exhausted retries
if attempt >= max_retries - 1:
- if not use_tor or not settings.route_tor:
- logger.warning(f'YouTube returned 429 but Tor is not enabled. Consider enabling Tor routing.')
- raise FetchError('429', reason=response.reason, ip=ip)
- else:
- # Tor is enabled but we've exhausted retries
- logger.error(f'YouTube blocked request - Tor exit node overutilized after {max_retries} retries. Exit IP: {ip}')
- raise FetchError('429', reason=response.reason, ip=ip,
- error_message='Tor exit node overutilized after multiple retries')
-
- # For Tor: get new identity immediately on 429
- if use_tor and settings.route_tor:
- logger.info(f'YouTube blocked request - Tor exit node overutilized. Exit IP: {ip}. Getting new identity...')
-
- error = tor_manager.new_identity(start_time)
- if error:
- raise FetchError(
- '429', reason=response.reason, ip=ip,
- error_message='Automatic circuit change: ' + error)
- else:
- continue # retry with new identity
-
- # For non-Tor: exponential backoff
- delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
- logger.info(f'Rate limited (429). Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}...')
- time.sleep(delay)
- continue # retry
+ logger.error(f'Rate limited after {max_retries} retries. Exit IP: {ip}')
+ raise FetchError('429', reason=response.reason, ip=ip,
+ error_message='Tor exit node overutilized after multiple retries')
+
+ # Tor: get new identity and retry
+ logger.info(f'Rate limited. Getting new Tor identity... (IP: {ip})')
+ error = tor_manager.new_identity(start_time)
+ if error:
+ raise FetchError(
+ '429', reason=response.reason, ip=ip,
+ error_message='Automatic circuit change: ' + error)
+ continue # retry with new identity
# Check for client errors (400, 404) - don't retry these
if response.status == 400:
diff --git a/youtube/watch.py b/youtube/watch.py
index b76a462..2fbc1fc 100644
--- a/youtube/watch.py
+++ b/youtube/watch.py
@@ -180,8 +180,34 @@ def make_caption_src(info, lang, auto=False, trans_lang=None):
label += ' (Automatic)'
if trans_lang:
label += ' -> ' + trans_lang
+
+ # Try to use Android caption URL directly (no PO Token needed)
+ caption_url = None
+ for track in info.get('_android_caption_tracks', []):
+ track_lang = track.get('languageCode', '')
+ track_kind = track.get('kind', '')
+ if track_lang == lang and (
+ (auto and track_kind == 'asr') or
+ (not auto and track_kind != 'asr')
+ ):
+ caption_url = track.get('baseUrl')
+ break
+
+ if caption_url:
+ # Add format
+ if '&fmt=' in caption_url:
+ caption_url = re.sub(r'&fmt=[^&]*', '&fmt=vtt', caption_url)
+ else:
+ caption_url += '&fmt=vtt'
+ if trans_lang:
+ caption_url += '&tlang=' + trans_lang
+ url = util.prefix_url(caption_url)
+ else:
+ # Fallback to old method
+ url = util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang))
+
return {
- 'url': util.prefix_url(yt_data_extract.get_caption_url(info, lang, 'vtt', auto, trans_lang)),
+ 'url': url,
'label': label,
'srclang': trans_lang[0:2] if trans_lang else lang[0:2],
'on': False,
@@ -387,6 +413,19 @@ def extract_info(video_id, use_invidious, playlist_id=None, index=None):
info = tasks[0].value or {}
player_response = tasks[1].value or {}
+ # Save android_vr caption tracks (no PO Token needed for these URLs)
+ if isinstance(player_response, str):
+ try:
+ pr_data = json.loads(player_response)
+ except Exception:
+ pr_data = {}
+ else:
+ pr_data = player_response or {}
+ android_caption_tracks = yt_data_extract.deep_get(
+ pr_data, 'captions', 'playerCaptionsTracklistRenderer',
+ 'captionTracks', default=[])
+ info['_android_caption_tracks'] = android_caption_tracks
+
yt_data_extract.update_with_new_urls(info, player_response)
# Fallback to 'ios' if no valid URLs are found
@@ -696,30 +735,6 @@ def get_watch_page(video_id=None):
pair_sources = source_info['pair_sources']
uni_idx, pair_idx = source_info['uni_idx'], source_info['pair_idx']
- # Extract audio tracks using yt-dlp for multi-language support
- audio_tracks = []
- try:
- from youtube import ytdlp_integration
- logger.info(f'Extracting audio tracks for video: {video_id}')
- ytdlp_info = ytdlp_integration.extract_video_info_ytdlp(video_id)
- audio_tracks = ytdlp_info.get('audio_tracks', [])
-
- if audio_tracks:
- logger.info(f'✓ Found {len(audio_tracks)} audio tracks:')
- for i, track in enumerate(audio_tracks[:10], 1): # Log first 10
- logger.info(f' [{i}] {track["language_name"]} ({track["language"]}) - '
- f'bitrate: {track.get("audio_bitrate", "N/A")}k, '
- f'codec: {track.get("acodec", "N/A")}, '
- f'format_id: {track.get("format_id", "N/A")}')
- if len(audio_tracks) > 10:
- logger.info(f' ... and {len(audio_tracks) - 10} more')
- else:
- logger.warning(f'No audio tracks found for video {video_id}')
-
- except Exception as e:
- logger.error(f'Failed to extract audio tracks: {e}', exc_info=True)
- audio_tracks = []
-
pair_quality = yt_data_extract.deep_get(pair_sources, pair_idx, 'quality')
uni_quality = yt_data_extract.deep_get(uni_sources, uni_idx, 'quality')
@@ -843,9 +858,7 @@ def get_watch_page(video_id=None):
'playlist': info['playlist'],
'related': info['related_videos'],
'playability_error': info['playability_error'],
- 'audio_tracks': audio_tracks,
},
- audio_tracks = audio_tracks,
font_family = youtube.font_choices[settings.font], # for embed page
**source_info,
using_pair_sources = using_pair_sources,
@@ -854,16 +867,13 @@ def get_watch_page(video_id=None):
@yt_app.route('/api/<path:dummy>')
def get_captions(dummy):
+ url = 'https://www.youtube.com' + request.full_path
try:
- result = util.fetch_url('https://www.youtube.com' + request.full_path)
+ result = util.fetch_url(url, headers=util.mobile_ua)
result = result.replace(b"align:start position:0%", b"")
- return result
- except util.FetchError as e:
- # Return empty captions gracefully instead of error page
- logger.warning(f'Failed to fetch captions: {e}')
- return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
+ return flask.Response(result, mimetype='text/vtt')
except Exception as e:
- logger.error(f'Unexpected error fetching captions: {e}')
+ logger.debug(f'Caption fetch failed: {e}')
return flask.Response(b'WEBVTT\n\n', mimetype='text/vtt', status=200)
@@ -929,18 +939,3 @@ def get_transcript(caption_path):
return flask.Response(result.encode('utf-8'),
mimetype='text/plain;charset=UTF-8')
-
-
-# ============================================================================
-# yt-dlp Integration Routes
-# ============================================================================
-
-@yt_app.route('/ytl-api/video-with-audio/<video_id>')
-def proxy_video_with_audio(video_id):
- """
- Proxy para servir video con audio específico usando yt-dlp
- """
- from youtube import ytdlp_proxy
- audio_lang = request.args.get('lang', 'en')
- max_quality = int(request.args.get('quality', 720))
- return ytdlp_proxy.stream_video_with_audio(video_id, audio_lang, max_quality)
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
index e09e2d3..85c8100 100644
--- a/youtube/yt_data_extract/watch_extraction.py
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -628,6 +628,7 @@ def extract_watch_info(polymer_json):
info['manual_caption_languages'] = []
info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url
info['translation_languages'] = []
+ info['_caption_track_urls'] = {} # lang_code -> full baseUrl from player response
captions_info = player_response.get('captions', {})
info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
# Sometimes the above playerCaptionsRender is randomly missing
@@ -658,6 +659,10 @@ def extract_watch_info(polymer_json):
else:
info['manual_caption_languages'].append(lang_code)
base_url = caption_track.get('baseUrl', '')
+ # Store the full URL from the player response (includes valid tokens)
+ if base_url:
+ normalized = normalize_url(base_url) if base_url.startswith('/') or not base_url.startswith('http') else base_url
+ info['_caption_track_urls'][lang_code + ('_asr' if caption_track.get('kind') == 'asr' else '')] = normalized
lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
if lang_name:
info['_manual_caption_language_names'][lang_code] = lang_name
@@ -825,6 +830,21 @@ def captions_available(info):
def get_caption_url(info, language, format, automatic=False, translation_language=None):
'''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
+ # Try to use the direct URL from the player response first (has valid tokens)
+ track_key = language + ('_asr' if automatic else '')
+ direct_url = info.get('_caption_track_urls', {}).get(track_key)
+ if direct_url:
+ url = direct_url
+ # Override format
+ if '&fmt=' in url:
+ url = re.sub(r'&fmt=[^&]*', '&fmt=' + format, url)
+ else:
+ url += '&fmt=' + format
+ if translation_language:
+ url += '&tlang=' + translation_language
+ return url
+
+ # Fallback to base_url construction
url = info['_captions_base_url']
if not url:
return None
diff --git a/youtube/ytdlp_integration.py b/youtube/ytdlp_integration.py
deleted file mode 100644
index f520e64..0000000
--- a/youtube/ytdlp_integration.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#!/usr/bin/env python3
-"""
-yt-dlp integration wrapper for backward compatibility.
-
-This module now uses the centralized ytdlp_service for all operations.
-"""
-import logging
-from youtube.ytdlp_service import (
- extract_video_info,
- get_language_name,
- clear_cache,
- get_cache_info,
-)
-
-logger = logging.getLogger(__name__)
-
-
-def extract_video_info_ytdlp(video_id):
- """
- Extract video information using yt-dlp (with caching).
-
- This is a wrapper around ytdlp_service.extract_video_info()
- for backward compatibility.
-
- Args:
- video_id: YouTube video ID
-
- Returns:
- Dictionary with audio_tracks, formats, title, duration
- """
- logger.debug(f'Extracting video info (legacy API): {video_id}')
-
- info = extract_video_info(video_id)
-
- # Convert to legacy format for backward compatibility
- return {
- 'audio_tracks': info.get('audio_tracks', []),
- 'all_audio_formats': info.get('formats', []),
- 'formats': info.get('formats', []),
- 'title': info.get('title', ''),
- 'duration': info.get('duration', 0),
- 'error': info.get('error'),
- }
-
-
-def get_audio_formats_for_language(video_id, language='en'):
- """
- Get available audio formats for a specific language.
-
- Args:
- video_id: YouTube video ID
- language: Language code (default: 'en')
-
- Returns:
- List of audio format dicts
- """
- info = extract_video_info_ytdlp(video_id)
-
- if 'error' in info:
- logger.warning(f'Cannot get audio formats: {info["error"]}')
- return []
-
- audio_formats = []
- for track in info.get('audio_tracks', []):
- if track['language'] == language:
- audio_formats.append(track)
-
- logger.debug(f'Found {len(audio_formats)} {language} audio formats')
- return audio_formats
-
-
-__all__ = [
- 'extract_video_info_ytdlp',
- 'get_audio_formats_for_language',
- 'get_language_name',
- 'clear_cache',
- 'get_cache_info',
-]
diff --git a/youtube/ytdlp_proxy.py b/youtube/ytdlp_proxy.py
deleted file mode 100644
index 023e278..0000000
--- a/youtube/ytdlp_proxy.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python3
-"""
-Proxy for serving videos with specific audio using yt-dlp.
-
-This module provides streaming functionality for unified formats
-with specific audio languages.
-"""
-import logging
-from flask import Response, request, stream_with_context
-import urllib.request
-import urllib.error
-from youtube.ytdlp_service import find_best_unified_format
-
-logger = logging.getLogger(__name__)
-
-
-def stream_video_with_audio(video_id: str, audio_language: str = 'en', max_quality: int = 720):
- """
- Stream video with specific audio language.
-
- Args:
- video_id: YouTube video ID
- audio_language: Preferred audio language (default: 'en')
- max_quality: Maximum video height (default: 720)
-
- Returns:
- Flask Response with video stream, or 404 if not available
- """
- logger.info(f'Stream request: {video_id} | audio={audio_language} | quality={max_quality}p')
-
- # Find best unified format
- best_format = find_best_unified_format(video_id, audio_language, max_quality)
-
- if not best_format:
- logger.info(f'No suitable unified format found, returning 404 to trigger fallback')
- return Response('No suitable unified format available', status=404)
-
- url = best_format.get('url')
- if not url:
- logger.error('Format found but no URL available')
- return Response('Format URL not available', status=500)
-
- logger.debug(f'Streaming from: {url[:80]}...')
-
- # Stream the video
- try:
- req = urllib.request.Request(url)
- req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
- req.add_header('Accept', '*/*')
-
- # Add Range header if client requests it
- if 'Range' in request.headers:
- req.add_header('Range', request.headers['Range'])
- logger.debug(f'Range request: {request.headers["Range"]}')
-
- resp = urllib.request.urlopen(req, timeout=60)
-
- def generate():
- """Generator for streaming video chunks."""
- try:
- while True:
- chunk = resp.read(65536) # 64KB chunks
- if not chunk:
- break
- yield chunk
- except Exception as e:
- logger.error(f'Stream error: {e}')
- raise
-
- # Build response headers
- response_headers = {
- 'Content-Type': resp.headers.get('Content-Type', 'video/mp4'),
- 'Access-Control-Allow-Origin': '*',
- }
-
- # Copy important headers
- for header in ['Content-Length', 'Content-Range', 'Accept-Ranges']:
- if header in resp.headers:
- response_headers[header] = resp.headers[header]
-
- status_code = resp.getcode()
- logger.info(f'Streaming started: {status_code}')
-
- return Response(
- stream_with_context(generate()),
- status=status_code,
- headers=response_headers,
- direct_passthrough=True
- )
-
- except urllib.error.HTTPError as e:
- logger.error(f'HTTP error streaming: {e.code} {e.reason}')
- return Response(f'Error: {e.code} {e.reason}', status=e.code)
- except urllib.error.URLError as e:
- logger.error(f'URL error streaming: {e.reason}')
- return Response(f'Network error: {e.reason}', status=502)
- except Exception as e:
- logger.error(f'Streaming error: {e}', exc_info=True)
- return Response(f'Error: {e}', status=500)
diff --git a/youtube/ytdlp_service.py b/youtube/ytdlp_service.py
deleted file mode 100644
index 994cec4..0000000
--- a/youtube/ytdlp_service.py
+++ /dev/null
@@ -1,393 +0,0 @@
-#!/usr/bin/env python3
-"""
-Centralized yt-dlp integration with caching, logging, and error handling.
-
-This module provides a clean interface for yt-dlp functionality:
-- Multi-language audio track extraction
-- Subtitle extraction
-- Age-restricted video support
-
-All yt-dlp usage should go through this module for consistency.
-"""
-import logging
-from functools import lru_cache
-from typing import Dict, List, Optional, Any
-import yt_dlp
-import settings
-
-logger = logging.getLogger(__name__)
-
-# Language name mapping
-LANGUAGE_NAMES = {
- 'en': 'English',
- 'es': 'Español',
- 'fr': 'Français',
- 'de': 'Deutsch',
- 'it': 'Italiano',
- 'pt': 'Português',
- 'ru': 'Русский',
- 'ja': '日本語',
- 'ko': '한국어',
- 'zh': '中文',
- 'ar': 'العربية',
- 'hi': 'हिन्दी',
- 'und': 'Unknown',
- 'zxx': 'No linguistic content',
-}
-
-
-def get_language_name(lang_code: str) -> str:
- """Convert ISO 639-1/2 language code to readable name."""
- if not lang_code:
- return 'Unknown'
- return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())
-
-
-def _get_ytdlp_config() -> Dict[str, Any]:
- """Get yt-dlp configuration from settings."""
- config = {
- 'quiet': True,
- 'no_warnings': True,
- 'extract_flat': False,
- 'format': 'best',
- 'skip_download': True,
- 'socket_timeout': 30,
- 'extractor_retries': 3,
- 'http_chunk_size': 10485760, # 10MB
- }
-
- # Configure Tor proxy if enabled
- if settings.route_tor:
- config['proxy'] = 'socks5://127.0.0.1:9150'
- logger.debug('Tor proxy enabled for yt-dlp')
-
- # Use cookies if available
- import os
- cookies_file = 'youtube_cookies.txt'
- if os.path.exists(cookies_file):
- config['cookiefile'] = cookies_file
- logger.debug('Using cookies file for yt-dlp')
-
- return config
-
-
-@lru_cache(maxsize=128)
-def extract_video_info(video_id: str) -> Dict[str, Any]:
- """
- Extract video information using yt-dlp with caching.
-
- Args:
- video_id: YouTube video ID
-
- Returns:
- Dictionary with video information including audio tracks
-
- Caching:
- Results are cached to avoid repeated requests to YouTube.
- Cache size is limited to prevent memory issues.
- """
- # Check if yt-dlp is enabled
- if not getattr(settings, 'ytdlp_enabled', True):
- logger.debug('yt-dlp integration is disabled')
- return {'error': 'yt-dlp disabled', 'audio_tracks': []}
-
- url = f'https://www.youtube.com/watch?v={video_id}'
- ydl_opts = _get_ytdlp_config()
-
- try:
- logger.debug(f'Extracting video info: {video_id}')
-
- with yt_dlp.YoutubeDL(ydl_opts) as ydl:
- info = ydl.extract_info(url, download=False)
-
- if not info:
- logger.warning(f'No info returned for video: {video_id}')
- return {'error': 'No info returned', 'audio_tracks': []}
-
- logger.info(f'Extracted {len(info.get("formats", []))} total formats')
-
- # Extract audio tracks grouped by language
- audio_tracks = _extract_audio_tracks(info)
-
- return {
- 'video_id': video_id,
- 'title': info.get('title', ''),
- 'duration': info.get('duration', 0),
- 'audio_tracks': audio_tracks,
- 'formats': info.get('formats', []),
- 'subtitles': info.get('subtitles', {}),
- 'automatic_captions': info.get('automatic_captions', {}),
- }
-
- except yt_dlp.utils.DownloadError as e:
- logger.error(f'yt-dlp download error for {video_id}: {e}')
- return {'error': str(e), 'audio_tracks': []}
- except Exception as e:
- logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True)
- return {'error': str(e), 'audio_tracks': []}
-
-
-def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
- """
- Extract audio tracks from video info, grouped by language.
-
- Returns a list of unique audio tracks (one per language),
- keeping the highest quality for each language.
- """
- audio_by_language = {}
- all_formats = info.get('formats', [])
-
- logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks')
-
- for fmt in all_formats:
- # Only audio-only formats
- has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
- has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
-
- if not has_audio or has_video:
- continue
-
- # Extract language information
- lang = (
- fmt.get('language') or
- fmt.get('audio_language') or
- fmt.get('lang') or
- 'und'
- )
-
- # Get language name
- lang_name = (
- fmt.get('language_name') or
- fmt.get('lang_name') or
- get_language_name(lang)
- )
-
- # Get bitrate
- bitrate = fmt.get('abr') or fmt.get('tbr') or 0
-
- # Create track info
- track_info = {
- 'language': lang,
- 'language_name': lang_name,
- 'format_id': str(fmt.get('format_id', '')),
- 'itag': str(fmt.get('format_id', '')),
- 'ext': fmt.get('ext'),
- 'acodec': fmt.get('acodec'),
- 'audio_bitrate': int(bitrate) if bitrate else 0,
- 'audio_sample_rate': fmt.get('asr'),
- 'url': fmt.get('url'),
- 'filesize': fmt.get('filesize'),
- }
-
- # Keep best quality per language
- lang_key = lang.lower()
- if lang_key not in audio_by_language:
- audio_by_language[lang_key] = track_info
- logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k')
- else:
- current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
- if bitrate > current_bitrate:
- logger.debug(f' Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k')
- audio_by_language[lang_key] = track_info
-
- # Convert to list and sort
- audio_tracks = list(audio_by_language.values())
-
- # Sort: English first, then by bitrate (descending)
- audio_tracks.sort(
- key=lambda x: (
- 0 if x['language'] == 'en' else 1,
- -x.get('audio_bitrate', 0)
- )
- )
-
- logger.info(f'Extracted {len(audio_tracks)} unique audio languages')
- for track in audio_tracks[:5]: # Log first 5
- logger.info(f' → {track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k')
-
- return audio_tracks
-
-
-def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
- """
- Get subtitle URL for a specific language.
-
- Args:
- video_id: YouTube video ID
- lang: Language code (default: 'en')
-
- Returns:
- URL to subtitle file, or None if not available
- """
- info = extract_video_info(video_id)
-
- if 'error' in info:
- logger.warning(f'Cannot get subtitles: {info["error"]}')
- return None
-
- # Try manual subtitles first
- subtitles = info.get('subtitles', {})
- if lang in subtitles:
- for sub in subtitles[lang]:
- if sub.get('ext') == 'vtt':
- logger.debug(f'Found manual {lang} subtitle')
- return sub.get('url')
-
- # Try automatic captions
- auto_captions = info.get('automatic_captions', {})
- if lang in auto_captions:
- for sub in auto_captions[lang]:
- if sub.get('ext') == 'vtt':
- logger.debug(f'Found automatic {lang} subtitle')
- return sub.get('url')
-
- logger.debug(f'No {lang} subtitle found')
- return None
-
-
-def find_best_unified_format(
- video_id: str,
- audio_language: str = 'en',
- max_quality: int = 720
-) -> Optional[Dict[str, Any]]:
- """
- Find best unified (video+audio) format for specific language and quality.
-
- Args:
- video_id: YouTube video ID
- audio_language: Preferred audio language
- max_quality: Maximum video height (e.g., 720, 1080)
-
- Returns:
- Format dict if found, None otherwise
- """
- info = extract_video_info(video_id)
-
- if 'error' in info or not info.get('formats'):
- return None
-
- # Quality thresholds (minimum acceptable height as % of requested)
- thresholds = {
- 2160: 0.85,
- 1440: 0.80,
- 1080: 0.70,
- 720: 0.70,
- 480: 0.60,
- 360: 0.50,
- }
-
- # Get threshold for requested quality
- threshold = 0.70
- for q, t in thresholds.items():
- if max_quality >= q:
- threshold = t
- break
-
- min_height = int(max_quality * threshold)
- logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')
-
- candidates = []
- audio_lang_lower = audio_language.lower()
-
- for fmt in info['formats']:
- # Must have both video and audio
- has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
- has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
-
- if not (has_video and has_audio):
- continue
-
- # Skip HLS/DASH formats
- protocol = fmt.get('protocol', '')
- format_id = str(fmt.get('format_id', ''))
-
- if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
- continue
- if format_id.startswith('9'): # HLS formats
- continue
-
- height = fmt.get('height', 0)
- if height < min_height:
- continue
-
- # Language matching
- lang = (
- fmt.get('language') or
- fmt.get('audio_language') or
- 'en'
- ).lower()
-
- lang_match = (
- lang == audio_lang_lower or
- lang.startswith(audio_lang_lower[:2]) or
- audio_lang_lower.startswith(lang[:2])
- )
-
- if not lang_match:
- continue
-
- # Calculate score
- score = 0
-
- # Language match bonus
- if lang == audio_lang_lower:
- score += 10000
- elif lang.startswith(audio_lang_lower[:2]):
- score += 8000
- else:
- score += 5000
-
- # Quality score
- quality_diff = abs(height - max_quality)
- if height >= max_quality:
- score += 3000 - quality_diff
- else:
- score += 2000 - quality_diff
-
- # Protocol preference
- if protocol in ('https', 'http'):
- score += 500
-
- # Format preference
- if fmt.get('ext') == 'mp4':
- score += 100
-
- candidates.append({
- 'format': fmt,
- 'score': score,
- 'height': height,
- 'lang': lang,
- })
-
- if not candidates:
- logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
- return None
-
- # Sort by score and return best
- candidates.sort(key=lambda x: x['score'], reverse=True)
- best = candidates[0]
-
- logger.info(
- f'Selected unified format: {best["format"].get("format_id")} | '
- f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
- )
-
- return best['format']
-
-
-def clear_cache():
- """Clear the video info cache."""
- extract_video_info.cache_clear()
- logger.info('yt-dlp cache cleared')
-
-
-def get_cache_info() -> Dict[str, Any]:
- """Get cache statistics."""
- cache_info = extract_video_info.cache_info()
- return {
- 'hits': cache_info.hits,
- 'misses': cache_info.misses,
- 'size': cache_info.currsize,
- 'maxsize': cache_info.maxsize,
- 'hit_rate': cache_info.hits / (cache_info.hits + cache_info.misses) if (cache_info.hits + cache_info.misses) > 0 else 0,
- }