#!/usr/bin/env python3
"""
Centralized yt-dlp integration with caching, logging, and error handling.

This module provides a clean interface for yt-dlp functionality:
- Multi-language audio track extraction
- Subtitle extraction
- Age-restricted video support

All yt-dlp usage should go through this module for consistency.
"""

import logging
import os
from functools import lru_cache
from typing import Any, Dict, List, Optional

import yt_dlp

import settings

logger = logging.getLogger(__name__)

# Language name mapping: ISO 639-1/2 code -> human-readable name.
# 'und' (undetermined) and 'zxx' (no linguistic content) are ISO 639-2
# special codes that YouTube uses for untagged audio tracks.
LANGUAGE_NAMES = {
    'en': 'English',
    'es': 'Español',
    'fr': 'Français',
    'de': 'Deutsch',
    'it': 'Italiano',
    'pt': 'Português',
    'ru': 'Русский',
    'ja': '日本語',
    'ko': '한국어',
    'zh': '中文',
    'ar': 'العربية',
    'hi': 'हिन्दी',
    'und': 'Unknown',
    'zxx': 'No linguistic content',
}


def get_language_name(lang_code: str) -> str:
    """Convert ISO 639-1/2 language code to readable name.

    Falls back to the upper-cased code itself for languages not in the
    mapping table, and to 'Unknown' for empty/None input.
    """
    if not lang_code:
        return 'Unknown'
    return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())


def _get_ytdlp_config() -> Dict[str, Any]:
    """Build the yt-dlp options dict from module settings.

    Returns:
        Options dict suitable for yt_dlp.YoutubeDL. Adds a SOCKS5 Tor
        proxy when settings.route_tor is set, and a cookie file when
        'youtube_cookies.txt' exists in the working directory.
    """
    config = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'format': 'best',
        'skip_download': True,  # metadata-only extraction; never download media
        'socket_timeout': 30,
        'extractor_retries': 3,
        'http_chunk_size': 10485760,  # 10MB
    }

    # Configure Tor proxy if enabled
    if settings.route_tor:
        config['proxy'] = 'socks5://127.0.0.1:9150'
        logger.debug('Tor proxy enabled for yt-dlp')

    # Use cookies if available (needed e.g. for age-restricted videos)
    cookies_file = 'youtube_cookies.txt'
    if os.path.exists(cookies_file):
        config['cookiefile'] = cookies_file
        logger.debug('Using cookies file for yt-dlp')

    return config


@lru_cache(maxsize=128)
def extract_video_info(video_id: str) -> Dict[str, Any]:
    """
    Extract video information using yt-dlp with caching.

    Args:
        video_id: YouTube video ID

    Returns:
        Dictionary with video information including audio tracks.
        On failure, returns {'error': <message>, 'audio_tracks': []}.

    Caching:
        Results are cached to avoid repeated requests to YouTube.
        Cache size is limited to prevent memory issues.

    NOTE(review): error results are cached as well, so a transient
    network failure sticks until clear_cache() is called — confirm this
    is acceptable for callers.
    """
    # Check if yt-dlp is enabled (defaults to enabled when the setting is absent)
    if not getattr(settings, 'ytdlp_enabled', True):
        logger.debug('yt-dlp integration is disabled')
        return {'error': 'yt-dlp disabled', 'audio_tracks': []}

    url = f'https://www.youtube.com/watch?v={video_id}'
    ydl_opts = _get_ytdlp_config()

    try:
        logger.debug(f'Extracting video info: {video_id}')
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

            if not info:
                logger.warning(f'No info returned for video: {video_id}')
                return {'error': 'No info returned', 'audio_tracks': []}

            logger.info(f'Extracted {len(info.get("formats", []))} total formats')

            # Extract audio tracks grouped by language
            audio_tracks = _extract_audio_tracks(info)

            return {
                'video_id': video_id,
                'title': info.get('title', ''),
                'duration': info.get('duration', 0),
                'audio_tracks': audio_tracks,
                'formats': info.get('formats', []),
                'subtitles': info.get('subtitles', {}),
                'automatic_captions': info.get('automatic_captions', {}),
            }

    except yt_dlp.utils.DownloadError as e:
        logger.error(f'yt-dlp download error for {video_id}: {e}')
        return {'error': str(e), 'audio_tracks': []}
    except Exception as e:
        logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True)
        return {'error': str(e), 'audio_tracks': []}


