diff options
Diffstat (limited to 'youtube/channel.py')
| -rw-r--r-- | youtube/channel.py | 895 |
1 files changed, 576 insertions, 319 deletions
import base64
from youtube import (util, yt_data_extract, local_playlist, subscriptions,
                     playlist)
from youtube import yt_app
import settings

import urllib
import json
from string import Template
# NOTE(review): `proto` is used throughout this module, but its import falls
# in the lines between the two diff hunks and is not visible in the diff this
# was reconstructed from -- confirm it matches the real file.
import youtube.proto as proto
import html
import math
import gevent
import re
import cachetools.func
import traceback

import flask
from flask import request

# Header tuples sent with requests made as the desktop / mobile web client.
headers_desktop = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '1'),
    ('X-YouTube-Client-Version', '2.20180830'),
) + util.desktop_ua
headers_mobile = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '2'),
    ('X-YouTube-Client-Version', '2.20180830'),
) + util.mobile_ua
real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)

# Local sort value (as a string) -> sort value placed in the v5 token.
# Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
# Local '3' is newest and '4' is newest without shorts; both map to 4.
_CTOKEN_V5_SORT = {'1': 2, '2': 5, '3': 4, '4': 4}

# Tab name -> protobuf field number of the per-tab message.
# 15 for videos matches channel_ctoken_v4 below.
# NOTE(review): the shorts/streams numbers are taken from Invidious and are
# not confirmed by anything in this file -- verify before relying on them.
_CTOKEN_V5_TAB_FIELDS = {'videos': 15, 'shorts': 10, 'streams': 14}


# FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127)
def channel_ctoken_v5(channel_id, page, sort, tab, view=1):
    """Build a continuation token for a channel tab page (v5 layout).

    channel_id: id of the channel.
    page: 1-based page number (int or numeric string); 30 items per page.
    sort: local sort value, int or string; unknown values mean "newest".
    tab: tab name ('videos', 'shorts', 'streams') or an explicit integer
        protobuf field number.
    view: unused; kept for signature parity with the other token builders.

    Returns the token as a urlsafe-base64 ASCII string.
    """
    # Normalise to str: callers pass both ints (the get_channel_tab default)
    # and strings (request args), while the mapping is keyed by strings.
    new_sort = _CTOKEN_V5_SORT.get(str(sort), 4)

    # proto.string takes a numeric field number as its first argument, so a
    # tab name has to be translated before it can be used as one.
    if isinstance(tab, int):
        tab_field = tab
    else:
        tab_field = _CTOKEN_V5_TAB_FIELDS.get(tab, 15)

    offset = 30*(int(page) - 1)

    # Build continuation token using Invidious structure
    # The structure is: base64(protobuf({
    #   80226972: {
    #     2: channel_id,
    #     3: base64(protobuf({
    #       110: {
    #         3: {
    #           tab: {
    #             1: {
    #               1: base64(protobuf({
    #                 1: base64(protobuf({
    #                   2: "ST:" + base64(offset_varint)
    #                 }))
    #               }))
    #             },
    #             2: base64(protobuf({1: UUID}))
    #             4: sort_value
    #             8: base64(protobuf({
    #               1: UUID
    #               3: sort_value
    #             }))
    #           }
    #         }
    #       }
    #     }))
    #   }
    # }))
    # NOTE(review): the code below does not follow this sketch exactly (for
    # example the UUID is emitted as field 1, not wrapped in field 2, and no
    # "ST:" prefix is added). The encoding is left as it was; confirm against
    # a working token.

    # UUID placeholder
    uuid_proto = proto.string(1, "00000000-0000-0000-0000-000000000000")

    # Offset encoding
    offset_varint = proto.uint(1, offset)
    offset_encoded = proto.string(2, proto.unpadded_b64encode(offset_varint))
    offset_wrapper = proto.string(1, proto.unpadded_b64encode(offset_encoded))
    offset_base = proto.string(1, proto.unpadded_b64encode(offset_wrapper))

    # Sort value varint
    sort_varint = proto.uint(4, new_sort)

    # Embedded message with UUID and sort
    embedded_inner = uuid_proto + proto.uint(3, new_sort)
    embedded_encoded = proto.string(8, proto.unpadded_b64encode(embedded_inner))

    # Combine: offset + uuid + sort_varint + embedded
    tab_inner_content = offset_base + uuid_proto + sort_varint + embedded_encoded

    tab_inner = proto.string(1, proto.unpadded_b64encode(tab_inner_content))
    tab_wrapper = proto.string(tab_field, tab_inner)

    inner_container = proto.string(3, tab_wrapper)
    outer_container = proto.string(110, inner_container)

    encoded_inner = proto.percent_b64encode(outer_container)

    pointless_nest = proto.string(80226972,
        proto.string(2, channel_id)
        + proto.string(3, encoded_inner)
    )

    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_about_ctoken(channel_id):
    """Build the continuation token used to request a channel's about data.

    The token is described as nested [wire_type, field_number, value] lists
    and handed to proto.make_protobuf; 'base64p' marks the levels that are
    encoded before being embedded.
    """
    return proto.make_protobuf(
        ('base64p',
         [
             [2, 80226972,
              [
                  [2, 2, channel_id],
                  [2, 3,
                   ('base64p',
                    [
                        [2, 110,
                         [
                             [2, 3,
                              [
                                  [2, 19,
                                   [
                                       [2, 1, b'66b0e9e9-0000-2820-9589-582429a83980'],
                                   ]
                                  ],
                              ]
                             ],
                         ]
                        ],
                    ]
                   )
                  ],
              ]
             ],
         ]
        )
    )


