aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/ytdlp_service.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/ytdlp_service.py')
-rw-r--r--youtube/ytdlp_service.py393
1 files changed, 0 insertions, 393 deletions
diff --git a/youtube/ytdlp_service.py b/youtube/ytdlp_service.py
deleted file mode 100644
index 994cec4..0000000
--- a/youtube/ytdlp_service.py
+++ /dev/null
@@ -1,393 +0,0 @@
-#!/usr/bin/env python3
-"""
-Centralized yt-dlp integration with caching, logging, and error handling.
-
-This module provides a clean interface for yt-dlp functionality:
-- Multi-language audio track extraction
-- Subtitle extraction
-- Age-restricted video support
-
-All yt-dlp usage should go through this module for consistency.
-"""
-import logging
-from functools import lru_cache
-from typing import Dict, List, Optional, Any
-import yt_dlp
-import settings
-
-logger = logging.getLogger(__name__)
-
-# Language name mapping
# Human-readable names for common ISO 639-1/2 language codes.
# 'und' (undetermined) and 'zxx' (no linguistic content) are the
# special codes used for untagged audio tracks.
LANGUAGE_NAMES = {
    'en': 'English',
    'es': 'Español',
    'fr': 'Français',
    'de': 'Deutsch',
    'it': 'Italiano',
    'pt': 'Português',
    'ru': 'Русский',
    'ja': '日本語',
    'ko': '한국어',
    'zh': '中文',
    'ar': 'العربية',
    'hi': 'हिन्दी',
    'und': 'Unknown',
    'zxx': 'No linguistic content',
}


def get_language_name(lang_code: str) -> str:
    """Return a human-readable name for an ISO 639-1/2 language code.

    Unmapped codes fall back to the upper-cased code itself; an
    empty/None code yields 'Unknown'.
    """
    if not lang_code:
        return 'Unknown'
    try:
        return LANGUAGE_NAMES[lang_code.lower()]
    except KeyError:
        # Unrecognized code: surface it verbatim (upper-cased) rather
        # than hiding it behind 'Unknown'.
        return lang_code.upper()
-
-
def _get_ytdlp_config() -> Dict[str, Any]:
    """Build the options dict passed to yt_dlp.YoutubeDL.

    The base options request metadata-only extraction (no download).
    A SOCKS proxy is added when Tor routing is enabled in settings,
    and a cookies file is attached when one exists in the working
    directory.
    """
    import os

    opts: Dict[str, Any] = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'format': 'best',
        'skip_download': True,
        'socket_timeout': 30,
        'extractor_retries': 3,
        'http_chunk_size': 10485760,  # 10MB
    }

    # Route requests through the local Tor SOCKS proxy when enabled.
    if settings.route_tor:
        opts['proxy'] = 'socks5://127.0.0.1:9150'
        logger.debug('Tor proxy enabled for yt-dlp')

    # Attach cookies when present (presumably used for age-restricted
    # videos per the module docstring — confirm against callers).
    if os.path.exists('youtube_cookies.txt'):
        opts['cookiefile'] = 'youtube_cookies.txt'
        logger.debug('Using cookies file for yt-dlp')

    return opts
