diff options
Diffstat (limited to 'youtube/channel.py')
| -rw-r--r-- | youtube/channel.py | 199 |
1 file changed, 128 insertions, 71 deletions
diff --git a/youtube/channel.py b/youtube/channel.py index 3352ca2..14a565e 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -6,9 +6,7 @@ import settings import urllib import json -from string import Template import youtube.proto as proto -import html import math import gevent import re @@ -33,9 +31,9 @@ headers_mobile = ( real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),) generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),) -# FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127) # Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest -def channel_ctoken_v5(channel_id, page, sort, tab, view=1): +# include_shorts only applies to tab='videos'; tab='shorts'/'streams' always include their own content. +def channel_ctoken_v5(channel_id, page, sort, tab, view=1, include_shorts=True): # Tab-specific protobuf field numbers (from Invidious source) # Each tab uses different field numbers in the protobuf structure: # videos: 110 -> 3 -> 15 -> { 2:{1:UUID}, 4:sort, 8:{1:UUID, 3:sort} } @@ -74,6 +72,11 @@ def channel_ctoken_v5(channel_id, page, sort, tab, view=1): inner_container = proto.string(3, tab_wrapper) outer_container = proto.string(110, inner_container) + # Add shorts filter when include_shorts=False (field 104, same as playlist.py) + # This tells YouTube to exclude shorts from the results + if not include_shorts: + outer_container += proto.string(104, proto.uint(2, 1)) + encoded_inner = proto.percent_b64encode(outer_container) pointless_nest = proto.string(80226972, @@ -236,12 +239,12 @@ def channel_ctoken_v1(channel_id, page, sort, tab, view=1): def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, - ctoken=None, print_status=True): + ctoken=None, print_status=True, include_shorts=True): message = 'Got channel tab' if print_status else None if not ctoken: if tab in ('videos', 'shorts', 'streams'): - ctoken = channel_ctoken_v5(channel_id, page, sort, tab, 
view) + ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view, include_shorts) else: ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view) ctoken = ctoken.replace('=', '%3D') @@ -274,6 +277,8 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1, # cache entries expire after 30 minutes number_of_videos_cache = cachetools.TTLCache(128, 30*60) +# Cache for continuation tokens (shorts/streams pagination) +continuation_token_cache = cachetools.TTLCache(512, 15*60) @cachetools.cached(number_of_videos_cache) def get_number_of_videos_channel(channel_id): if channel_id is None: @@ -286,19 +291,30 @@ def get_number_of_videos_channel(channel_id): try: response = util.fetch_url(url, headers_mobile, debug_name='number_of_videos', report_text='Got number of videos') - except (urllib.error.HTTPError, util.FetchError) as e: + except (urllib.error.HTTPError, util.FetchError): traceback.print_exc() print("Couldn't retrieve number of videos") return 1000 response = response.decode('utf-8') - # match = re.search(r'"numVideosText":\s*{\s*"runs":\s*\[{"text":\s*"([\d,]*) videos"', response) - match = re.search(r'"numVideosText".*?([,\d]+)', response) - if match: - return int(match.group(1).replace(',','')) - else: - return 0 + # Try several patterns since YouTube's format changes: + # "numVideosText":{"runs":[{"text":"1,234"},{"text":" videos"}]} + # "stats":[..., {"runs":[{"text":"1,234"},{"text":" videos"}]}] + for pattern in ( + r'"numVideosText".*?"text":\s*"([\d,]+)"', + r'"numVideosText".*?([\d,]+)\s*videos?', + r'"numVideosText".*?([,\d]+)', + r'([\d,]+)\s*videos?\s*</span>', + ): + match = re.search(pattern, response) + if match: + try: + return int(match.group(1).replace(',', '')) + except ValueError: + continue + # Fallback: unknown count + return 0 def set_cached_number_of_videos(channel_id, num_videos): @cachetools.cached(number_of_videos_cache) def dummy_func_using_same_cache(channel_id): @@ -423,24 +439,27 @@ def 
get_channel_page_general_url(base_url, tab, request, channel_id=None): page_number = int(request.args.get('page', 1)) # sort 1: views # sort 2: oldest - # sort 4: newest - no shorts (Just a kludge on our end, not internal to yt) + # sort 3: newest (includes shorts, via UU uploads playlist) + # sort 4: newest - no shorts (uses channel Videos tab API directly, like Invidious) default_sort = '3' if settings.include_shorts_in_channel else '4' sort = request.args.get('sort', default_sort) view = request.args.get('view', '1') query = request.args.get('query', '') ctoken = request.args.get('ctoken', '') - include_shorts = (sort != '4') default_params = (page_number == 1 and sort in ('3', '4') and view == '1') - continuation = bool(ctoken) # whether or not we're using a continuation + continuation = bool(ctoken) page_size = 30 - try_channel_api = True polymer_json = None + number_of_videos = 0 + info = None - # Use the special UU playlist which contains all the channel's uploads - if tab == 'videos' and sort in ('3', '4'): + # ------------------------------------------------------------------------- + # sort=3: use UU uploads playlist (includes shorts) + # ------------------------------------------------------------------------- + if tab == 'videos' and sort == '3': if not channel_id: channel_id = get_channel_id(base_url) - if page_number == 1 and include_shorts: + if page_number == 1: tasks = ( gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], @@ -449,9 +468,6 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): ) gevent.joinall(tasks) util.check_gevent_exceptions(*tasks) - - # Ignore the metadata for now, it is cached and will be - # recalled later pl_json = tasks[0].value pl_info = yt_data_extract.extract_playlist_info(pl_json) number_of_videos = pl_info['metadata']['video_count'] @@ -462,57 +478,70 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): else: tasks = ( gevent.spawn(playlist.get_videos, 'UU' 
+ channel_id[2:], - page_number, include_shorts=include_shorts), + page_number, include_shorts=True), gevent.spawn(get_metadata, channel_id), gevent.spawn(get_number_of_videos_channel, channel_id), + gevent.spawn(playlist.playlist_first_page, 'UU' + channel_id[2:], + report_text='Retrieved channel video count'), ) gevent.joinall(tasks) util.check_gevent_exceptions(*tasks) - pl_json = tasks[0].value pl_info = yt_data_extract.extract_playlist_info(pl_json) - number_of_videos = tasks[2].value - - info = pl_info - info['channel_id'] = channel_id - info['current_tab'] = 'videos' - if info['items']: # Success + first_page_meta = yt_data_extract.extract_playlist_metadata(tasks[3].value) + number_of_videos = (tasks[2].value + or first_page_meta.get('video_count') + or 0) + + if pl_info['items']: + info = pl_info + info['channel_id'] = channel_id + info['current_tab'] = 'videos' page_size = 100 - try_channel_api = False - else: # Try the first-page method next - try_channel_api = True - - # Use the regular channel API - if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api): + # else fall through to the channel browse API below + + # ------------------------------------------------------------------------- + # Channel browse API: sort=4 (videos tab, no shorts), shorts, streams, + # or fallback when the UU playlist returned no items. + # Uses channel_ctoken_v5 per-tab tokens, mirroring Invidious's approach. + # Pagination is driven by the continuation token YouTube returns each page. 
+ # ------------------------------------------------------------------------- + used_channel_api = False + if info is None and ( + tab in ('shorts', 'streams') + or (tab == 'videos' and sort == '4') + or (tab == 'videos' and sort == '3') # UU-playlist fallback + ): if not channel_id: channel_id = get_channel_id(base_url) - - # Use youtubei browse API with continuation token for all pages - page_call = (get_channel_tab, channel_id, str(page_number), sort, - tab, int(view)) - continuation = True - - if tab == 'videos': - # Only need video count for the videos tab - if channel_id: - num_videos_call = (get_number_of_videos_channel, channel_id) + used_channel_api = True + + # Determine what browse call to make + if ctoken: + browse_call = (util.call_youtube_api, 'web', 'browse', + {'continuation': ctoken}) + continuation = True + elif page_number > 1: + cache_key = (channel_id, tab, sort, page_number - 1) + cached_ctoken = continuation_token_cache.get(cache_key) + if cached_ctoken: + browse_call = (util.call_youtube_api, 'web', 'browse', + {'continuation': cached_ctoken}) else: - num_videos_call = (get_number_of_videos_general, base_url) - tasks = ( - gevent.spawn(*num_videos_call), - gevent.spawn(*page_call), - ) - gevent.joinall(tasks) - util.check_gevent_exceptions(*tasks) - number_of_videos, polymer_json = tasks[0].value, tasks[1].value + # Cache miss — restart from page 1 (better than an error) + browse_call = (get_channel_tab, channel_id, '1', sort, tab, int(view)) + continuation = True else: - # For shorts/streams, item count is used instead - polymer_json = gevent.spawn(*page_call) - polymer_json.join() - if polymer_json.exception: - raise polymer_json.exception - polymer_json = polymer_json.value - number_of_videos = 0 # will be replaced by actual item count later + browse_call = (get_channel_tab, channel_id, '1', sort, tab, int(view)) + continuation = True + + # Single browse call; number_of_videos is computed from items actually + # fetched so we don't 
mislead the user with a total that includes + # shorts (which this branch is explicitly excluding for sort=4). + task = gevent.spawn(*browse_call) + task.join() + util.check_gevent_exceptions(task) + polymer_json = task.value elif tab == 'about': # polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about') @@ -540,16 +569,16 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): elif tab == 'search': url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='') polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search') - elif tab == 'videos': - pass - else: + elif tab != 'videos': flask.abort(404, 'Unknown channel tab: ' + tab) - if polymer_json is not None: + if polymer_json is not None and info is None: info = yt_data_extract.extract_channel_info( json.loads(polymer_json), tab, continuation=continuation ) + if info is None: + return flask.render_template('error.html', error_message='Could not retrieve channel data') if info['error'] is not None: return flask.render_template('error.html', error_message=info['error']) @@ -579,12 +608,40 @@ def get_channel_page_general_url(base_url, tab, request, channel_id=None): item.update(additional_info) if tab in ('videos', 'shorts', 'streams'): - if tab in ('shorts', 'streams'): - # For shorts/streams, use the actual item count since - # get_number_of_videos_channel counts regular uploads only - number_of_videos = len(info.get('items', [])) + # For any tab using the channel browse API (sort=4, shorts, streams), + # pagination is driven by the ctoken YouTube returns in the response. + # Cache it so the next page request can use it. + if info.get('ctoken'): + cache_key = (channel_id, tab, sort, page_number) + continuation_token_cache[cache_key] = info['ctoken'] + + # Determine is_last_page and final number_of_pages. 
+ # For channel-API-driven tabs (sort=4, shorts, streams, UU fallback), + # YouTube doesn't give us a reliable total filtered count. So instead + # of displaying a misleading number (the total-including-shorts from + # get_number_of_videos_channel), we count only what we've actually + # paged through, and use the ctoken to know whether to show "next". + if used_channel_api: + info['is_last_page'] = (info.get('ctoken') is None) + items_on_page = len(info.get('items', [])) + items_seen_so_far = (page_number - 1) * page_size + items_on_page + + # Use accumulated count as the displayed total so "N videos" shown + # to the user always matches what they could actually reach. + number_of_videos = items_seen_so_far + + # If there's more content, bump by 1 so the Next-page button exists + if info.get('ctoken'): + number_of_videos = max(number_of_videos, + page_number * page_size + 1) + # For sort=3 via UU playlist (used_channel_api=False), number_of_videos + # was already set from playlist metadata above. + info['number_of_videos'] = number_of_videos - info['number_of_pages'] = math.ceil(number_of_videos/page_size) if number_of_videos else 1 + info['number_of_pages'] = math.ceil(number_of_videos / page_size) if number_of_videos else 1 + # Never show fewer pages than the page the user is actually on + if info['number_of_pages'] < page_number: + info['number_of_pages'] = page_number info['header_playlist_names'] = local_playlist.get_playlist_names() if tab in ('videos', 'shorts', 'streams', 'playlists'): info['current_sort'] = sort |