# https://github.com/user234683/youtube-local/issues/151
def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
    """Build a channel videos continuation token (v4 layout).

    Only sort == 1 (popular) is distinguished; every other value is sent as
    newest. `tab` and `view` are accepted for signature parity but unused:
    the tab field number is hard-coded to 15.
    """
    new_sort = (2 if int(sort) == 1 else 1)
    offset = str(30*(int(page) - 1))
    pointless_nest = proto.string(80226972,
        proto.string(2, channel_id)
        + proto.string(3,
            proto.percent_b64encode(
                proto.string(110,
                    proto.string(3,
                        proto.string(15,
                            proto.string(1,
                                proto.string(1,
                                    proto.unpadded_b64encode(
                                        proto.string(1,
                                            proto.unpadded_b64encode(
                                                proto.string(2,
                                                    b"ST:"
                                                    + proto.unpadded_b64encode(
                                                        proto.string(2, offset)
                                                    )
                                                )
                                            )
                                        )
                                    )
                                )
                                # targetId, just needs to be present but
                                # doesn't need to be correct
                                + proto.string(2, "63faaff0-0000-23fe-80f0-582429d11c38")
                            )
                            # 1 - newest, 2 - popular
                            + proto.uint(3, new_sort)
                        )
                    )
                )
            )
        )
    )

    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


# SORT:
# videos:
#    Newest - 3
#    Last video added - 4
# view:
# grid: 0 or 1
# list: 2
def channel_ctoken_v3(channel_id, page, sort, tab, view=1):
    """Build a channel tab continuation token (v3 layout).

    The page is encoded as an item offset (30 per page) in field 61; tab,
    sort, shelf view and view are sent as fields 2, 3, 4 and 6.
    """
    # page > 1 doesn't work when sorting by oldest
    offset = 30*(int(page) - 1)
    page_token = proto.string(61, proto.unpadded_b64encode(
        proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset)))
    ))

    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))

    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(3,
        proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
    )

    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)

    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')


