aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/ytdlp_service.py
diff options
context:
space:
mode:
authorAstounds <kirito@disroot.org>2026-03-22 14:17:23 -0500
committerAstounds <kirito@disroot.org>2026-03-22 14:17:23 -0500
commit84e1acaab8f7e4e7e36d19e3b6847a0ab6c33759 (patch)
treea4021823e3e29d1efb57271dda5024825983bacf /youtube/ytdlp_service.py
parented4b05d9b616c688afc6ef03dc404009c4abfc0f (diff)
downloadyt-local-84e1acaab8f7e4e7e36d19e3b6847a0ab6c33759.tar.lz
yt-local-84e1acaab8f7e4e7e36d19e3b6847a0ab6c33759.tar.xz
yt-local-84e1acaab8f7e4e7e36d19e3b6847a0ab6c33759.zip
yt-dlp
Diffstat (limited to 'youtube/ytdlp_service.py')
-rw-r--r--youtube/ytdlp_service.py390
1 files changed, 390 insertions, 0 deletions
diff --git a/youtube/ytdlp_service.py b/youtube/ytdlp_service.py
new file mode 100644
index 0000000..2520193
--- /dev/null
+++ b/youtube/ytdlp_service.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python3
+"""
+Centralized yt-dlp integration with caching, logging, and error handling.
+
+This module provides a clean interface for yt-dlp functionality:
+- Multi-language audio track extraction
+- Subtitle extraction
+- Age-restricted video support
+
+All yt-dlp usage should go through this module for consistency.
+"""
+import logging
+from functools import lru_cache
+from typing import Dict, List, Optional, Any
+import yt_dlp
+import settings
+
+logger = logging.getLogger(__name__)
+
# Display names for the language codes this module recognises, written in the
# language itself. Keys are lowercase; lookups must lowercase the code first.
# 'und' and 'zxx' are placeholders for tracks without a usable language.
LANGUAGE_NAMES = {
    'en': 'English',
    'es': 'Español',
    'fr': 'Français',
    'de': 'Deutsch',
    'it': 'Italiano',
    'pt': 'Português',
    'ru': 'Русский',
    'ja': '日本語',
    'ko': '한국어',
    'zh': '中文',
    'ar': 'العربية',
    'hi': 'हिन्दी',
    'und': 'Unknown',
    'zxx': 'No linguistic content',
}


def get_language_name(lang_code: str) -> str:
    """Convert a language code to a readable name.

    The lookup is case-insensitive. A code that carries a region or script
    subtag (``en-US``, ``pt_BR``, ``zh-Hans``) and has no entry of its own
    falls back to its primary language, with the subtag appended in
    parentheses so that variants stay distinguishable.

    Args:
        lang_code: Language code such as ``'en'`` or ``'en-US'``. May be
            empty or ``None``.

    Returns:
        ``'Unknown'`` for an empty/``None`` code, the mapped display name
        when the code (or its primary subtag) is in ``LANGUAGE_NAMES``,
        otherwise the code itself in upper case.
    """
    if not lang_code:
        return 'Unknown'

    exact = LANGUAGE_NAMES.get(lang_code.lower())
    if exact is not None:
        return exact

    # Accept both 'en-US' and 'en_US' spellings when splitting off the
    # primary language subtag.
    primary, separator, variant = lang_code.replace('_', '-').partition('-')
    primary_name = LANGUAGE_NAMES.get(primary.lower())
    if separator and variant and primary_name is not None:
        return f'{primary_name} ({variant})'

    # Unknown language: show the raw code, as before.
    return lang_code.upper()
+
+
def _get_ytdlp_config() -> Dict[str, Any]:
    """Build the option dict handed to ``yt_dlp.YoutubeDL``.

    The base options request metadata only (``skip_download``) and silence
    yt-dlp's own output. Two things are layered on top:

    - When ``settings.route_tor`` is truthy, traffic is sent through a local
      SOCKS5 proxy. The port comes from ``settings.tor_port`` when that
      attribute exists and defaults to 9150, the value that used to be
      hard-coded here.
    - When ``youtube_cookies.txt`` exists relative to the current working
      directory, it is passed to yt-dlp as the cookie file.

    Returns:
        A fresh dict on every call, so callers may modify it freely.
    """
    # Local import: the module does not import os at file level.
    import os

    config: Dict[str, Any] = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'format': 'best',
        'skip_download': True,
        'socket_timeout': 30,
        'extractor_retries': 3,
        'http_chunk_size': 10485760,  # 10MB
    }

    # Configure Tor proxy if enabled
    if settings.route_tor:
        tor_port = getattr(settings, 'tor_port', 9150)
        # NOTE(review): with a plain 'socks5://' scheme yt-dlp may resolve
        # host names locally rather than through the proxy; 'socks5h://'
        # asks for proxy-side DNS. Confirm which one is wanted for Tor.
        config['proxy'] = f'socks5://127.0.0.1:{tor_port}'
        logger.debug('Tor proxy enabled for yt-dlp')

    # Use cookies if available
    cookies_file = 'youtube_cookies.txt'
    if os.path.exists(cookies_file):
        config['cookiefile'] = cookies_file
        logger.debug('Using cookies file for yt-dlp')

    return config