-
-
@lru_cache(maxsize=128)
def extract_video_info(video_id: str) -> Dict[str, Any]:
    """
    Extract video information using yt-dlp with caching.

    Args:
        video_id: YouTube video ID

    Returns:
        Dict with keys 'video_id', 'title', 'duration', 'audio_tracks',
        'formats', 'subtitles', 'automatic_captions'; on failure a dict
        of the form {'error': <message>, 'audio_tracks': []}.

    Caching:
        Results are memoized (lru_cache, 128 entries) to avoid repeated
        requests to YouTube. NOTE(review): error results are cached too,
        so a transient network failure sticks until clear_cache() runs —
        confirm this is intended. Callers share the cached dict and must
        not mutate the returned value.
    """
    # Feature flag: settings may not define ytdlp_enabled, default on.
    if not getattr(settings, 'ytdlp_enabled', True):
        logger.debug('yt-dlp integration is disabled')
        return {'error': 'yt-dlp disabled', 'audio_tracks': []}

    url = f'https://www.youtube.com/watch?v={video_id}'
    ydl_opts = _get_ytdlp_config()

    try:
        # Lazy %-style logging args: formatting is skipped entirely
        # when the level is disabled (unlike eager f-strings).
        logger.debug('Extracting video info: %s', video_id)

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if not info:
            logger.warning('No info returned for video: %s', video_id)
            return {'error': 'No info returned', 'audio_tracks': []}

        logger.info('Extracted %d total formats', len(info.get('formats', [])))

        # Reduce the raw format list to one audio track per language.
        audio_tracks = _extract_audio_tracks(info)

        return {
            'video_id': video_id,
            'title': info.get('title', ''),
            'duration': info.get('duration', 0),
            'audio_tracks': audio_tracks,
            'formats': info.get('formats', []),
            'subtitles': info.get('subtitles', {}),
            'automatic_captions': info.get('automatic_captions', {}),
        }

    except yt_dlp.utils.DownloadError as e:
        # Expected failure mode (geo-block, removal, network); no traceback.
        logger.error('yt-dlp download error for %s: %s', video_id, e)
        return {'error': str(e), 'audio_tracks': []}
    except Exception as e:
        # Unexpected: keep the traceback for diagnosis.
        logger.error('yt-dlp extraction error for %s: %s', video_id, e, exc_info=True)
        return {'error': str(e), 'audio_tracks': []}