def channel_ctoken_v2(channel_id, page, sort, tab, view=1):
    """Build a channel tab continuation token (v2 layout).

    Same outer layout as v3, but the page token also carries a
    sort-dependent schema number. Raises KeyError when int(sort) is not
    1, 2 or 3.
    """
    # see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
    # page > 1 doesn't work when sorting by oldest
    offset = 30*(int(page) - 1)
    schema_number = {
        3: 6307666885028338688,
        2: 17254859483345278706,
        1: 16570086088270825023,
    }[int(sort)]
    page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
        proto.uint(1, schema_number) + proto.string(2,
            proto.string(1, proto.unpadded_b64encode(proto.uint(1, offset)))
        )
    )))

    tab = proto.string(2, tab)
    sort = proto.uint(3, int(sort))
    shelf_view = proto.uint(4, 0)
    view = proto.uint(6, int(view))
    continuation_info = proto.string(
        3,
        proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
    )

    channel_id = proto.string(2, channel_id)
    pointless_nest = proto.string(80226972, channel_id + continuation_info)

    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
    """Build a channel tab continuation token (v1 layout).

    The page number is sent as a decimal string in field 15; the result is
    the urlsafe-base64 token as an ASCII string.
    """
    tab_field = proto.string(2, tab)
    sort_field = proto.uint(3, int(sort))
    page_field = proto.string(15, str(page))
    # example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
    shelf_field = proto.uint(4, 0)
    view_field = proto.uint(6, int(view))

    # Field order inside the encoded params: tab, view, sort, shelf view,
    # page, then a zero-valued field 23.
    params = (tab_field + view_field + sort_field + shelf_field
              + page_field + proto.uint(23, 0))
    continuation_info = proto.string(3, proto.percent_b64encode(params))

    nest = proto.string(80226972,
                        proto.string(2, channel_id) + continuation_info)
    return base64.urlsafe_b64encode(nest).decode('ascii')
def _browse_request(ctoken):
    """Return (url, headers, json_body) for a youtubei browse continuation."""
    # Not sure what the purpose of the key is or whether it will change
    # For now it seems to be constant for the API endpoint, not dependent
    # on the browsing session or channel
    key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
    url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key

    data = {
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'WEB',
                'clientVersion': '2.20240327.00.00',
            },
        },
        'continuation': ctoken,
    }

    content_type_header = (('Content-Type', 'application/json'),)
    return url, headers_desktop + content_type_header, json.dumps(data)


def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
                    ctoken=None, print_status=True):
    """Fetch one page of a channel tab through the youtubei browse endpoint.

    When no ctoken is given, one is built with channel_ctoken_v5 for the
    videos/shorts/streams tabs and channel_ctoken_v3 for any other tab.
    Returns whatever util.fetch_url returns for the request.
    """
    message = 'Got channel tab' if print_status else None

    if not ctoken:
        if tab in ('videos', 'shorts', 'streams'):
            ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view)
        else:
            ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view)
        # NOTE(review): nesting reconstructed from a whitespace-collapsed
        # diff; the escaping is assumed to apply only to tokens built here.
        ctoken = ctoken.replace('=', '%3D')

    url, headers, body = _browse_request(ctoken)
    content = util.fetch_url(
        url, headers,
        data=body, debug_name='channel_tab', report_text=message)

    return content


# cache entries expire after 30 minutes
number_of_videos_cache = cachetools.TTLCache(128, 30*60)
@cachetools.cached(number_of_videos_cache)
def get_number_of_videos_channel(channel_id):
    """Return the number of uploads of a channel (cached).

    Returns 1000 as a placeholder when channel_id is None or the request
    fails, and 0 when the count cannot be found in the response.
    """
    if channel_id is None:
        return 1000

    # Uploads playlist
    playlist_id = 'UU' + channel_id[2:]
    url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'

    try:
        response = util.fetch_url(url, headers_mobile,
            debug_name='number_of_videos', report_text='Got number of videos')
    except (urllib.error.HTTPError, util.FetchError):
        traceback.print_exc()
        print("Couldn't retrieve number of videos")
        return 1000

    response = response.decode('utf-8')

    # First run of digits/commas after the "numVideosText" key
    match = re.search(r'"numVideosText".*?([,\d]+)', response)
    if match:
        return int(match.group(1).replace(',', ''))
    return 0


def set_cached_number_of_videos(channel_id, num_videos):
    """Store num_videos as the cached video count for channel_id.

    Writes the entry directly so that an existing (possibly stale) value is
    replaced. Going through a function decorated with the same cache would
    return the old value and store nothing.
    """
    from cachetools.keys import hashkey  # default key function of cachetools.cached
    number_of_videos_cache[hashkey(channel_id)] = num_videos


