#!/usr/bin/env python3
"""
Centralized yt-dlp integration with caching, logging, and error handling.

This module provides a clean interface for yt-dlp functionality:
- Multi-language audio track extraction
- Subtitle extraction
- Age-restricted video support

All yt-dlp usage should go through this module for consistency.
"""
import logging
import os
from functools import lru_cache
from typing import Dict, List, Optional, Any

import yt_dlp

import settings

logger = logging.getLogger(__name__)

# ISO 639-1/2 language code -> human-readable (native-script) name.
# Codes not listed here fall back to the upper-cased code itself.
LANGUAGE_NAMES = {
    'en': 'English',
    'es': 'Español',
    'fr': 'Français',
    'de': 'Deutsch',
    'it': 'Italiano',
    'pt': 'Português',
    'ru': 'Русский',
    'ja': '日本語',
    'ko': '한국어',
    'zh': '中文',
    'ar': 'العربية',
    'hi': 'हिन्दी',
    'und': 'Unknown',
    'zxx': 'No linguistic content',
}


def get_language_name(lang_code: str) -> str:
    """Convert ISO 639-1/2 language code to readable name.

    Falsy input (empty string / None) yields 'Unknown'; codes missing
    from LANGUAGE_NAMES fall back to the upper-cased code itself.
    """
    if not lang_code:
        return 'Unknown'
    return LANGUAGE_NAMES.get(lang_code.lower(), lang_code.upper())


def _get_ytdlp_config() -> Dict[str, Any]:
    """Build the yt-dlp options dict from settings.

    Adds a SOCKS5 Tor proxy when ``settings.route_tor`` is set, and a
    cookie file when ``youtube_cookies.txt`` exists in the working
    directory (used for age-restricted videos).
    """
    config = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'format': 'best',
        'skip_download': True,
        'socket_timeout': 30,
        'extractor_retries': 3,
        'http_chunk_size': 10485760,  # 10MB
    }

    # Configure Tor proxy if enabled (9150 = Tor Browser's SOCKS port).
    if settings.route_tor:
        config['proxy'] = 'socks5://127.0.0.1:9150'
        logger.debug('Tor proxy enabled for yt-dlp')

    # Use cookies if available (relative to the process working directory).
    cookies_file = 'youtube_cookies.txt'
    if os.path.exists(cookies_file):
        config['cookiefile'] = cookies_file
        logger.debug('Using cookies file for yt-dlp')

    return config


@lru_cache(maxsize=128)
def extract_video_info(video_id: str) -> Dict[str, Any]:
    """
    Extract video information using yt-dlp with caching.

    Args:
        video_id: YouTube video ID

    Returns:
        Dictionary with video information including audio tracks.
        On failure, returns ``{'error': <message>, 'audio_tracks': []}``
        instead of raising.

    Caching:
        Results are cached to avoid repeated requests to YouTube.
        Cache size is limited to prevent memory issues.
        NOTE: error results are cached too, so a transient network
        failure sticks until clear_cache() is called.
    """
    # Check if yt-dlp is enabled (defaults to enabled when the setting
    # is absent).
    if not getattr(settings, 'ytdlp_enabled', True):
        logger.debug('yt-dlp integration is disabled')
        return {'error': 'yt-dlp disabled', 'audio_tracks': []}

    url = f'https://www.youtube.com/watch?v={video_id}'
    ydl_opts = _get_ytdlp_config()

    try:
        logger.debug('Extracting video info: %s', video_id)

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info:
            logger.warning('No info returned for video: %s', video_id)
            return {'error': 'No info returned', 'audio_tracks': []}

        logger.debug('Extracted %d formats', len(info.get('formats', [])))

        # Extract audio tracks grouped by language
        audio_tracks = _extract_audio_tracks(info)

        return {
            'video_id': video_id,
            'title': info.get('title', ''),
            'duration': info.get('duration', 0),
            'audio_tracks': audio_tracks,
            'formats': info.get('formats', []),
            'subtitles': info.get('subtitles', {}),
            'automatic_captions': info.get('automatic_captions', {}),
        }

    except yt_dlp.utils.DownloadError as e:
        logger.error('yt-dlp download error for %s: %s', video_id, e)
        return {'error': str(e), 'audio_tracks': []}
    except Exception as e:
        logger.error('yt-dlp extraction error for %s: %s', video_id, e,
                     exc_info=True)
        return {'error': str(e), 'audio_tracks': []}