-
-
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect one audio-only track per language from a yt-dlp info dict.

    For each language only the highest-bitrate audio-only format is
    kept. The result is sorted English-first, then by descending
    bitrate.
    """
    formats = info.get('formats', [])
    logger.debug(f'Processing {len(formats)} formats to extract audio tracks')

    best_per_lang: Dict[str, Dict[str, Any]] = {}

    for fmt in formats:
        acodec = fmt.get('acodec')
        vcodec = fmt.get('vcodec')

        # Keep audio-only formats: a real audio codec and no video codec.
        if not (acodec and acodec != 'none'):
            continue
        if vcodec and vcodec != 'none':
            continue

        # Language code: yt-dlp exposes it under several keys depending
        # on extractor; 'und' marks an undetermined language.
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            fmt.get('lang') or
            'und'
        )

        lang_name = (
            fmt.get('language_name') or
            fmt.get('lang_name') or
            get_language_name(lang)
        )

        bitrate = fmt.get('abr') or fmt.get('tbr') or 0

        track = {
            'language': lang,
            'language_name': lang_name,
            'format_id': str(fmt.get('format_id', '')),
            'itag': str(fmt.get('format_id', '')),
            'ext': fmt.get('ext'),
            'acodec': fmt.get('acodec'),
            'audio_bitrate': int(bitrate) if bitrate else 0,
            'audio_sample_rate': fmt.get('asr'),
            'url': fmt.get('url'),
            'filesize': fmt.get('filesize'),
        }

        # Deduplicate per lower-cased language, keeping the best bitrate.
        key = lang.lower()
        existing = best_per_lang.get(key)
        if existing is None:
            best_per_lang[key] = track
            logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k')
        elif bitrate > existing.get('audio_bitrate', 0):
            logger.debug(f' Updated {lang} ({lang_name}): {existing.get("audio_bitrate", 0)}k → {bitrate}k')
            best_per_lang[key] = track

    tracks = list(best_per_lang.values())

    # English first (False sorts before True), then higher bitrate first.
    tracks.sort(key=lambda t: (t['language'] != 'en', -t.get('audio_bitrate', 0)))

    logger.info(f'Extracted {len(tracks)} unique audio languages')
    for t in tracks[:5]:  # Log first 5
        logger.info(f' → {t["language_name"]} ({t["language"]}): {t["audio_bitrate"]}k')

    return tracks
-
-
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
    """
    Return a VTT subtitle URL for a specific language.

    Manually-authored subtitles are preferred; automatic captions are
    the fallback.

    Args:
        video_id: YouTube video ID
        lang: Language code (default: 'en')

    Returns:
        URL to subtitle file, or None if not available
    """
    info = extract_video_info(video_id)

    if 'error' in info:
        logger.warning(f'Cannot get subtitles: {info["error"]}')
        return None

    # Search manual subtitles first, then auto-generated captions;
    # within each source, return the first VTT entry for the language.
    sources = (
        (info.get('subtitles', {}), 'manual'),
        (info.get('automatic_captions', {}), 'automatic'),
    )
    for subs, label in sources:
        for sub in subs.get(lang, []):
            if sub.get('ext') == 'vtt':
                logger.debug(f'Found {label} {lang} subtitle')
                return sub.get('url')

    logger.debug(f'No {lang} subtitle found')
    return None
-
-
def find_best_unified_format(
    video_id: str,
    audio_language: str = 'en',
    max_quality: int = 720
) -> Optional[Dict[str, Any]]:
    """
    Find best unified (video+audio) format for specific language and quality.

    Args:
        video_id: YouTube video ID
        audio_language: Preferred audio language
        max_quality: Maximum video height (e.g., 720, 1080)

    Returns:
        Format dict if found, None otherwise
    """
    info = extract_video_info(video_id)

    if 'error' in info or not info.get('formats'):
        return None

    # Quality thresholds (minimum acceptable height as % of requested).
    # Iterated high-to-low; the first tier <= max_quality wins.
    thresholds = {
        2160: 0.85,
        1440: 0.80,
        1080: 0.70,
        720: 0.70,
        480: 0.60,
        360: 0.50,
    }

    # NOTE(review): requests below 360p fall through to the 0.70
    # default — confirm that is intended.
    threshold = 0.70
    for q, t in thresholds.items():
        if max_quality >= q:
            threshold = t
            break

    min_height = int(max_quality * threshold)
    logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')

    candidates = []
    audio_lang_lower = audio_language.lower()

    for fmt in info['formats']:
        # Must carry both a video and an audio stream.
        has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
        has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'

        if not (has_video and has_audio):
            continue

        # BUGFIX: yt-dlp format dicts may contain 'protocol'/'height'
        # keys whose VALUE is None, so dict.get's default never applies;
        # coalesce with `or` to avoid None.lower() / None < int errors.
        protocol = fmt.get('protocol') or ''
        format_id = str(fmt.get('format_id', ''))

        # Skip HLS/DASH formats — only plain progressive downloads.
        if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
            continue
        if format_id.startswith('9'):  # HLS formats
            continue

        height = fmt.get('height') or 0
        if height < min_height:
            continue

        # Language matching: exact, or same primary subtag either way.
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            'en'
        ).lower()

        lang_match = (
            lang == audio_lang_lower or
            lang.startswith(audio_lang_lower[:2]) or
            audio_lang_lower.startswith(lang[:2])
        )

        if not lang_match:
            continue

        # Score: language match dominates, then closeness to the
        # requested height (over-shoot preferred), then protocol/ext.
        score = 0

        if lang == audio_lang_lower:
            score += 10000
        elif lang.startswith(audio_lang_lower[:2]):
            score += 8000
        else:
            score += 5000

        quality_diff = abs(height - max_quality)
        if height >= max_quality:
            score += 3000 - quality_diff
        else:
            score += 2000 - quality_diff

        # Plain HTTP(S) downloads are preferred.
        if protocol in ('https', 'http'):
            score += 500

        # MP4 container preferred.
        if fmt.get('ext') == 'mp4':
            score += 100

        candidates.append({
            'format': fmt,
            'score': score,
            'height': height,
            'lang': lang,
        })

    if not candidates:
        logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
        return None

    # Sort by score and return best
    candidates.sort(key=lambda x: x['score'], reverse=True)
    best = candidates[0]

    logger.info(
        f'Selected unified format: {best["format"].get("format_id")} | '
        f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
    )

    return best['format']
-
-
def clear_cache():
    """Drop all memoized video info, forcing fresh extraction next call."""
    extract_video_info.cache_clear()
    logger.info('yt-dlp cache cleared')
-
-
def get_cache_info() -> Dict[str, Any]:
    """Return hit/miss/size statistics for the video-info cache."""
    stats = extract_video_info.cache_info()
    total = stats.hits + stats.misses
    return {
        'hits': stats.hits,
        'misses': stats.misses,
        'size': stats.currsize,
        'maxsize': stats.maxsize,
        # Guard against division by zero on a cold cache.
        'hit_rate': stats.hits / total if total else 0,
    }