diff options
Diffstat (limited to 'youtube/ytdlp_service.py')
| -rw-r--r-- | youtube/ytdlp_service.py | 393 |
1 file changed, 0 insertions, 393 deletions
#!/usr/bin/env python3
"""
Centralized yt-dlp integration with caching, logging, and error handling.

This module provides a clean interface for yt-dlp functionality:
- Multi-language audio track extraction
- Subtitle extraction
- Age-restricted video support

All yt-dlp usage should go through this module for consistency.
"""
import logging
import os
from functools import lru_cache
from typing import Dict, List, Optional, Any

import yt_dlp

import settings

logger = logging.getLogger(__name__)

# ISO 639-1/2 language code -> human-readable name.
LANGUAGE_NAMES = {
    'en': 'English',
    'es': 'Español',
    'fr': 'Français',
    'de': 'Deutsch',
    'it': 'Italiano',
    'pt': 'Português',
    'ru': 'Русский',
    'ja': '日本語',
    'ko': '한국어',
    'zh': '中文',
    'ar': 'العربية',
    'hi': 'हिन्दी',
    'und': 'Unknown',
    'zxx': 'No linguistic content',
}


def get_language_name(lang_code: str) -> str:
    """Convert ISO 639-1/2 language code to readable name.

    Unknown codes are returned upper-cased so they remain displayable.
    Falsy input (None, '') maps to 'Unknown'.
    """
    if not lang_code:
        return 'Unknown'
    return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())


def _get_ytdlp_config() -> Dict[str, Any]:
    """Build the yt-dlp options dict from settings.

    Returns a fresh dict on every call, so callers may mutate the
    result without affecting later calls.
    """
    config = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'format': 'best',
        'skip_download': True,
        'socket_timeout': 30,
        'extractor_retries': 3,
        'http_chunk_size': 10485760,  # 10MB
    }

    # Route through the Tor SOCKS proxy if enabled. getattr with a
    # default keeps this consistent with the ytdlp_enabled check and
    # avoids AttributeError if the setting is absent.
    if getattr(settings, 'route_tor', False):
        config['proxy'] = 'socks5://127.0.0.1:9150'
        logger.debug('Tor proxy enabled for yt-dlp')

    # Use a cookies file if one exists (e.g. for age-restricted videos).
    cookies_file = 'youtube_cookies.txt'
    if os.path.exists(cookies_file):
        config['cookiefile'] = cookies_file
        logger.debug('Using cookies file for yt-dlp')

    return config


@lru_cache(maxsize=128)
def extract_video_info(video_id: str) -> Dict[str, Any]:
    """
    Extract video information using yt-dlp with caching.

    Args:
        video_id: YouTube video ID

    Returns:
        Dictionary with video information including audio tracks.
        On failure this returns a dict with an 'error' key and an
        empty 'audio_tracks' list instead of raising.

    Caching:
        Results are cached to avoid repeated requests to YouTube.
        Cache size is limited to prevent memory issues.
        NOTE(review): error results and expiring format URLs are
        cached as well — call clear_cache() if staleness becomes a
        problem.
    """
    # Check if yt-dlp is enabled.
    if not getattr(settings, 'ytdlp_enabled', True):
        logger.debug('yt-dlp integration is disabled')
        return {'error': 'yt-dlp disabled', 'audio_tracks': []}

    url = f'https://www.youtube.com/watch?v={video_id}'
    ydl_opts = _get_ytdlp_config()

    try:
        logger.debug('Extracting video info: %s', video_id)

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info:
            logger.warning('No info returned for video: %s', video_id)
            return {'error': 'No info returned', 'audio_tracks': []}

        logger.info('Extracted %d total formats', len(info.get('formats', [])))

        # Extract audio tracks grouped by language.
        audio_tracks = _extract_audio_tracks(info)

        return {
            'video_id': video_id,
            'title': info.get('title', ''),
            'duration': info.get('duration', 0),
            'audio_tracks': audio_tracks,
            'formats': info.get('formats', []),
            'subtitles': info.get('subtitles', {}),
            'automatic_captions': info.get('automatic_captions', {}),
        }

    except yt_dlp.utils.DownloadError as e:
        logger.error('yt-dlp download error for %s: %s', video_id, e)
        return {'error': str(e), 'audio_tracks': []}
    except Exception as e:
        logger.error('yt-dlp extraction error for %s: %s', video_id, e,
                     exc_info=True)
        return {'error': str(e), 'audio_tracks': []}