def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract audio tracks from video info, grouped by language.

    Returns a list of unique audio tracks (one per language),
    keeping the highest quality for each language. English sorts
    first, then descending bitrate.
    """
    audio_by_language: Dict[str, Dict[str, Any]] = {}

    for fmt in info.get('formats', []):
        # Only audio-only formats (has an audio codec, no video codec).
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
        if not has_audio or has_video:
            continue

        # Language code; 'und' (undetermined) when yt-dlp reports none.
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            fmt.get('lang') or
            'und'
        )

        # Readable language name, preferring what yt-dlp provides.
        lang_name = (
            fmt.get('language_name') or
            fmt.get('lang_name') or
            get_language_name(lang)
        )

        # Average bitrate, falling back to total bitrate.
        bitrate = fmt.get('abr') or fmt.get('tbr') or 0

        track_info = {
            'language': lang,
            'language_name': lang_name,
            'format_id': str(fmt.get('format_id', '')),
            # YouTube itags correspond to yt-dlp format_ids for
            # progressive formats; exposed under both keys for callers.
            'itag': str(fmt.get('format_id', '')),
            'ext': fmt.get('ext'),
            'acodec': fmt.get('acodec'),
            'audio_bitrate': int(bitrate) if bitrate else 0,
            'audio_sample_rate': fmt.get('asr'),
            'url': fmt.get('url'),
            'filesize': fmt.get('filesize'),
        }

        # Keep the best (highest-bitrate) track per language.
        lang_key = lang.lower()
        if lang_key not in audio_by_language:
            audio_by_language[lang_key] = track_info
        elif bitrate > audio_by_language[lang_key].get('audio_bitrate', 0):
            audio_by_language[lang_key] = track_info
            logger.debug('Updated %s to higher bitrate: %s', lang, bitrate)

    audio_tracks = list(audio_by_language.values())

    # Sort: English first, then by bitrate (descending).
    audio_tracks.sort(
        key=lambda x: (
            0 if x['language'] == 'en' else 1,
            -x.get('audio_bitrate', 0)
        )
    )

    logger.debug('Found %d unique audio tracks', len(audio_tracks))
    for track in audio_tracks[:3]:  # Log first 3
        logger.debug(' - %s: %sk',
                     track['language_name'], track['audio_bitrate'])

    return audio_tracks


def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
    """
    Get subtitle URL for a specific language.

    Manual subtitles are preferred over automatic captions; only the
    WebVTT ('vtt') variant is returned.

    Args:
        video_id: YouTube video ID
        lang: Language code (default: 'en')

    Returns:
        URL to subtitle file, or None if not available
    """
    info = extract_video_info(video_id)

    if 'error' in info:
        logger.warning('Cannot get subtitles: %s', info['error'])
        return None

    # Try manual subtitles first
    subtitles = info.get('subtitles', {})
    if lang in subtitles:
        for sub in subtitles[lang]:
            if sub.get('ext') == 'vtt':
                logger.debug('Found manual %s subtitle', lang)
                return sub.get('url')

    # Fall back to automatic captions
    auto_captions = info.get('automatic_captions', {})
    if lang in auto_captions:
        for sub in auto_captions[lang]:
            if sub.get('ext') == 'vtt':
                logger.debug('Found automatic %s subtitle', lang)
                return sub.get('url')

    logger.debug('No %s subtitle found', lang)
    return None


def find_best_unified_format(
    video_id: str,
    audio_language: str = 'en',
    max_quality: int = 720
) -> Optional[Dict[str, Any]]:
    """
    Find best unified (video+audio) format for specific language and quality.

    Candidates are scored by language match (exact > same 2-letter
    prefix > other), closeness to the requested height, plain
    HTTP(S) protocol, and mp4 container. HLS/DASH formats are skipped.

    Args:
        video_id: YouTube video ID
        audio_language: Preferred audio language
        max_quality: Maximum video height (e.g., 720, 1080)

    Returns:
        Format dict if found, None otherwise
    """
    info = extract_video_info(video_id)

    if 'error' in info or not info.get('formats'):
        return None

    # Quality thresholds (minimum acceptable height as % of requested).
    thresholds = {
        2160: 0.85,
        1440: 0.80,
        1080: 0.70,
        720: 0.70,
        480: 0.60,
        360: 0.50,
    }

    # Pick the threshold of the largest tier <= requested quality.
    # Iterate in explicit descending order rather than relying on the
    # dict literal's insertion order.
    threshold = 0.70
    for q in sorted(thresholds, reverse=True):
        if max_quality >= q:
            threshold = thresholds[q]
            break

    min_height = int(max_quality * threshold)
    logger.debug('Quality threshold: %.0f%% = min %dp for %dp',
                 threshold * 100, min_height, max_quality)

    candidates = []
    audio_lang_lower = audio_language.lower()

    for fmt in info['formats']:
        # Must have both video and audio
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
        if not (has_video and has_audio):
            continue

        # Skip HLS/DASH formats
        protocol = fmt.get('protocol', '')
        format_id = str(fmt.get('format_id', ''))
        if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
            continue
        if format_id.startswith('9'):  # HLS formats
            continue

        # yt-dlp may report height as None; treat it as 0 so the
        # comparison below cannot raise TypeError.
        height = fmt.get('height') or 0
        if height < min_height:
            continue

        # Language matching (assume English when no language reported).
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            'en'
        ).lower()

        lang_match = (
            lang == audio_lang_lower or
            lang.startswith(audio_lang_lower[:2]) or
            audio_lang_lower.startswith(lang[:2])
        )
        if not lang_match:
            continue

        # Calculate score
        score = 0

        # Language match bonus
        if lang == audio_lang_lower:
            score += 10000
        elif lang.startswith(audio_lang_lower[:2]):
            score += 8000
        else:
            score += 5000

        # Quality score: prefer at-or-above the requested height, and
        # within each group prefer the height closest to the request.
        quality_diff = abs(height - max_quality)
        if height >= max_quality:
            score += 3000 - quality_diff
        else:
            score += 2000 - quality_diff

        # Protocol preference
        if protocol in ('https', 'http'):
            score += 500

        # Format preference
        if fmt.get('ext') == 'mp4':
            score += 100

        candidates.append({
            'format': fmt,
            'score': score,
            'height': height,
            'lang': lang,
        })

    if not candidates:
        logger.debug('No unified format found for %dp + %s',
                     max_quality, audio_language)
        return None

    # Sort by score and return best
    candidates.sort(key=lambda x: x['score'], reverse=True)
    best = candidates[0]

    logger.info(
        f'Selected unified format: {best["format"].get("format_id")} | '
        f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
    )

    return best['format']


def clear_cache():
    """Clear the video info cache (including cached error results)."""
    extract_video_info.cache_clear()
    logger.info('yt-dlp cache cleared')


def get_cache_info() -> Dict[str, Any]:
    """Get cache statistics for the extract_video_info lru_cache."""
    cache_info = extract_video_info.cache_info()
    total = cache_info.hits + cache_info.misses
    return {
        'hits': cache_info.hits,
        'misses': cache_info.misses,
        'size': cache_info.currsize,
        'maxsize': cache_info.maxsize,
        # Guard against division by zero before any lookups happen.
        'hit_rate': cache_info.hits / total if total > 0 else 0,
    }