+
+
class _NoVideoInfo(Exception):
    """Internal signal: yt-dlp returned an empty result for a video."""


@lru_cache(maxsize=128)
def _extract_video_info_cached(video_id: str) -> Dict[str, Any]:
    """Run yt-dlp for *video_id* and return the normalized result dict.

    Failures are reported by raising. ``lru_cache`` only stores return
    values, so a failed extraction is never memoized and the next call
    retries it.

    Raises:
        _NoVideoInfo: yt-dlp returned nothing for the video.
        Exception: whatever yt-dlp raises (e.g. ``DownloadError``).
    """
    url = f'https://www.youtube.com/watch?v={video_id}'
    logger.debug(f'Extracting video info: {video_id}')

    with yt_dlp.YoutubeDL(_get_ytdlp_config()) as ydl:
        info = ydl.extract_info(url, download=False)

    if not info:
        raise _NoVideoInfo(video_id)

    # yt-dlp may report a key with a None value; normalise to empty
    # containers so consumers can iterate without extra checks.
    formats = info.get('formats') or []
    logger.debug(f'Extracted {len(formats)} formats')

    return {
        'video_id': video_id,
        'title': info.get('title', ''),
        'duration': info.get('duration', 0),
        # Audio tracks grouped by language, best bitrate per language.
        'audio_tracks': _extract_audio_tracks(info),
        'formats': formats,
        'subtitles': info.get('subtitles') or {},
        'automatic_captions': info.get('automatic_captions') or {},
    }


def extract_video_info(video_id: str) -> Dict[str, Any]:
    """
    Extract video information using yt-dlp with caching.

    Args:
        video_id: YouTube video ID

    Returns:
        On success, a dictionary with ``video_id``, ``title``, ``duration``,
        ``audio_tracks``, ``formats``, ``subtitles`` and
        ``automatic_captions``. On failure, ``{'error': <message>,
        'audio_tracks': []}``. This function does not raise for extraction
        problems.

    Caching:
        Successful results are cached (up to 128 videos) to avoid repeated
        requests to YouTube. Error results are NOT cached, so a transient
        failure or a temporarily disabled integration does not stick to a
        video ID. The cached dict is shared between callers; treat it as
        read-only.

        ``extract_video_info.cache_clear()`` and
        ``extract_video_info.cache_info()`` remain available.
    """
    # Checked on every call (outside the cache) so toggling the setting at
    # runtime takes effect immediately.
    if not getattr(settings, 'ytdlp_enabled', True):
        logger.debug('yt-dlp integration is disabled')
        return {'error': 'yt-dlp disabled', 'audio_tracks': []}

    try:
        return _extract_video_info_cached(video_id)
    except _NoVideoInfo:
        logger.warning(f'No info returned for video: {video_id}')
        return {'error': 'No info returned', 'audio_tracks': []}
    except yt_dlp.utils.DownloadError as e:
        logger.error(f'yt-dlp download error for {video_id}: {e}')
        return {'error': str(e), 'audio_tracks': []}
    except Exception as e:
        logger.error(f'yt-dlp extraction error for {video_id}: {e}', exc_info=True)
        return {'error': str(e), 'audio_tracks': []}