def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract audio tracks from video info, grouped by language.

    Returns a list of unique audio tracks (one per language),
    keeping the highest quality for each language. English sorts
    first, then remaining languages by descending bitrate.
    """
    audio_by_language = {}
    all_formats = info.get('formats', [])

    logger.debug('Processing %d formats to extract audio tracks',
                 len(all_formats))

    for fmt in all_formats:
        # Only audio-only formats: audio codec present, no video codec.
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'

        if not has_audio or has_video:
            continue

        # Language code; yt-dlp uses different keys depending on the
        # extractor, so try each in turn. 'und' = undetermined.
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            fmt.get('lang') or
            'und'
        )

        # Human-readable language name, preferring what yt-dlp provides.
        lang_name = (
            fmt.get('language_name') or
            fmt.get('lang_name') or
            get_language_name(lang)
        )

        # Bitrate: audio bitrate first, total bitrate as fallback;
        # either key may be absent or None.
        bitrate = fmt.get('abr') or fmt.get('tbr') or 0

        track_info = {
            'language': lang,
            'language_name': lang_name,
            'format_id': str(fmt.get('format_id', '')),
            'itag': str(fmt.get('format_id', '')),
            'ext': fmt.get('ext'),
            'acodec': fmt.get('acodec'),
            'audio_bitrate': int(bitrate) if bitrate else 0,
            'audio_sample_rate': fmt.get('asr'),
            'url': fmt.get('url'),
            'filesize': fmt.get('filesize'),
        }

        # Keep the best quality per language.
        lang_key = lang.lower()
        if lang_key not in audio_by_language:
            audio_by_language[lang_key] = track_info
            logger.debug('  Added %s (%s) - %sk', lang, lang_name, bitrate)
        else:
            current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
            if bitrate > current_bitrate:
                logger.debug('  Updated %s (%s): %sk → %sk',
                             lang, lang_name, current_bitrate, bitrate)
                audio_by_language[lang_key] = track_info

    audio_tracks = list(audio_by_language.values())

    # Sort: English first, then by bitrate (descending).
    audio_tracks.sort(
        key=lambda x: (
            0 if x['language'] == 'en' else 1,
            -x.get('audio_bitrate', 0)
        )
    )

    logger.info('Extracted %d unique audio languages', len(audio_tracks))
    for track in audio_tracks[:5]:  # Log first 5
        logger.info('  → %s (%s): %sk', track['language_name'],
                    track['language'], track['audio_bitrate'])

    return audio_tracks


def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
    """
    Get subtitle URL for a specific language.

    Args:
        video_id: YouTube video ID
        lang: Language code (default: 'en')

    Returns:
        URL to a VTT subtitle file, or None if not available.
        Manual subtitles are preferred over automatic captions.
    """
    info = extract_video_info(video_id)

    if 'error' in info:
        logger.warning('Cannot get subtitles: %s', info['error'])
        return None

    # Try manual subtitles first.
    subtitles = info.get('subtitles', {})
    if lang in subtitles:
        for sub in subtitles[lang]:
            if sub.get('ext') == 'vtt':
                logger.debug('Found manual %s subtitle', lang)
                return sub.get('url')

    # Fall back to automatic captions.
    auto_captions = info.get('automatic_captions', {})
    if lang in auto_captions:
        for sub in auto_captions[lang]:
            if sub.get('ext') == 'vtt':
                logger.debug('Found automatic %s subtitle', lang)
                return sub.get('url')

    logger.debug('No %s subtitle found', lang)
    return None


def find_best_unified_format(
    video_id: str,
    audio_language: str = 'en',
    max_quality: int = 720
) -> Optional[Dict[str, Any]]:
    """
    Find best unified (video+audio) format for specific language and quality.

    Args:
        video_id: YouTube video ID
        audio_language: Preferred audio language
        max_quality: Maximum video height (e.g., 720, 1080)

    Returns:
        Format dict if found, None otherwise
    """
    info = extract_video_info(video_id)

    if 'error' in info or not info.get('formats'):
        return None

    # Quality thresholds (minimum acceptable height as % of requested).
    thresholds = {
        2160: 0.85,
        1440: 0.80,
        1080: 0.70,
        720: 0.70,
        480: 0.60,
        360: 0.50,
    }

    # Pick the threshold for the highest tier the request reaches.
    # Iterate keys in explicit descending order rather than relying on
    # dict insertion order.
    threshold = 0.70
    for q in sorted(thresholds, reverse=True):
        if max_quality >= q:
            threshold = thresholds[q]
            break

    min_height = int(max_quality * threshold)
    logger.debug('Quality threshold: %.0f%% = min %dp for %dp',
                 threshold * 100, min_height, max_quality)

    candidates = []
    audio_lang_lower = audio_language.lower()

    for fmt in info['formats']:
        # Must have both video and audio streams.
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'

        if not (has_video and has_audio):
            continue

        # Skip HLS/DASH formats.
        protocol = fmt.get('protocol', '')
        format_id = str(fmt.get('format_id', ''))

        if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
            continue
        if format_id.startswith('9'):  # HLS formats
            continue

        # 'height' may be present but None for some formats; treat it
        # as 0 instead of raising TypeError on comparison.
        height = fmt.get('height') or 0
        if height < min_height:
            continue

        # Language matching (missing language defaults to 'en' here).
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            'en'
        ).lower()

        # Accept exact matches and two-letter prefix matches in either
        # direction (e.g. 'en' vs 'en-US').
        lang_match = (
            lang == audio_lang_lower or
            lang.startswith(audio_lang_lower[:2]) or
            audio_lang_lower.startswith(lang[:2])
        )

        if not lang_match:
            continue

        # Score the candidate: language match dominates, then closeness
        # to the requested quality, then protocol/container preference.
        score = 0

        # Language match bonus.
        if lang == audio_lang_lower:
            score += 10000
        elif lang.startswith(audio_lang_lower[:2]):
            score += 8000
        else:
            score += 5000

        # Quality score: prefer at-or-above the target, closest first.
        quality_diff = abs(height - max_quality)
        if height >= max_quality:
            score += 3000 - quality_diff
        else:
            score += 2000 - quality_diff

        # Protocol preference.
        if protocol in ('https', 'http'):
            score += 500

        # Format preference.
        if fmt.get('ext') == 'mp4':
            score += 100

        candidates.append({
            'format': fmt,
            'score': score,
            'height': height,
            'lang': lang,
        })

    if not candidates:
        logger.debug('No unified format found for %dp + %s',
                     max_quality, audio_language)
        return None

    # Sort by score and return the best candidate.
    candidates.sort(key=lambda x: x['score'], reverse=True)
    best = candidates[0]

    logger.info(
        'Selected unified format: %s | %s | %dp | score=%d',
        best['format'].get('format_id'), best['lang'],
        best['height'], best['score']
    )

    return best['format']


def clear_cache():
    """Clear the video info cache."""
    extract_video_info.cache_clear()
    logger.info('yt-dlp cache cleared')


def get_cache_info() -> Dict[str, Any]:
    """Get cache statistics for the extract_video_info LRU cache."""
    cache_info = extract_video_info.cache_info()
    total = cache_info.hits + cache_info.misses
    return {
        'hits': cache_info.hits,
        'misses': cache_info.misses,
        'size': cache_info.currsize,
        'maxsize': cache_info.maxsize,
        # Guard against division by zero before any lookups happen.
        'hit_rate': cache_info.hits / total if total > 0 else 0,
    }