channel_id_re = re.compile(r'videos\.xml\?channel_id=([a-zA-Z0-9_-]{24})"')
@cachetools.func.lru_cache(maxsize=128)
def get_channel_id(base_url):
    """Return the channel id for a channel base url, or None if not found."""
    # method that gives the smallest possible response at ~4 kb
    # needs to be as fast as possible
    base_url = base_url.replace('https://www', 'https://m') # avoid redirect
    response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
        debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
    match = channel_id_re.search(response)
    if match:
        return match.group(1)
    return None


metadata_cache = cachetools.LRUCache(128)
@cachetools.cached(metadata_cache)
def get_metadata(channel_id):
    """Fetch the channel's about page and return its cacheable metadata."""
    base_url = 'https://www.youtube.com/channel/' + channel_id
    polymer_json = util.fetch_url(base_url + '/about?pbj=1',
                                  headers_desktop,
                                  debug_name='gen_channel_about',
                                  report_text='Retrieved channel metadata')
    info = yt_data_extract.extract_channel_info(json.loads(polymer_json),
                                                'about',
                                                continuation=False)
    return extract_metadata_for_caching(info)


def set_cached_metadata(channel_id, metadata):
    """Store metadata as the cached metadata for channel_id.

    Writes the entry directly so fresher metadata replaces what is cached.
    """
    from cachetools.keys import hashkey  # default key function of cachetools.cached
    metadata_cache[hashkey(channel_id)] = metadata


def extract_metadata_for_caching(channel_info):
    """Return the subset of channel_info that is kept in metadata_cache.

    Raises KeyError if channel_info lacks one of the expected keys.
    """
    keys = ('approx_subscriber_count', 'short_description', 'channel_name',
            'avatar')
    return {key: channel_info[key] for key in keys}


def get_number_of_videos_general(base_url):
    """Return the video count for a channel known only by its base url."""
    return get_number_of_videos_channel(get_channel_id(base_url))


def get_channel_search_json(channel_id, query, page):
    """Fetch one page (int, 1-based) of in-channel search results."""
    offset = proto.unpadded_b64encode(proto.uint(3, (page-1)*30))
    params = proto.string(2, 'search') + proto.string(15, offset)
    params = proto.percent_b64encode(params)
    ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query)
    ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')

    url, headers, body = _browse_request(ctoken)
    polymer_json = util.fetch_url(
        url, headers,
        data=body, debug_name='channel_search')

    return polymer_json


def post_process_channel_info(info):
    """Rewrite urls and thumbnails in info (in place) for local serving."""
    info['avatar'] = util.prefix_url(info['avatar'])
    info['channel_url'] = util.prefix_url(info['channel_url'])
    for item in info['items']:
        # For playlists, use first_video_id for thumbnail, not playlist id
        if item.get('type') == 'playlist' and item.get('first_video_id'):
            item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['first_video_id'])
        elif item.get('type') == 'video':
            item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
        # For channels and other types, keep existing thumbnail
        util.prefix_urls(item)
        util.add_extra_html_info(item)
    if info['current_tab'] == 'about':
        for i, (text, url) in enumerate(info['links']):
            if isinstance(url, str) and util.YOUTUBE_URL_RE.fullmatch(url):
                info['links'][i] = (text, util.prefix_url(url))


def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
    """Fetch the first page of a channel tab via its regular page url.

    A given channel_id takes precedence over base_url. `sort` is the local
    sort value as a string; unmapped values fall back to newest.
    """
    if channel_id:
        base_url = 'https://www.youtube.com/channel/' + channel_id

    # Build URL with sort parameter
    # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
    # Note: 'da' (oldest) was removed by YouTube in January 2026
    url = base_url + '/' + tab + '?pbj=1&view=0'
    if sort:
        # Map local sort values to YouTube's URL parameter values. '1'
        # (popular) used to be missing and was silently served as newest.
        sort_map = {'1': 'p', '3': 'dd', '4': 'lad'}
        url += '&sort=' + sort_map.get(sort, 'dd')

    return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)


playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}


def _resolve_channel_id(base_url, channel_id):
    """Return channel_id, looking it up from base_url when it is missing.

    Aborts with 404 when the id cannot be determined, instead of letting a
    None id crash the token/playlist code further down.
    """
    if not channel_id:
        channel_id = get_channel_id(base_url)
    if not channel_id:
        flask.abort(404, 'Could not determine channel id for ' + base_url)
    return channel_id


