aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/ytdlp_service.py
diff options
context:
space:
mode:
authorAstounds <kirito@disroot.org>2026-03-22 20:50:03 -0500
committerAstounds <kirito@disroot.org>2026-03-22 20:50:03 -0500
commit6a68f0664568cea6f9a12e8743f195fe0a41f3ce (patch)
tree4ad12a70811a4821c0cc9dc94c19c1ccf2bca808 /youtube/ytdlp_service.py
parent84e1acaab8f7e4e7e36d19e3b6847a0ab6c33759 (diff)
downloadyt-local-6a68f0664568cea6f9a12e8743f195fe0a41f3ce.tar.lz
yt-local-6a68f0664568cea6f9a12e8743f195fe0a41f3ce.tar.xz
yt-local-6a68f0664568cea6f9a12e8743f195fe0a41f3ce.zip
Release v0.4.0 - HD Thumbnails, YouTube 2024+ Support, and yt-dlp Integrationv0.4.0
Major Features: - HD video thumbnails (hq720.jpg) with automatic fallback to lower qualities - HD channel avatars (240x240 instead of 88x88) - YouTube 2024+ lockupViewModel support for channel playlists - youtubei/v1/browse API integration for channel playlist tabs - yt-dlp integration for multi-language audio and subtitles Bug Fixes: - Fixed undefined `abort` import in playlist.py - Fixed undefined functions in proto.py (encode_varint, bytes_to_hex, succinct_encode) - Fixed missing `traceback` import in proto_debug.py - Fixed blurry playlist thumbnails using default.jpg instead of HD versions - Fixed channel playlists page using deprecated pbj=1 format Improvements: - Automatic thumbnail fallback system (hq720 → sddefault → hqdefault → mqdefault → default) - JavaScript thumbnail_fallback() handler for 404 errors - Better thumbnail quality across all pages (watch, channel, playlist, subscriptions) - Consistent HD avatar display for all channel items - Settings system automatically adds new settings without breaking user config Files Modified: - youtube/watch.py - HD thumbnails for related videos and playlist items - youtube/channel.py - HD thumbnails for channel playlists, youtubei API integration - youtube/playlist.py - HD thumbnails, fixed abort import - youtube/util.py - HD thumbnail URLs, avatar HD upgrade, prefix_url improvements - youtube/comments.py - HD video thumbnail - youtube/subscriptions.py - HD thumbnails, fixed abort import - youtube/yt_data_extract/common.py - lockupViewModel support, extract_lockup_view_model_info() - youtube/yt_data_extract/everything_else.py - HD playlist thumbnails - youtube/proto.py - Fixed undefined function references - youtube/proto_debug.py - Added traceback import - youtube/static/js/common.js - thumbnail_fallback() handler - youtube/templates/*.html - Added onerror handlers for thumbnail fallback - youtube/version.py - Bump to v0.4.0 Technical Details: - All thumbnail URLs now use hq720.jpg (1280x720) when available - Fallback handled client-side via JavaScript onerror handler - Server-side avatar upgrade via regex in util.prefix_url() - lockupViewModel parser extracts contentType, metadata, and first_video_id - Channel playlist tabs now use youtubei/v1/browse instead of deprecated pbj=1 - Settings version system ensures backward compatibility
Diffstat (limited to 'youtube/ytdlp_service.py')
-rw-r--r--youtube/ytdlp_service.py127
1 files changed, 65 insertions, 62 deletions
diff --git a/youtube/ytdlp_service.py b/youtube/ytdlp_service.py
index 2520193..994cec4 100644
--- a/youtube/ytdlp_service.py
+++ b/youtube/ytdlp_service.py
@@ -55,19 +55,19 @@ def _get_ytdlp_config() -> Dict[str, Any]:
'extractor_retries': 3,
'http_chunk_size': 10485760, # 10MB
}
-
+
# Configure Tor proxy if enabled
if settings.route_tor:
config['proxy'] = 'socks5://127.0.0.1:9150'
logger.debug('Tor proxy enabled for yt-dlp')
-
+
# Use cookies if available
import os
cookies_file = 'youtube_cookies.txt'
if os.path.exists(cookies_file):
config['cookiefile'] = cookies_file
logger.debug('Using cookies file for yt-dlp')
-
+
return config
@@ -75,13 +75,13 @@ def _get_ytdlp_config() -> Dict[str, Any]:
def extract_video_info(video_id: str) -> Dict[str, Any]:
"""
Extract video information using yt-dlp with caching.
-
+
Args:
video_id: YouTube video ID
-
+
Returns:
Dictionary with video information including audio tracks
-
+
Caching:
Results are cached to avoid repeated requests to YouTube.
Cache size is limited to prevent memory issues.
@@ -90,25 +90,25 @@ def extract_video_info(video_id: str) -> Dict[str, Any]:
if not getattr(settings, 'ytdlp_enabled', True):
logger.debug('yt-dlp integration is disabled')
return {'error': 'yt-dlp disabled', 'audio_tracks': []}
-
+
url = f'https://www.youtube.com/watch?v={video_id}'
ydl_opts = _get_ytdlp_config()
-
+
try:
logger.debug(f'Extracting video info: {video_id}')
-
+
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
-
+
if not info:
logger.warning(f'No info returned for video: {video_id}')
return {'error': 'No info returned', 'audio_tracks': []}
-
- logger.debug(f'Extracted {len(info.get("formats", []))} formats')
-
+
+ logger.info(f'Extracted {len(info.get("formats", []))} total formats')
+
# Extract audio tracks grouped by language
audio_tracks = _extract_audio_tracks(info)
-
+
return {
'video_id': video_id,
'title': info.get('title', ''),
@@ -118,7 +118,7 @@ def extract_video_info(video_id: str) -> Dict[str, Any]:
'subtitles': info.get('subtitles', {}),
'automatic_captions': info.get('automatic_captions', {}),
}
-
+
except yt_dlp.utils.DownloadError as e:
logger.error(f'yt-dlp download error for {video_id}: {e}')
return {'error': str(e), 'audio_tracks': []}
@@ -130,21 +130,23 @@ def extract_video_info(video_id: str) -> Dict[str, Any]:
def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extract audio tracks from video info, grouped by language.
-
+
Returns a list of unique audio tracks (one per language),
keeping the highest quality for each language.
"""
audio_by_language = {}
all_formats = info.get('formats', [])
-
+
+ logger.debug(f'Processing {len(all_formats)} formats to extract audio tracks')
+
for fmt in all_formats:
# Only audio-only formats
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
-
+
if not has_audio or has_video:
continue
-
+
# Extract language information
lang = (
fmt.get('language') or
@@ -152,17 +154,17 @@ def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
fmt.get('lang') or
'und'
)
-
+
# Get language name
lang_name = (
fmt.get('language_name') or
fmt.get('lang_name') or
get_language_name(lang)
)
-
+
# Get bitrate
bitrate = fmt.get('abr') or fmt.get('tbr') or 0
-
+
# Create track info
track_info = {
'language': lang,
@@ -176,20 +178,21 @@ def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
'url': fmt.get('url'),
'filesize': fmt.get('filesize'),
}
-
+
# Keep best quality per language
lang_key = lang.lower()
if lang_key not in audio_by_language:
audio_by_language[lang_key] = track_info
+ logger.debug(f' Added {lang} ({lang_name}) - {bitrate}k')
else:
current_bitrate = audio_by_language[lang_key].get('audio_bitrate', 0)
if bitrate > current_bitrate:
+ logger.debug(f' Updated {lang} ({lang_name}): {current_bitrate}k → {bitrate}k')
audio_by_language[lang_key] = track_info
- logger.debug(f'Updated {lang} to higher bitrate: {bitrate}')
-
+
# Convert to list and sort
audio_tracks = list(audio_by_language.values())
-
+
# Sort: English first, then by bitrate (descending)
audio_tracks.sort(
key=lambda x: (
@@ -197,31 +200,31 @@ def _extract_audio_tracks(info: Dict[str, Any]) -> List[Dict[str, Any]]:
-x.get('audio_bitrate', 0)
)
)
-
- logger.debug(f'Found {len(audio_tracks)} unique audio tracks')
- for track in audio_tracks[:3]: # Log first 3
- logger.debug(f' - {track["language_name"]}: {track["audio_bitrate"]}k')
-
+
+ logger.info(f'Extracted {len(audio_tracks)} unique audio languages')
+ for track in audio_tracks[:5]: # Log first 5
+ logger.info(f' → {track["language_name"]} ({track["language"]}): {track["audio_bitrate"]}k')
+
return audio_tracks
def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
"""
Get subtitle URL for a specific language.
-
+
Args:
video_id: YouTube video ID
lang: Language code (default: 'en')
-
+
Returns:
URL to subtitle file, or None if not available
"""
info = extract_video_info(video_id)
-
+
if 'error' in info:
logger.warning(f'Cannot get subtitles: {info["error"]}')
return None
-
+
# Try manual subtitles first
subtitles = info.get('subtitles', {})
if lang in subtitles:
@@ -229,7 +232,7 @@ def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
if sub.get('ext') == 'vtt':
logger.debug(f'Found manual {lang} subtitle')
return sub.get('url')
-
+
# Try automatic captions
auto_captions = info.get('automatic_captions', {})
if lang in auto_captions:
@@ -237,7 +240,7 @@ def get_subtitle_url(video_id: str, lang: str = 'en') -> Optional[str]:
if sub.get('ext') == 'vtt':
logger.debug(f'Found automatic {lang} subtitle')
return sub.get('url')
-
+
logger.debug(f'No {lang} subtitle found')
return None
@@ -249,20 +252,20 @@ def find_best_unified_format(
) -> Optional[Dict[str, Any]]:
"""
Find best unified (video+audio) format for specific language and quality.
-
+
Args:
video_id: YouTube video ID
audio_language: Preferred audio language
max_quality: Maximum video height (e.g., 720, 1080)
-
+
Returns:
Format dict if found, None otherwise
"""
info = extract_video_info(video_id)
-
+
if 'error' in info or not info.get('formats'):
return None
-
+
# Quality thresholds (minimum acceptable height as % of requested)
thresholds = {
2160: 0.85,
@@ -272,60 +275,60 @@ def find_best_unified_format(
480: 0.60,
360: 0.50,
}
-
+
# Get threshold for requested quality
threshold = 0.70
for q, t in thresholds.items():
if max_quality >= q:
threshold = t
break
-
+
min_height = int(max_quality * threshold)
logger.debug(f'Quality threshold: {threshold:.0%} = min {min_height}p for {max_quality}p')
-
+
candidates = []
audio_lang_lower = audio_language.lower()
-
+
for fmt in info['formats']:
# Must have both video and audio
has_video = fmt.get('vcodec') and fmt.get('vcodec') != 'none'
has_audio = fmt.get('acodec') and fmt.get('acodec') != 'none'
-
+
if not (has_video and has_audio):
continue
-
+
# Skip HLS/DASH formats
protocol = fmt.get('protocol', '')
format_id = str(fmt.get('format_id', ''))
-
+
if any(x in protocol.lower() for x in ['m3u8', 'hls', 'dash']):
continue
if format_id.startswith('9'): # HLS formats
continue
-
+
height = fmt.get('height', 0)
if height < min_height:
continue
-
+
# Language matching
lang = (
fmt.get('language') or
fmt.get('audio_language') or
'en'
).lower()
-
+
lang_match = (
lang == audio_lang_lower or
lang.startswith(audio_lang_lower[:2]) or
audio_lang_lower.startswith(lang[:2])
)
-
+
if not lang_match:
continue
-
+
# Calculate score
score = 0
-
+
# Language match bonus
if lang == audio_lang_lower:
score += 10000
@@ -333,42 +336,42 @@ def find_best_unified_format(
score += 8000
else:
score += 5000
-
+
# Quality score
quality_diff = abs(height - max_quality)
if height >= max_quality:
score += 3000 - quality_diff
else:
score += 2000 - quality_diff
-
+
# Protocol preference
if protocol in ('https', 'http'):
score += 500
-
+
# Format preference
if fmt.get('ext') == 'mp4':
score += 100
-
+
candidates.append({
'format': fmt,
'score': score,
'height': height,
'lang': lang,
})
-
+
if not candidates:
logger.debug(f'No unified format found for {max_quality}p + {audio_language}')
return None
-
+
# Sort by score and return best
candidates.sort(key=lambda x: x['score'], reverse=True)
best = candidates[0]
-
+
logger.info(
f'Selected unified format: {best["format"].get("format_id")} | '
f'{best["lang"]} | {best["height"]}p | score={best["score"]}'
)
-
+
return best['format']