def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract audio tracks from video info, grouped by language.

    Returns a list of unique audio tracks (one per language), keeping
    the highest-bitrate format for each language. English is sorted
    first, then by bitrate descending.
    """
    audio_by_language: Dict[str, Dict[str, Any]] = {}
    all_formats = info.get('formats', [])

    logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks')

    for fmt in all_formats:
        # Only audio-only formats (acodec present, vcodec absent/'none')
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
        if not has_audio or has_video:
            continue

        # Extract language information; yt-dlp field names vary by extractor,
        # so check several, defaulting to 'und' (undetermined)
        lang = (
            fmt.get('language')
            or fmt.get('audio_language')
            or fmt.get('lang')
            or 'und'
        )

        # Get language name (prefer extractor-provided names)
        lang_name = (
            fmt.get('language_name')
            or fmt.get('lang_name')
            or get_language_name(lang)
        )

        # Get bitrate: 'abr' (audio bitrate) preferred, 'tbr' (total) fallback
        bitrate = fmt.get('abr') or fmt.get('tbr') or 0

        # Create track info
        track_info = {
            'language': lang,
            'language_name': lang_name,
            'format_id': str(fmt.get('format_id', '')),
            'itag': str(fmt.get('format_id', '')),
            'ext': fmt.get('ext'),
            'acodec': fmt.get('acodec'),
            'audio_bitrate': int(bitrate) if bitrate else 0,
            'audio_sample_rate': fmt.get('asr'),
            'url': fmt.get('url'),
            'filesize': fmt.get('filesize'),
        }

        # Keep best quality per language
        lang_key = lang.lower()
        if lang_key not in audio_by_language:
            audio_by_language[lang_key] = track_info
            logger.debug(f'  Added {lang} ({lang_name}) - {bitrate}k')
        else:
            current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
            if bitrate > current_bitrate:
                logger.debug(f'  Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k')
                audio_by_language[lang_key] = track_info

    # Convert to list and sort
    audio_tracks = list(audio_by_language.values())

    # Sort: English first, then by bitrate (descending)
    audio_tracks.sort(
        key=lambda x: (
            0 if x['language'] == 'en' else 1,
            -x.get('audio_bitrate', 0)
        )
    )

    logger.info(f'Extracted {len(audio_tracks)} unique audio languages')
    for track in audio_tracks[:5]:  # Log first 5
        logger.info(f'  → {track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k')

    return audio_tracks


def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
    """
    Get subtitle URL for a specific language.

    Args:
        video_id: YouTube video ID
        lang: Language code (default: 'en')

    Returns:
        URL to subtitle file, or None if not available.
        Manual subtitles take precedence over automatic captions;
        only the VTT variant is returned.
    """
    info = extract_video_info(video_id)

    if 'error' in info:
        logger.warning(f'Cannot get subtitles: {info["error"]}')
        return None

    # Try manual subtitles first
    subtitles = info.get('subtitles', {})
    if lang in subtitles:
        for sub in subtitles[lang]:
            if sub.get('ext') == 'vtt':
                logger.debug(f'Found manual {lang} subtitle')
                return sub.get('url')

    # Try automatic captions
    auto_captions = info.get('automatic_captions', {})
    if lang in auto_captions:
        for sub in auto_captions[lang]:
            if sub.get('ext') == 'vtt':
                logger.debug(f'Found automatic {lang} subtitle')
                return sub.get('url')

    logger.debug(f'No {lang} subtitle found')
    return None


def find_best_unified_format(
    video_id: str,
    audio_language: str = 'en',
    max_quality: int = 720
) -> Optional[Dict[str, Any]]:
    """
    Find best unified (video+audio) format for specific language and quality.

    Args:
        video_id: YouTube video ID
        audio_language: Preferred audio language
        max_quality: Maximum video height (e.g., 720, 1080)

    Returns:
        Format dict if found, None otherwise. Candidates are scored by
        language match (exact > prefix > loose), closeness to the
        requested height, plain-HTTP protocol, and mp4 container.
    """
    info = extract_video_info(video_id)

    if 'error' in info or not info.get('formats'):
        return None

    # Quality thresholds (minimum acceptable height as % of requested).
    # Iterated in descending order: the first entry <= max_quality wins.
    thresholds = {
        2160: 0.85,
        1440: 0.80,
        1080: 0.70,
        720: 0.70,
        480: 0.60,
        360: 0.50,
    }

    # Get threshold for requested quality
    threshold = 0.70
    for q, t in thresholds.items():
        if max_quality >= q:
            threshold = t
            break

    min_height = int(max_quality * threshold)
    logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')

    candidates = []
    audio_lang_lower = audio_language.lower()

    for fmt in info['formats']:
        # Must have both video and audio
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
        if not (has_video and has_audio):
            continue

        # Skip HLS/DASH formats.
        # BUGFIX: these fields can be present with value None in yt-dlp
        # format dicts, so `fmt.get(key, default)` is not enough — use
        # `or` to coerce None to the default and avoid a TypeError.
        protocol = fmt.get('protocol') or ''
        format_id = str(fmt.get('format_id', ''))
        if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
            continue
        if format_id.startswith('9'):  # HLS formats
            continue

        # BUGFIX: height may be None; None < int raises TypeError
        height = fmt.get('height') or 0
        if height < min_height:
            continue

        # Language matching: exact, or 2-letter-prefix in either direction
        lang = (
            fmt.get('language')
            or fmt.get('audio_language')
            or 'en'
        ).lower()

        lang_match = (
            lang == audio_lang_lower
            or lang.startswith(audio_lang_lower[:2])
            or audio_lang_lower.startswith(lang[:2])
        )
        if not lang_match:
            continue

        # Calculate score
        score = 0

        # Language match bonus
        if lang == audio_lang_lower:
            score += 10000
        elif lang.startswith(audio_lang_lower[:2]):
            score += 8000
        else:
            score += 5000

        # Quality score: prefer heights at-or-above the request,
        # penalize by distance from the requested height
        quality_diff = abs(height - max_quality)
        if height >= max_quality:
            score += 3000 - quality_diff
        else:
            score += 2000 - quality_diff

        # Protocol preference
        if protocol in ('https', 'http'):
            score += 500

        # Format preference
        if fmt.get('ext') == 'mp4':
            score += 100

        candidates.append({
            'format': fmt,
            'score': score,
            'height': height,
            'lang': lang,
        })

    if not candidates:
        logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
        return None

    # Sort by score and return best
    candidates.sort(key=lambda x: x['score'], reverse=True)
    best = candidates[0]

    logger.info(
        f'Selected unified format: {best["format"].get("format_id")} | '
        f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
    )

    return best['format']


def clear_cache():
    """Clear the video info cache."""
    extract_video_info.cache_clear()
    logger.info('yt-dlp cache cleared')


def get_cache_info() -> Dict[str, Any]:
    """Get cache statistics for the extract_video_info lru_cache.

    Returns:
        Dict with hits, misses, current size, max size, and hit rate
        (0 when no lookups have occurred, avoiding division by zero).
    """
    cache_info = extract_video_info.cache_info()
    return {
        'hits': cache_info.hits,
        'misses': cache_info.misses,
        'size': cache_info.currsize,
        'maxsize': cache_info.maxsize,
        'hit_rate': cache_info.hits / (cache_info.hits + cache_info.misses)
            if (cache_info.hits + cache_info.misses) > 0 else 0,
    }