# Preserve the lru_cache management API on the public name; other functions
# in this module call extract_video_info.cache_clear() / .cache_info().
extract_video_info.cache_clear = _extract_video_info_cached.cache_clear
extract_video_info.cache_info = _extract_video_info_cached.cache_info
+
+
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Extract audio tracks from video info, grouped by language.

    Only audio-only formats are considered (an audio codec is present and
    the video codec is missing or ``'none'``). Formats are grouped by their
    lower-cased language code and the one with the highest bitrate is kept
    for each language.

    Args:
        info: yt-dlp info dict; only its ``'formats'`` list is read.

    Returns:
        One track dict per language with the keys ``language``,
        ``language_name``, ``format_id``, ``itag``, ``ext``, ``acodec``,
        ``audio_bitrate``, ``audio_sample_rate``, ``url`` and ``filesize``.
        English tracks (``en`` or any ``en-*`` variant, case-insensitive)
        come first, then the rest by descending bitrate.
    """
    best_by_language: Dict[str, Dict[str, Any]] = {}

    # 'formats' may be present but None; treat that as "no formats".
    for fmt in info.get('formats') or []:
        acodec = fmt.get('acodec')
        vcodec = fmt.get('vcodec')
        has_audio = acodec and acodec != 'none'
        has_video = vcodec and vcodec != 'none'

        # Only audio-only formats
        if not has_audio or has_video:
            continue

        # Language code, tried under several field names; 'und' when absent.
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            fmt.get('lang') or
            'und'
        )

        # Prefer a name supplied with the format, else derive one.
        lang_name = (
            fmt.get('language_name') or
            fmt.get('lang_name') or
            get_language_name(lang)
        )

        # Audio bitrate, falling back to total bitrate, as a whole number.
        raw_bitrate = fmt.get('abr') or fmt.get('tbr') or 0
        bitrate = int(raw_bitrate) if raw_bitrate else 0

        track_info = {
            'language': lang,
            'language_name': lang_name,
            'format_id': str(fmt.get('format_id', '')),
            'itag': str(fmt.get('format_id', '')),
            'ext': fmt.get('ext'),
            'acodec': acodec,
            'audio_bitrate': bitrate,
            'audio_sample_rate': fmt.get('asr'),
            'url': fmt.get('url'),
            'filesize': fmt.get('filesize'),
        }

        # Keep best quality per language. Compare the stored integer values
        # so both sides of the comparison use the same rounding.
        lang_key = lang.lower()
        current = best_by_language.get(lang_key)
        if current is None:
            best_by_language[lang_key] = track_info
        elif bitrate > current.get('audio_bitrate', 0):
            best_by_language[lang_key] = track_info
            logger.debug(f'Updated {lang} to higher bitrate: {raw_bitrate}')

    def _is_english(track: Dict[str, Any]) -> bool:
        # Match 'en', 'EN', 'en-US', 'en_GB', ... by primary subtag.
        primary = track['language'].lower().replace('_', '-').split('-')[0]
        return primary == 'en'

    # Sort: English first, then by bitrate (descending)
    audio_tracks = sorted(
        best_by_language.values(),
        key=lambda t: (0 if _is_english(t) else 1, -t.get('audio_bitrate', 0)),
    )

    logger.debug(f'Found {len(audio_tracks)} unique audio tracks')
    for track in audio_tracks[:3]:  # Log first 3
        logger.debug(f'  - {track["language_name"]}: {track["audio_bitrate"]}k')

    return audio_tracks
+
+
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
    """
    Get the WebVTT subtitle URL for a specific language.

    Manually provided subtitles are preferred; automatic captions are used
    only when no usable manual track exists. A track is usable when its
    ``ext`` is ``'vtt'`` and it carries a non-empty ``url``. Entries
    without a URL are skipped instead of ending the search.

    Args:
        video_id: YouTube video ID
        lang: Language code (default: 'en'); must match the subtitle key
            exactly.

    Returns:
        URL to subtitle file, or None if extraction failed or no usable
        track is available.
    """
    info = extract_video_info(video_id)

    if 'error' in info:
        logger.warning(f'Cannot get subtitles: {info["error"]}')
        return None

    # Ordered by preference. `or {}` / `or []` guard against keys that are
    # present but hold None.
    sources = (
        ('manual', info.get('subtitles') or {}),
        ('automatic', info.get('automatic_captions') or {}),
    )
    for kind, tracks_by_lang in sources:
        for sub in tracks_by_lang.get(lang) or []:
            if sub.get('ext') == 'vtt' and sub.get('url'):
                logger.debug(f'Found {kind} {lang} subtitle')
                return sub['url']

    logger.debug(f'No {lang} subtitle found')
    return None
+
+
def find_best_unified_format(
    video_id: str,
    audio_language: str = 'en',
    max_quality: int = 720
) -> Optional[Dict[str, Any]]:
    """
    Find best unified (video+audio) format for specific language and quality.

    A format is considered when it has both a video and an audio codec, is
    not an HLS/DASH stream, is at least ``min_height`` tall and its language
    matches ``audio_language`` (exactly, or by two-letter prefix). Formats
    with no language field are assumed to be ``'en'``.

    ``min_height`` is ``max_quality`` times a tolerance that depends on the
    requested quality (85% at 2160p and above, down to 50% at 360p; 70% for
    anything below 360p).

    Candidates are ranked by: language match (exact > prefix), closeness to
    ``max_quality``, plain HTTP(S) delivery, and an ``mp4`` container. Note
    that ``max_quality`` is a target rather than a hard cap: taller formats
    are not rejected and, at equal distance, outrank shorter ones.

    Args:
        video_id: YouTube video ID
        audio_language: Preferred audio language
        max_quality: Target video height (e.g., 720, 1080)

    Returns:
        The yt-dlp format dict of the best candidate, or None when
        extraction failed or nothing qualifies.
    """
    info = extract_video_info(video_id)

    if 'error' in info or not info.get('formats'):
        return None

    # (requested height, minimum acceptable fraction of it), tallest first
    # so the first entry not exceeding max_quality wins.
    thresholds = (
        (2160, 0.85),
        (1440, 0.80),
        (1080, 0.70),
        (720, 0.70),
        (480, 0.60),
        (360, 0.50),
    )
    threshold = next((t for q, t in thresholds if max_quality >= q), 0.70)

    min_height = int(max_quality * threshold)
    logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')

    audio_lang_lower = audio_language.lower()
    audio_lang_prefix = audio_lang_lower[:2]

    best = None  # highest-scoring candidate so far; first one wins ties

    for fmt in info['formats']:
        # Must have both video and audio
        vcodec = fmt.get('vcodec')
        acodec = fmt.get('acodec')
        has_video = vcodec and vcodec != 'none'
        has_audio = acodec and acodec != 'none'

        if not (has_video and has_audio):
            continue

        # Skip HLS/DASH formats. `or ''` covers a key that is present but
        # None, which .get's default does not.
        protocol = fmt.get('protocol') or ''
        format_id = str(fmt.get('format_id', ''))

        if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
            continue
        if format_id.startswith('9'):  # HLS formats
            continue

        # A missing or None height counts as 0.
        height = fmt.get('height') or 0
        if height < min_height:
            continue

        # Language matching and its score tier, decided in one place.
        lang = (
            fmt.get('language') or
            fmt.get('audio_language') or
            'en'
        ).lower()

        if lang == audio_lang_lower:
            score = 10000
        elif lang.startswith(audio_lang_prefix):
            score = 8000
        elif audio_lang_lower.startswith(lang[:2]):
            score = 5000
        else:
            continue

        # Quality score: closer to the target is better; at or above the
        # target ranks ahead of below it.
        quality_diff = abs(height - max_quality)
        if height >= max_quality:
            score += 3000 - quality_diff
        else:
            score += 2000 - quality_diff

        # Protocol preference
        if protocol in ('https', 'http'):
            score += 500

        # Format preference
        if fmt.get('ext') == 'mp4':
            score += 100

        if best is None or score > best['score']:
            best = {
                'format': fmt,
                'score': score,
                'height': height,
                'lang': lang,
            }

    if best is None:
        logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
        return None

    logger.info(
        f'Selected unified format: {best["format"].get("format_id")} | '
        f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
    )

    return best['format']
+
+
def clear_cache():
    """Discard every memoized ``extract_video_info`` result.

    The next lookup of any video goes back to yt-dlp. An info-level log
    line records that the cache was emptied.
    """
    extract_video_info.cache_clear()
    logger.info('yt-dlp cache cleared')
+
+
def get_cache_info() -> Dict[str, Any]:
    """Report statistics for the ``extract_video_info`` cache.

    Returns:
        A dict with ``hits``, ``misses``, ``size`` (current number of
        entries), ``maxsize`` and ``hit_rate``: hits divided by total
        lookups, or 0 when there have been no lookups yet.
    """
    stats = extract_video_info.cache_info()
    lookups = stats.hits + stats.misses

    # Avoid dividing by zero before the first lookup.
    if lookups > 0:
        hit_rate = stats.hits / lookups
    else:
        hit_rate = 0

    return {
        'hits': stats.hits,
        'misses': stats.misses,
        'size': stats.currsize,
        'maxsize': stats.maxsize,
        'hit_rate': hit_rate,
    }