# youtube.com/[channel_id]/[tab]
# youtube.com/user/[username]/[tab]
# youtube.com/c/[custom]/[tab]
# youtube.com/[custom]/[tab]
def get_channel_page_general_url(base_url, tab, request, channel_id=None):
    """Render a channel page for any of the supported channel url forms.

    base_url: the youtube.com url of the channel, without the tab.
    tab: 'videos', 'shorts', 'streams', 'about', 'playlists' or 'search';
        anything else aborts with 404.
    request: the flask request; 'page', 'sort', 'view', 'query' and 'ctoken'
        are read from its args.
    channel_id: the channel id when the route already knows it.

    Returns the rendered 'channel.html' (or 'error.html') template.
    """
    page_number = int(request.args.get('page', 1))
    # sort 1: views
    # sort 2: oldest
    # sort 4: newest - no shorts (Just a kludge on our end, not internal to yt)
    default_sort = '3' if settings.include_shorts_in_channel else '4'
    sort = request.args.get('sort', default_sort)
    view = request.args.get('view', '1')
    query = request.args.get('query', '')
    ctoken = request.args.get('ctoken', '')
    include_shorts = (sort != '4')
    default_params = (page_number == 1 and sort in ('3', '4') and view == '1')
    continuation = bool(ctoken) # whether or not we're using a continuation
    page_size = 30
    try_channel_api = True
    polymer_json = None

    # Use the special UU playlist which contains all the channel's uploads
    if tab == 'videos' and sort in ('3', '4'):
        channel_id = _resolve_channel_id(base_url, channel_id)
        if page_number == 1 and include_shorts:
            tasks = (
                gevent.spawn(playlist.playlist_first_page,
                             'UU' + channel_id[2:],
                             report_text='Retrieved channel videos'),
                gevent.spawn(get_metadata, channel_id),
            )
            gevent.joinall(tasks)
            util.check_gevent_exceptions(*tasks)

            # Ignore the metadata for now, it is cached and will be
            # recalled later
            pl_json = tasks[0].value
            pl_info = yt_data_extract.extract_playlist_info(pl_json)
            number_of_videos = pl_info['metadata']['video_count']
            if number_of_videos is None:
                number_of_videos = 1000
            else:
                set_cached_number_of_videos(channel_id, number_of_videos)
        else:
            tasks = (
                gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
                             page_number, include_shorts=include_shorts),
                gevent.spawn(get_metadata, channel_id),
                gevent.spawn(get_number_of_videos_channel, channel_id),
            )
            gevent.joinall(tasks)
            util.check_gevent_exceptions(*tasks)

            pl_json = tasks[0].value
            pl_info = yt_data_extract.extract_playlist_info(pl_json)
            number_of_videos = tasks[2].value

        info = pl_info
        info['channel_id'] = channel_id
        info['current_tab'] = 'videos'
        if info['items']: # Success
            page_size = 100
            try_channel_api = False
        else: # Try the first-page method next
            try_channel_api = True

    # Use the regular channel API
    if tab in ('shorts', 'streams') or (tab == 'videos' and try_channel_api):
        if channel_id:
            num_videos_call = (get_number_of_videos_channel, channel_id)
        else:
            num_videos_call = (get_number_of_videos_general, base_url)

        # For page 1, use the first-page method which won't break
        # Pass sort parameter directly (2=oldest, 3=newest, etc.)
        if page_number == 1:
            page_call = (get_channel_first_page, base_url, tab, None, sort)
        else:
            # For page 2+, we can't paginate without continuation tokens
            # This is a YouTube limitation, not our bug
            flask.abort(404, 'Pagination not available for this sort option. YouTube removed this feature.')

        tasks = (
            gevent.spawn(*num_videos_call),
            gevent.spawn(*page_call),
        )
        gevent.joinall(tasks)
        util.check_gevent_exceptions(*tasks)
        number_of_videos, polymer_json = tasks[0].value, tasks[1].value

    elif tab == 'about':
        # Reuse the id when the route already supplied it
        channel_id = _resolve_channel_id(base_url, channel_id)
        ctoken = channel_about_ctoken(channel_id)
        polymer_json = util.call_youtube_api('web', 'browse', {
            'continuation': ctoken,
        })
        continuation = True
    elif tab == 'playlists' and page_number == 1:
        # Use youtubei API instead of deprecated pbj=1 format
        channel_id = _resolve_channel_id(base_url, channel_id)
        ctoken = channel_ctoken_v3(channel_id, page='1', sort=sort, tab='playlists', view=view)
        polymer_json = util.call_youtube_api('web', 'browse', {
            'continuation': ctoken,
        })
        continuation = True
    elif tab == 'playlists':
        # The /user, /c and custom routes arrive here without a channel id;
        # the token builder needs one.
        channel_id = _resolve_channel_id(base_url, channel_id)
        polymer_json = get_channel_tab(channel_id, page_number, sort,
                                       'playlists', view)
        continuation = True
    elif tab == 'search' and channel_id:
        polymer_json = get_channel_search_json(channel_id, query, page_number)
    elif tab == 'search':
        url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
        polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
    elif tab == 'videos':
        # Already served from the uploads playlist above
        pass
    else:
        flask.abort(404, 'Unknown channel tab: ' + tab)

    if polymer_json is not None:
        info = yt_data_extract.extract_channel_info(
            json.loads(polymer_json), tab, continuation=continuation
        )

    if info['error'] is not None:
        return flask.render_template('error.html', error_message=info['error'])

    if channel_id:
        info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
        info['channel_id'] = channel_id
    else:
        channel_id = info['channel_id']

    # Will have microformat present, cache metadata while we have it
    if channel_id and default_params and tab not in ('videos', 'about'):
        metadata = extract_metadata_for_caching(info)
        set_cached_metadata(channel_id, metadata)
    # Otherwise, populate with our (hopefully cached) metadata
    elif channel_id and info.get('channel_name') is None:
        metadata = get_metadata(channel_id)
        for key, value in metadata.items():
            yt_data_extract.conservative_update(info, key, value)
        # need to add this metadata to the videos/playlists
        # NOTE(review): nesting of this part under the elif is reconstructed
        # from a whitespace-collapsed diff -- confirm against the real file.
        additional_info = {
            'author': info['channel_name'],
            'author_id': info['channel_id'],
            'author_url': info['channel_url'],
        }
        for item in info['items']:
            item.update(additional_info)

    if tab in ('videos', 'shorts', 'streams'):
        info['number_of_videos'] = number_of_videos
        info['number_of_pages'] = math.ceil(number_of_videos/page_size)
        info['header_playlist_names'] = local_playlist.get_playlist_names()
    if tab in ('videos', 'shorts', 'streams', 'playlists'):
        info['current_sort'] = sort
    elif tab == 'search':
        info['search_box_value'] = query
        info['header_playlist_names'] = local_playlist.get_playlist_names()
    if tab in ('search', 'playlists'):
        info['page_number'] = page_number
    info['subscribed'] = subscriptions.is_subscribed(info['channel_id'])

    post_process_channel_info(info)

    return flask.render_template('channel.html',
        parameters_dictionary = request.args,
        **info
    )


@yt_app.route('/channel/<channel_id>/')
@yt_app.route('/channel/<channel_id>/<tab>')
def get_channel_page(channel_id, tab='videos'):
    """Serve a channel addressed by its channel id."""
    return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)


@yt_app.route('/user/<username>/')
@yt_app.route('/user/<username>/<tab>')
def get_user_page(username, tab='videos'):
    """Serve a channel addressed by its /user/<username> url."""
    return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
@yt_app.route('/c/<custom>/')
@yt_app.route('/c/<custom>/<tab>')
def get_custom_c_page(custom, tab='videos'):
    """Serve a channel addressed by its /c/<custom> url."""
    base_url = 'https://www.youtube.com/c/' + custom
    return get_channel_page_general_url(base_url, tab, request)


@yt_app.route('/<custom>')
@yt_app.route('/<custom>/<tab>')
def get_toplevel_custom_page(custom, tab='videos'):
    """Serve a channel addressed by a top-level custom url."""
    base_url = 'https://www.youtube.com/' + custom
    return get_channel_page_general_url(base_url, tab, request)
