about summary refs log tree commit diff stats
path: root/youtube/channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/channel.py')
-rw-r--r-- youtube/channel.py 895
1 files changed, 576 insertions, 319 deletions
diff --git a/youtube/channel.py b/youtube/channel.py
index adc8929..72fac07 100644
--- a/youtube/channel.py
+++ b/youtube/channel.py
@@ -1,6 +1,9 @@
import base64
-import youtube.common as common
-from youtube.common import default_multi_get, URL_ORIGIN, get_thumbnail_url, video_id
+from youtube import (util, yt_data_extract, local_playlist, subscriptions,
+ playlist)
+from youtube import yt_app
+import settings
+
import urllib
import json
from string import Template
@@ -9,374 +12,628 @@ import html
import math
import gevent
import re
-import functools
-
-with open("yt_channel_items_template.html", "r") as file:
- yt_channel_items_template = Template(file.read())
-
-with open("yt_channel_about_template.html", "r") as file:
- yt_channel_about_template = Template(file.read())
-
-'''continuation = Proto(
- Field('optional', 'continuation', 80226972, Proto(
- Field('optional', 'browse_id', 2, String),
- Field('optional', 'params', 3, Base64(Proto(
- Field('optional', 'channel_tab', 2, String),
- Field('optional', 'sort', 3, ENUM
- Field('optional', 'page', 15, String),
- )))
- ))
-)'''
-
-
-'''channel_continuation = Proto(
- Field('optional', 'pointless_nest', 80226972, Proto(
- Field('optional', 'channel_id', 2, String),
- Field('optional', 'continuation_info', 3, Base64(Proto(
- Field('optional', 'channel_tab', 2, String),
- Field('optional', 'sort', 3, ENUM
- Field('optional', 'page', 15, String),
- )))
- ))
-)'''
+import cachetools.func
+import traceback
+
+import flask
+from flask import request
-headers_1 = (
+headers_desktop = (
('Accept', '*/*'),
('Accept-Language', 'en-US,en;q=0.5'),
('X-YouTube-Client-Name', '1'),
('X-YouTube-Client-Version', '2.20180830'),
-)
-# https://www.youtube.com/browse_ajax?action_continuation=1&direct_render=1&continuation=4qmFsgJAEhhVQzdVY3M0MkZaeTN1WXpqcnF6T0lIc3caJEVnWjJhV1JsYjNNZ0FEZ0JZQUZxQUhvQk1yZ0JBQSUzRCUzRA%3D%3D
-# https://www.youtube.com/browse_ajax?ctoken=4qmFsgJAEhhVQzdVY3M0MkZaeTN1WXpqcnF6T0lIc3caJEVnWjJhV1JsYjNNZ0FEZ0JZQUZxQUhvQk1yZ0JBQSUzRCUzRA%3D%3D&continuation=4qmFsgJAEhhVQzdVY3M0MkZaeTN1WXpqcnF6T0lIc3caJEVnWjJhV1JsYjNNZ0FEZ0JZQUZxQUhvQk1yZ0JBQSUzRCUzRA%3D%3D&itct=CDsQybcCIhMIhZi1krTc2wIVjMicCh2HXQnhKJsc
+) + util.desktop_ua
+# Request headers used for fetches against m.youtube.com (see
+# get_number_of_videos_channel and get_channel_id). Same fields as
+# headers_desktop, except X-YouTube-Client-Name is '2' and util.mobile_ua is
+# appended instead of util.desktop_ua.
+headers_mobile = (
+ ('Accept', '*/*'),
+ ('Accept-Language', 'en-US,en;q=0.5'),
+ ('X-YouTube-Client-Name', '2'),
+ ('X-YouTube-Client-Version', '2.20180830'),
+) + util.mobile_ua
+# Fixed VISITOR_INFO1_LIVE cookie headers, in the same tuple-of-pairs form as
+# the header sets above.
+# NOTE(review): neither name is referenced in the part of the file shown in
+# this diff - confirm they are still used elsewhere.
+real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
+generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
+
+# FIXED 2026: YouTube changed continuation token structure (from Invidious commit a9f8127)
+# Sort values for YouTube API (from Invidious): 2=popular, 4=newest, 5=oldest
+#
+# Arguments:
+#   channel_id - channel id, stored in field 2 of the outer message
+#   page       - 1-based page number (int or numeric string); sent as an
+#                offset of 30 items per page
+#   sort       - local sort code, int or str (see sort_mapping below)
+#   tab        - handed to proto.string() as the field number that wraps the
+#                tab payload
+#   view       - not used; kept so the signature matches the other
+#                channel_ctoken_* builders
+# Returns the token as urlsafe-base64 ASCII text.
+def channel_ctoken_v5(channel_id, page, sort, tab, view=1):
+    # Map local sort codes to the values YouTube expects:
+    #   '1' -> 2 (popular), '2' -> 5 (oldest), '3' -> 4 (newest),
+    #   '4' -> 4 (newest; the "no shorts" variant has no separate value here)
+    # Unknown codes fall back to 4 (newest).
+    sort_mapping = {'1': 2, '2': 5, '3': 4, '4': 4}
+    # The keys are strings, but get_channel_tab defaults to the int 3, so
+    # normalise before the lookup; otherwise every int code would silently
+    # be treated as "newest".
+    new_sort = sort_mapping.get(str(sort), 4)
+
+    offset = 30*(int(page) - 1)
+
+    # Layout produced below (b64 = proto.unpadded_b64encode, TAB = the tab
+    # argument used as a field number):
+    #   80226972: {
+    #     2: channel_id,
+    #     3: percent_b64({
+    #       110: { 3: { TAB: { 1: b64(
+    #           1: b64({ 1: b64({ 2: b64({ 1: offset }) }) }),
+    #           1: UUID,
+    #           4: sort,
+    #           8: b64({ 1: UUID, 3: sort }),
+    #       ) } } }
+    #     })
+    #   }
+    # NOTE(review): the Invidious-derived sketch this code was written from
+    # put "ST:" in front of the encoded offset (as channel_ctoken_v4 does)
+    # and placed the UUID under field 2; the code below does neither.
+    # Confirm which form YouTube accepts.
+    # NOTE(review): get_channel_tab passes tab names such as 'videos' here,
+    # while channel_ctoken_v4 uses the literal field number 15 at the same
+    # nesting level. Confirm proto.string() accepts a name as field number.
+
+    # UUID placeholder
+    uuid_proto = proto.string(1, "00000000-0000-0000-0000-000000000000")
+
+    # Offset encoding
+    offset_varint = proto.uint(1, offset)
+    offset_encoded = proto.string(2, proto.unpadded_b64encode(offset_varint))
+    offset_wrapper = proto.string(1, proto.unpadded_b64encode(offset_encoded))
+    offset_base = proto.string(1, proto.unpadded_b64encode(offset_wrapper))
+
+    # Sort value varint
+    sort_varint = proto.uint(4, new_sort)
+
+    # Embedded message with UUID and sort
+    embedded_inner = uuid_proto + proto.uint(3, new_sort)
+    embedded_encoded = proto.string(8, proto.unpadded_b64encode(embedded_inner))
+
+    # Combine: offset + uuid + sort_varint + embedded
+    tab_inner_content = offset_base + uuid_proto + sort_varint + embedded_encoded
+
+    tab_inner = proto.string(1, proto.unpadded_b64encode(tab_inner_content))
+    tab_wrapper = proto.string(tab, tab_inner)
+
+    inner_container = proto.string(3, tab_wrapper)
+    outer_container = proto.string(110, inner_container)
+
+    encoded_inner = proto.percent_b64encode(outer_container)
+
+    pointless_nest = proto.string(80226972,
+        proto.string(2, channel_id)
+        + proto.string(3, encoded_inner)
+    )
+
+    return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
+
+
+# Build the continuation token that get_channel_page_general_url sends to the
+# browse API for the 'about' tab.
+# The nested spec handed to proto.make_protobuf follows the same outline as
+# the other ctoken builders:
+#   80226972 { 2: channel_id, 3: base64p { 110 { 3 { 19 { 1: id } } } } }
+# NOTE(review): each inner list looks like [wire type, field number, value]
+# and 'base64p' like a percent-encoded base64 wrapper - confirm against
+# proto.make_protobuf.
+def channel_about_ctoken(channel_id):
+ return proto.make_protobuf(
+ ('base64p',
+ [
+ [2, 80226972,
+ [
+ [2, 2, channel_id],
+ [2, 3,
+ ('base64p',
+ [
+ [2, 110,
+ [
+ [2, 3,
+ [
+ [2, 19,
+ [
+ # Fixed UUID-style byte string.
+ # NOTE(review): possibly a placeholder like the
+ # targetId in channel_ctoken_v4 - confirm.
+ [2, 1, b'66b0e9e9-0000-2820-9589-582429a83980'],
+ ]
+ ],
+ ]
+ ],
+ ]
+ ],
+ ]
+ )
+ ],
+ ]
+ ],
+ ]
+ )
+ )
+
+
+# https://github.com/user234683/youtube-local/issues/151
+# Continuation-token builder for the layout discussed in the issue above.
+#   channel_id - stored in field 2 of the outer message
+#   page       - 1-based page number; sent as the decimal string of an
+#                offset of 30 items per page
+#   sort       - local code 1 selects popular (2 on the wire); every other
+#                code selects newest (1 on the wire)
+#   tab, view  - accepted but not used: the tab field number is hard-coded
+#                to 15 below
+# Returns the token as urlsafe-base64 ASCII text.
+def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
+ new_sort = (2 if int(sort) == 1 else 1)
+ offset = str(30*(int(page) - 1))
+ pointless_nest = proto.string(80226972,
+ proto.string(2, channel_id)
+ + proto.string(3,
+ proto.percent_b64encode(
+ proto.string(110,
+ proto.string(3,
+ proto.string(15,
+ proto.string(1,
+ proto.string(1,
+ proto.unpadded_b64encode(
+ proto.string(1,
+ proto.unpadded_b64encode(
+ proto.string(2,
+ # offset message, base64-encoded and tagged "ST:"
+ b"ST:"
+ + proto.unpadded_b64encode(
+ proto.string(2, offset)
+ )
+ )
+ )
+ )
+ )
+ )
+ # targetId, just needs to be present but
+ # doesn't need to be correct
+ + proto.string(2, "63faaff0-0000-23fe-80f0-582429d11c38")
+ )
+ # 1 - newest, 2 - popular
+ + proto.uint(3, new_sort)
+ )
+ )
+ )
+ )
+ )
+ )
+
+ return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
-# grid view: 4qmFsgJAEhhVQzdVY3M0MkZaeTN1WXpqcnF6T0lIc3caJEVnWjJhV1JsYjNNZ0FEZ0JZQUZxQUhvQk1yZ0JBQSUzRCUzRA
-# list view: 4qmFsgJCEhhVQzdVY3M0MkZaeTN1WXpqcnF6T0lIc3caJkVnWjJhV1JsYjNNWUF5QUFNQUk0QVdBQmFnQjZBVEs0QVFBJTNE
# SORT:
-# Popular - 1
-# Oldest - 2
-# Newest - 3
+# videos:
+# Newest - 3
+# Last video added - 4
# view:
# grid: 0 or 1
# list: 2
-def channel_ctoken(channel_id, page, sort, tab, view=1):
-
- tab = proto.string(2, tab )
+def channel_ctoken_v3(channel_id, page, sort, tab, view=1):
+ # page > 1 doesn't work when sorting by oldest
+ offset = 30*(int(page) - 1)
+ page_token = proto.string(61, proto.unpadded_b64encode(
+ proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
+ ))
+
+ tab = proto.string(2, tab)
sort = proto.uint(3, int(sort))
- page = proto.string(15, str(page) )
- # example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
+
shelf_view = proto.uint(4, 0)
view = proto.uint(6, int(view))
- continuation_info = proto.string( 3, proto.percent_b64encode(tab + view + sort + shelf_view + page) )
-
- channel_id = proto.string(2, channel_id )
+ continuation_info = proto.string(3,
+ proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
+ )
+
+ channel_id = proto.string(2, channel_id)
pointless_nest = proto.string(80226972, channel_id + continuation_info)
return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
-def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1):
- ctoken = channel_ctoken(channel_id, page, sort, tab, view).replace('=', '%3D')
- url = "https://www.youtube.com/browse_ajax?ctoken=" + ctoken
- print("Sending channel tab ajax request")
- content = common.fetch_url(url, common.desktop_ua + headers_1)
- print("Finished recieving channel tab response")
+def channel_ctoken_v2(channel_id, page, sort, tab, view=1):
+ # see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
+ # page > 1 doesn't work when sorting by oldest
+ offset = 30*(int(page) - 1)
+ schema_number = {
+ 3: 6307666885028338688,
+ 2: 17254859483345278706,
+ 1: 16570086088270825023,
+ }[int(sort)]
+ page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
+ proto.uint(1, schema_number) + proto.string(2,
+ proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
+ )
+ )))
- '''with open('debug/channel_debug', 'wb') as f:
- f.write(content)'''
- info = json.loads(content)
- return info
+ tab = proto.string(2, tab)
+ sort = proto.uint(3, int(sort))
+ #page = proto.string(15, str(page))
+ shelf_view = proto.uint(4, 0)
+ view = proto.uint(6, int(view))
+ continuation_info = proto.string(
+ 3,
+ proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
+ )
+ channel_id = proto.string(2, channel_id)
+ pointless_nest = proto.string(80226972, channel_id + continuation_info)
+ return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
-def get_number_of_videos(channel_id):
- # Uploads playlist
- playlist_id = 'UU' + channel_id[2:]
- url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&ajax=1&disable_polymer=true'
- print("Getting number of videos")
- response = common.fetch_url(url, common.mobile_ua + headers_1)
- '''with open('debug/playlist_debug_metadata', 'wb') as f:
- f.write(response)'''
- response = response.decode('utf-8')
- print("Got response for number of videos")
- match = re.search(r'"num_videos_text":\s*{(?:"item_type":\s*"formatted_string",)?\s*"runs":\s*\[{"text":\s*"([\d,]*) videos"', response)
- if match:
- return int(match.group(1).replace(',',''))
- else:
- return 0
-@functools.lru_cache(maxsize=128)
-def get_channel_id(username):
- # method that gives the smallest possible response at ~10 kb
- # needs to be as fast as possible
- url = 'https://m.youtube.com/user/' + username + '/about?ajax=1&disable_polymer=true'
- response = common.fetch_url(url, common.mobile_ua + headers_1).decode('utf-8')
- return re.search(r'"channel_id":\s*"([a-zA-Z0-9_-]*)"', response).group(1)
-
-def grid_items_html(items, additional_info={}):
- result = ''' <nav class="item-grid">\n'''
- for item in items:
- result += common.renderer_html(item, additional_info)
- result += '''\n</nav>'''
- return result
-
-def list_items_html(items, additional_info={}):
- result = ''' <nav class="item-list">'''
- for item in items:
- result += common.renderer_html(item, additional_info)
- result += '''\n</nav>'''
- return result
-
-channel_tab_template = Template('''\n<a class="tab page-button"$href_attribute>$tab_name</a>''')
-channel_search_template = Template('''
- <form class="channel-search" action="$action">
- <input type="search" name="query" class="search-box" value="$search_box_value">
- <button type="submit" value="Search" class="search-button">Search</button>
- </form>''')
-
-tabs = ('Videos', 'Playlists', 'About')
-def channel_tabs_html(channel_id, current_tab, search_box_value=''):
- result = ''
- for tab_name in tabs:
- if tab_name == current_tab:
- result += channel_tab_template.substitute(
- href_attribute = '',
- tab_name = tab_name,
- )
- else:
- result += channel_tab_template.substitute(
- href_attribute = 'href="' + URL_ORIGIN + "/channel/" + channel_id + "/" + tab_name.lower() + '"',
- tab_name = tab_name,
- )
- result += channel_search_template.substitute(
- action = URL_ORIGIN + "/channel/" + channel_id + "/search",
- search_box_value = html.escape(search_box_value),
- )
- return result
-
+# Continuation-token builder that sends the page number itself (as a string
+# in field 15) rather than an item offset.
+# Inner message fields, in the order they are concatenated: tab (2),
+# view (6), sort (3), shelf_view (4, always 0), page (15), and a trailing
+# uint field 23 set to 0. The inner message is percent-base64 encoded into
+# field 3 next to the channel id (field 2), all nested under 80226972.
+# Returns the token as urlsafe-base64 ASCII text.
+def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
+ tab = proto.string(2, tab)
+ sort = proto.uint(3, int(sort))
+ page = proto.string(15, str(page))
+ # example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
+ shelf_view = proto.uint(4, 0)
+ view = proto.uint(6, int(view))
+ continuation_info = proto.string(3, proto.percent_b64encode(tab + view + sort + shelf_view + page + proto.uint(23, 0)) )
+ channel_id = proto.string(2, channel_id)
+ pointless_nest = proto.string(80226972, channel_id + continuation_info)
+ return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
-def channel_videos_html(polymer_json, current_page=1, number_of_videos = 1000, current_query_string=''):
- microformat = polymer_json[1]['response']['microformat']['microformatDataRenderer']
- channel_url = microformat['urlCanonical'].rstrip('/')
- channel_id = channel_url[channel_url.rfind('/')+1:]
- try:
- items = polymer_json[1]['response']['continuationContents']['gridContinuation']['items']
- except KeyError:
- response = polymer_json[1]['response']
- try:
- contents = response['contents']
- except KeyError:
- items = []
- else:
- items = tab_with_content(contents['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['gridRenderer']['items']
- items_html = grid_items_html(items, {'author': microformat['title']})
-
- return yt_channel_items_template.substitute(
- header = common.get_header(),
- channel_title = microformat['title'],
- channel_tabs = channel_tabs_html(channel_id, 'Videos'),
- avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'],
- page_title = microformat['title'] + ' - Channel',
- items = items_html,
- page_buttons = common.page_buttons_html(current_page, math.ceil(number_of_videos/30), URL_ORIGIN + "/channel/" + channel_id + "/videos", current_query_string),
- number_of_results = '{:,}'.format(number_of_videos) + " videos",
- )
-def channel_playlists_html(polymer_json):
- microformat = polymer_json[1]['response']['microformat']['microformatDataRenderer']
- channel_url = microformat['urlCanonical'].rstrip('/')
- channel_id = channel_url[channel_url.rfind('/')+1:]
- try:
- items = polymer_json[1]['response']['continuationContents']['gridContinuation']['items']
- except KeyError:
- response = polymer_json[1]['response']
- try:
- contents = response['contents']
- except KeyError:
- items = []
+def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
+ ctoken=None, print_status=True):
+ message = 'Got channel tab' if print_status else None
+
+ if not ctoken:
+ if tab in ('videos', 'shorts', 'streams'):
+ ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view)
else:
- item_section = tab_with_content(contents['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]
- try:
- items = item_section['gridRenderer']['items']
- except KeyError:
- if "messageRenderer" in item_section:
- items = []
- else:
- raise
-
- items_html = grid_items_html(items, {'author': microformat['title']})
-
- return yt_channel_items_template.substitute(
- header = common.get_header(),
- channel_title = microformat['title'],
- channel_tabs = channel_tabs_html(channel_id, 'Playlists'),
- avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'],
- page_title = microformat['title'] + ' - Channel',
- items = items_html,
- page_buttons = '',
- number_of_results = '',
- )
+ ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view)
+ ctoken = ctoken.replace('=', '%3D')
+
+ # Not sure what the purpose of the key is or whether it will change
+ # For now it seems to be constant for the API endpoint, not dependent
+ # on the browsing session or channel
+ key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
+ url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+
+ data = {
+ 'context': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'WEB',
+ 'clientVersion': '2.20240327.00.00',
+ },
+ },
+ 'continuation': ctoken,
+ }
+
+ content_type_header = (('Content-Type', 'application/json'),)
+ content = util.fetch_url(
+ url, headers_desktop + content_type_header,
+ data=json.dumps(data), debug_name='channel_tab', report_text=message)
+
+ return content
+
+
+# cache entries expire after 30 minutes
+number_of_videos_cache = cachetools.TTLCache(128, 30*60)
+@cachetools.cached(number_of_videos_cache)
+def get_number_of_videos_channel(channel_id):
+ if channel_id is None:
+ return 1000
-# Example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg
-def tab_with_content(tabs):
- for tab in tabs:
- try:
- renderer = tab['tabRenderer']
- except KeyError:
- renderer = tab['expandableTabRenderer']
- try:
- return renderer['content']
- except KeyError:
- pass
-
- raise Exception("No tabs found with content")
-
-channel_link_template = Template('''
-<li><a href="$url">$text</a></li>''')
-stat_template = Template('''
-<li>$stat_value</li>''')
-def channel_about_page(polymer_json):
- avatar = '/' + polymer_json[1]['response']['microformat']['microformatDataRenderer']['thumbnail']['thumbnails'][0]['url']
- # my goodness...
- channel_metadata = tab_with_content(polymer_json[1]['response']['contents']['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer']
- channel_links = ''
- for link_json in channel_metadata.get('primaryLinks', ()):
- channel_links += channel_link_template.substitute(
- url = html.escape(link_json['navigationEndpoint']['urlEndpoint']['url']),
- text = common.get_plain_text(link_json['title']),
- )
+ # Uploads playlist
+ playlist_id = 'UU' + channel_id[2:]
+ url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
- stats = ''
- for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
- try:
- stat_value = common.get_plain_text(channel_metadata[stat_name])
- except KeyError:
- continue
- else:
- stats += stat_template.substitute(stat_value=stat_value)
try:
- description = common.format_text_runs(common.get_formatted_text(channel_metadata['description']))
- except KeyError:
- description = ''
- return yt_channel_about_template.substitute(
- header = common.get_header(),
- page_title = common.get_plain_text(channel_metadata['title']) + ' - About',
- channel_title = common.get_plain_text(channel_metadata['title']),
- avatar = html.escape(avatar),
- description = description,
- links = channel_links,
- stats = stats,
- channel_tabs = channel_tabs_html(channel_metadata['channelId'], 'About'),
- )
+ response = util.fetch_url(url, headers_mobile,
+ debug_name='number_of_videos', report_text='Got number of videos')
+ except (urllib.error.HTTPError, util.FetchError) as e:
+ traceback.print_exc()
+ print("Couldn't retrieve number of videos")
+ return 1000
+
+ response = response.decode('utf-8')
+
+ # match = re.search(r'"numVideosText":\s*{\s*"runs":\s*\[{"text":\s*"([\d,]*) videos"', response)
+ match = re.search(r'"numVideosText".*?([,\d]+)', response)
+ if match:
+ return int(match.group(1).replace(',',''))
+ else:
+ return 0
+def set_cached_number_of_videos(channel_id, num_videos):
+    # Store num_videos as the cached result of
+    # get_number_of_videos_channel(channel_id).
+    #
+    # The entry is written directly under the key that cachetools.cached
+    # derives by default (keys.hashkey of the call arguments). Routing the
+    # value through a decorated dummy function would return an entry that is
+    # already cached and leave it in place, so a stale count - including the
+    # 1000 placeholder cached after a failed fetch - could never be replaced.
+    number_of_videos_cache[cachetools.keys.hashkey(channel_id)] = num_videos
+
+
+# Matches the 24-character channel id inside a 'videos.xml?channel_id=...'
+# reference in the fetched page text.
+channel_id_re = re.compile(r'videos\.xml\?channel_id=([a-zA-Z0-9_-]{24})"')
+# Resolve a channel base URL (e.g. .../user/NAME or .../c/NAME) to its
+# channel id by fetching the mobile about page. Returns the id string, or
+# None when the page contains no match. Results are memoised (LRU, 128).
+@cachetools.func.lru_cache(maxsize=128)
+def get_channel_id(base_url):
+ # method that gives the smallest possible response at ~4 kb
+ # needs to be as fast as possible
+ base_url = base_url.replace('https://www', 'https://m') # avoid redirect
+ response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
+ debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
+ match = channel_id_re.search(response)
+ if match:
+ return match.group(1)
+ return None
+
+
+# LRU cache (128 entries) behind get_metadata; set_cached_metadata writes to
+# the same cache.
+metadata_cache = cachetools.LRUCache(128)
+# Fetch a channel's about page (pbj=1, desktop headers), run it through
+# yt_data_extract.extract_channel_info, and return only the fields selected
+# by extract_metadata_for_caching. Results are cached per channel_id.
+@cachetools.cached(metadata_cache)
+def get_metadata(channel_id):
+ base_url = 'https://www.youtube.com/channel/' + channel_id
+ polymer_json = util.fetch_url(base_url + '/about?pbj=1',
+ headers_desktop,
+ debug_name='gen_channel_about',
+ report_text='Retrieved channel metadata')
+ info = yt_data_extract.extract_channel_info(json.loads(polymer_json),
+ 'about',
+ continuation=False)
+ return extract_metadata_for_caching(info)
+def set_cached_metadata(channel_id, metadata):
+    # Store metadata as the cached result of get_metadata(channel_id).
+    #
+    # The entry is written directly under the key that cachetools.cached
+    # derives by default (keys.hashkey of the call arguments). Routing the
+    # value through a decorated dummy function would return an entry that is
+    # already cached and leave it in place, so freshly extracted metadata
+    # would never replace an older entry.
+    metadata_cache[cachetools.keys.hashkey(channel_id)] = metadata
+# Return a new dict holding only the channel_info fields worth caching:
+# 'approx_subscriber_count', 'short_description', 'channel_name', 'avatar'.
+# Raises KeyError if channel_info lacks any of them.
+def extract_metadata_for_caching(channel_info):
+ metadata = {}
+ for key in ('approx_subscriber_count', 'short_description', 'channel_name',
+ 'avatar'):
+ metadata[key] = channel_info[key]
+ return metadata
+
+
+# Video count for a channel known only by its base URL: resolve the URL with
+# get_channel_id, then defer to get_number_of_videos_channel. If the id
+# cannot be resolved (None), that function returns its 1000 placeholder.
+def get_number_of_videos_general(base_url):
+ return get_number_of_videos_channel(get_channel_id(base_url))
-def channel_search_page(polymer_json, query, current_page=1, number_of_videos = 1000, current_query_string=''):
- microformat = polymer_json[1]['response']['microformat']['microformatDataRenderer']
- channel_url = microformat['urlCanonical'].rstrip('/')
- channel_id = channel_url[channel_url.rfind('/')+1:]
- response = polymer_json[1]['response']
- try:
- items = tab_with_content(response['contents']['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents']
- except KeyError:
- items = response['continuationContents']['sectionListContinuation']['contents']
-
- items_html = list_items_html(items)
-
- return yt_channel_items_template.substitute(
- header = common.get_header(),
- channel_title = html.escape(microformat['title']),
- channel_tabs = channel_tabs_html(channel_id, '', query),
- avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'],
- page_title = html.escape(query + ' - Channel search'),
- items = items_html,
- page_buttons = common.page_buttons_html(current_page, math.ceil(number_of_videos/29), URL_ORIGIN + "/channel/" + channel_id + "/search", current_query_string),
- number_of_results = '',
- )
def get_channel_search_json(channel_id, query, page):
- params = proto.string(2, 'search') + proto.string(15, str(page))
+ offset = proto.unpadded_b64encode(proto.uint(3, (page-1)*30))
+ params = proto.string(2, 'search') + proto.string(15, offset)
params = proto.percent_b64encode(params)
ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query)
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
- polymer_json = common.fetch_url("https://www.youtube.com/browse_ajax?ctoken=" + ctoken, common.desktop_ua + headers_1)
- '''with open('debug/channel_search_debug', 'wb') as f:
- f.write(polymer_json)'''
- polymer_json = json.loads(polymer_json)
+ key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
+ url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
+
+ data = {
+ 'context': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'WEB',
+ 'clientVersion': '2.20240327.00.00',
+ },
+ },
+ 'continuation': ctoken,
+ }
+
+ content_type_header = (('Content-Type', 'application/json'),)
+ polymer_json = util.fetch_url(
+ url, headers_desktop + content_type_header,
+ data=json.dumps(data), debug_name='channel_search')
return polymer_json
-
-def get_channel_page(url, query_string=''):
- path_components = url.rstrip('/').lstrip('/').split('/')
- channel_id = path_components[0]
- try:
- tab = path_components[1]
- except IndexError:
- tab = 'videos'
-
- parameters = urllib.parse.parse_qs(query_string)
- page_number = int(common.default_multi_get(parameters, 'page', 0, default='1'))
- sort = common.default_multi_get(parameters, 'sort', 0, default='3')
- view = common.default_multi_get(parameters, 'view', 0, default='1')
- query = common.default_multi_get(parameters, 'query', 0, default='')
-
- if tab == 'videos':
+
+# Prepare an extracted channel-info dict for rendering; mutates info in place.
+# - 'avatar' and 'channel_url' go through util.prefix_url
+# - playlist items with a first_video_id, and video items, get an
+#   i.ytimg.com hq720 thumbnail URL
+# - items are passed to util.prefix_urls and util.add_extra_html_info
+# - on the about tab, string links matching util.YOUTUBE_URL_RE are prefixed
+# Reads the keys 'avatar', 'channel_url', 'items' and 'current_tab', plus
+# 'links' (a list of (text, url) pairs) when current_tab is 'about'.
+def post_process_channel_info(info):
+ info['avatar'] = util.prefix_url(info['avatar'])
+ info['channel_url'] = util.prefix_url(info['channel_url'])
+ for item in info['items']:
+ # For playlists, use first_video_id for thumbnail, not playlist id
+ if item.get('type') == 'playlist' and item.get('first_video_id'):
+ item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['first_video_id'])
+ elif item.get('type') == 'video':
+ item['thumbnail'] = "https://i.ytimg.com/vi/{}/hq720.jpg".format(item['id'])
+ # For channels and other types, keep existing thumbnail
+ util.prefix_urls(item)
+ util.add_extra_html_info(item)
+ if info['current_tab'] == 'about':
+ for i, (text, url) in enumerate(info['links']):
+ if isinstance(url, str) and util.YOUTUBE_URL_RE.fullmatch(url):
+ info['links'][i] = (text, util.prefix_url(url))
+
+
+def get_channel_first_page(base_url=None, tab='videos', channel_id=None, sort=None):
+    # Fetch the first page of a channel tab through the pbj=1 page URL and
+    # return the raw response from util.fetch_url.
+    #   base_url   - channel base URL; replaced by the /channel/ID form when
+    #                channel_id is given
+    #   tab        - tab name appended to the URL path
+    #   channel_id - optional channel id, takes precedence over base_url
+    #   sort       - optional local sort code (int or str); no sort
+    #                parameter is sent when it is empty or None
+    if channel_id:
+        base_url = 'https://www.youtube.com/channel/' + channel_id
+
+    # Build URL with sort parameter
+    # YouTube URL sort params: p=popular, dd=newest, lad=newest no shorts
+    # Note: 'da' (oldest) was removed by YouTube in January 2026
+    url = base_url + '/' + tab + '?pbj=1&view=0'
+    if sort:
+        # Map local sort codes to YouTube's URL parameter values.
+        # '1' (views) must be listed explicitly; without it a "popular"
+        # request fell through to the 'dd' (newest) default. Codes with no
+        # URL equivalent (such as 2=oldest) still fall back to newest.
+        sort_map = {'1': 'p', '3': 'dd', '4': 'lad'}
+        # str() so that int sort codes hit the string keys as well
+        url += '&sort=' + sort_map.get(str(sort), 'dd')
+
+    return util.fetch_url(url, headers_desktop, debug_name='gen_channel_' + tab)
+
+
+playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
+
+# youtube.com/[channel_id]/[tab]
+# youtube.com/user/[username]/[tab]
+# youtube.com/c/[custom]/[tab]
+# youtube.com/[custom]/[tab]
+def get_channel_page_general_url(base_url, tab, request, channel_id=None):
+
+ page_number = int(request.args.get('page', 1))
+ # sort 1: views
+ # sort 2: oldest
+ # sort 3: newest
+ # sort 4: newest - no shorts (Just a kludge on our end, not internal to yt)
+ default_sort = '3' if settings.include_shorts_in_channel else '4'
+ sort = request.args.get('sort', default_sort)
+ view = request.args.get('view', '1')
+ query = request.args.get('query', '')
+ ctoken = request.args.get('ctoken', '')
+ include_shorts = (sort != '4')
+ default_params = (page_number == 1 and sort in ('3', '4') and view == '1')
+ continuation = bool(ctoken) # whether or not we're using a continuation
+ page_size = 30
+ try_channel_api = True
+ polymer_json = None
+
+ # Use the special UU playlist which contains all the channel's uploads
+ if tab == 'videos' and sort in ('3', '4'):
+ if not channel_id:
+ channel_id = get_channel_id(base_url)
+ if page_number == 1 and include_shorts:
+ tasks = (
+ gevent.spawn(playlist.playlist_first_page,
+ 'UU' + channel_id[2:],
+ report_text='Retrieved channel videos'),
+ gevent.spawn(get_metadata, channel_id),
+ )
+ gevent.joinall(tasks)
+ util.check_gevent_exceptions(*tasks)
+
+ # Ignore the metadata for now, it is cached and will be
+ # recalled later
+ pl_json = tasks[0].value
+ pl_info = yt_data_extract.extract_playlist_info(pl_json)
+ number_of_videos = pl_info['metadata']['video_count']
+ if number_of_videos is None:
+ number_of_videos = 1000
+ else:
+ set_cached_number_of_videos(channel_id, number_of_videos)
+ else:
+ tasks = (
+ gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
+ page_number, include_shorts=include_shorts),
+ gevent.spawn(get_metadata, channel_id),
+ gevent.spawn(get_number_of_videos_channel, channel_id),
+ )
+ gevent.joinall(tasks)
+ util.check_gevent_exceptions(*tasks)
+
+ pl_json = tasks[0].value
+ pl_info = yt_data_extract.extract_playlist_info(pl_json)
+ number_of_videos = tasks[2].value
+
+ info = pl_info
+ info['channel_id'] = channel_id
+ info['current_tab'] = 'videos'
+ if info['items']: # Success
+ page_size = 100
+ try_channel_api = False
+ else: # Try the first-page method next
+ try_channel_api = True
+
+ # Use the regular channel API
+ if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api):
+ if channel_id:
+ num_videos_call = (get_number_of_videos_channel, channel_id)
+ else:
+ num_videos_call = (get_number_of_videos_general, base_url)
+
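+ # For page 1, use the first-page method which won't break
+ # The local sort code is handed over as-is; get_channel_first_page turns
+ # it into YouTube's URL value and falls back to newest for codes it does
+ # not map (e.g. 2=oldest)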
+ if page_number == 1:
+ # Always use first-page method for page 1 with sort parameter
+ page_call = (get_channel_first_page, base_url, tab, None, sort)
+ else:
+ # For page 2+, we can't paginate without continuation tokens
+ # This is a YouTube limitation, not our bug
+ flask.abort(404, 'Pagination not available for this sort option. YouTube removed this feature.')
+
tasks = (
- gevent.spawn(get_number_of_videos, channel_id ),
- gevent.spawn(get_channel_tab, channel_id, page_number, sort, 'videos', view)
+ gevent.spawn(*num_videos_call),
+ gevent.spawn(*page_call),
)
gevent.joinall(tasks)
+ util.check_gevent_exceptions(*tasks)
number_of_videos, polymer_json = tasks[0].value, tasks[1].value
- return channel_videos_html(polymer_json, page_number, number_of_videos, query_string)
elif tab == 'about':
- polymer_json = common.fetch_url('https://www.youtube.com/channel/' + channel_id + '/about?pbj=1', common.desktop_ua + headers_1)
- polymer_json = json.loads(polymer_json)
- return channel_about_page(polymer_json)
+ # polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about')
+ channel_id = get_channel_id(base_url)
+ ctoken = channel_about_ctoken(channel_id)
+ polymer_json = util.call_youtube_api('web', 'browse', {
+ 'continuation': ctoken,
+ })
+ continuation=True
+ elif tab == 'playlists' and page_number == 1:
+ # Use youtubei API instead of deprecated pbj=1 format
+ if not channel_id:
+ channel_id = get_channel_id(base_url)
+ ctoken = channel_ctoken_v3(channel_id, page='1', sort=sort, tab='playlists', view=view)
+ polymer_json = util.call_youtube_api('web', 'browse', {
+ 'continuation': ctoken,
+ })
+ continuation = True
elif tab == 'playlists':
- polymer_json = common.fetch_url('https://www.youtube.com/channel/' + channel_id + '/playlists?pbj=1&view=1', common.desktop_ua + headers_1)
- '''with open('debug/channel_playlists_debug', 'wb') as f:
- f.write(polymer_json)'''
- polymer_json = json.loads(polymer_json)
- return channel_playlists_html(polymer_json)
+ polymer_json = get_channel_tab(channel_id, page_number, sort,
+ 'playlists', view)
+ continuation = True
+ elif tab == 'search' and channel_id:
+ polymer_json = get_channel_search_json(channel_id, query, page_number)
elif tab == 'search':
- tasks = (
- gevent.spawn(get_number_of_videos, channel_id ),
- gevent.spawn(get_channel_search_json, channel_id, query, page_number)
+ url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
+ polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
+ elif tab == 'videos':
+ pass
+ else:
+ flask.abort(404, 'Unknown channel tab: ' + tab)
+
+ if polymer_json is not None:
+ info = yt_data_extract.extract_channel_info(
+ json.loads(polymer_json), tab, continuation=continuation
)
- gevent.joinall(tasks)
- number_of_videos, polymer_json = tasks[0].value, tasks[1].value
- return channel_search_page(polymer_json, query, page_number, number_of_videos, query_string)
- else:
- raise ValueError('Unknown channel tab: ' + tab)
-
-def get_user_page(url, query_string=''):
- path_components = url.rstrip('/').lstrip('/').split('/')
- username = path_components[0]
- try:
- page = path_components[1]
- except IndexError:
- page = 'videos'
- if page == 'videos':
- polymer_json = common.fetch_url('https://www.youtube.com/user/' + username + '/videos?pbj=1&view=0', common.desktop_ua + headers_1)
- polymer_json = json.loads(polymer_json)
- return channel_videos_html(polymer_json)
- elif page == 'about':
- polymer_json = common.fetch_url('https://www.youtube.com/user/' + username + '/about?pbj=1', common.desktop_ua + headers_1)
- polymer_json = json.loads(polymer_json)
- return channel_about_page(polymer_json)
- elif page == 'playlists':
- polymer_json = common.fetch_url('https://www.youtube.com/user/' + username + '/playlists?pbj=1&view=1', common.desktop_ua + headers_1)
- polymer_json = json.loads(polymer_json)
- return channel_playlists_html(polymer_json)
- elif page == 'search':
- raise NotImplementedError()
- '''polymer_json = common.fetch_url('https://www.youtube.com/user' + username + '/search?pbj=1&' + query_string, common.desktop_ua + headers_1)
- polymer_json = json.loads(polymer_json)
- return channel_search_page('''
+ if info['error'] is not None:
+ return flask.render_template('error.html', error_message=info['error'])
+
+ if channel_id:
+ info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
+ info['channel_id'] = channel_id
else:
- raise ValueError('Unknown channel page: ' + page)
\ No newline at end of file
+ channel_id = info['channel_id']
+
+ # Will have microformat present, cache metadata while we have it
+ if channel_id and default_params and tab not in ('videos', 'about'):
+ metadata = extract_metadata_for_caching(info)
+ set_cached_metadata(channel_id, metadata)
+ # Otherwise, populate with our (hopefully cached) metadata
+ elif channel_id and info.get('channel_name') is None:
+ metadata = get_metadata(channel_id)
+ for key, value in metadata.items():
+ yt_data_extract.conservative_update(info, key, value)
+ # need to add this metadata to the videos/playlists
+ additional_info = {
+ 'author': info['channel_name'],
+ 'author_id': info['channel_id'],
+ 'author_url': info['channel_url'],
+ }
+ for item in info['items']:
+ item.update(additional_info)
+
+ if tab in ('videos', 'shorts', 'streams'):
+ info['number_of_videos'] = number_of_videos
+ info['number_of_pages'] = math.ceil(number_of_videos/page_size)
+ info['header_playlist_names'] = local_playlist.get_playlist_names()
+ if tab in ('videos', 'shorts', 'streams', 'playlists'):
+ info['current_sort'] = sort
+ elif tab == 'search':
+ info['search_box_value'] = query
+ info['header_playlist_names'] = local_playlist.get_playlist_names()
+ if tab in ('search', 'playlists'):
+ info['page_number'] = page_number
+ info['subscribed'] = subscriptions.is_subscribed(info['channel_id'])
+
+ post_process_channel_info(info)
+
+ return flask.render_template('channel.html',
+ parameters_dictionary = request.args,
+ **info
+ )
+
+
+# Route: /channel/CHANNEL_ID/[TAB] - channel addressed by id; tab defaults to
+# 'videos'. The id is passed on both inside the base URL and as channel_id.
+@yt_app.route('/channel/<channel_id>/')
+@yt_app.route('/channel/<channel_id>/<tab>')
+def get_channel_page(channel_id, tab='videos'):
+ return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
+
+
+# Route: /user/USERNAME/[TAB] - channel addressed by legacy username; tab
+# defaults to 'videos'. No channel id is passed here.
+@yt_app.route('/user/<username>/')
+@yt_app.route('/user/<username>/<tab>')
+def get_user_page(username, tab='videos'):
+ return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
+
+
+# Route: /c/CUSTOM/[TAB] - channel addressed by custom /c/ name; tab defaults
+# to 'videos'. No channel id is passed here.
+@yt_app.route('/c/<custom>/')
+@yt_app.route('/c/<custom>/<tab>')
+def get_custom_c_page(custom, tab='videos'):
+ return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
+
+
+# Route: /CUSTOM[/TAB] - top-level custom channel URL (youtube.com/[custom]);
+# tab defaults to 'videos'. No channel id is passed here.
+# NOTE(review): this pattern matches any single top-level path segment -
+# confirm it cannot shadow other routes registered on yt_app.
+@yt_app.route('/<custom>')
+@yt_app.route('/<custom>/<tab>')
+def get_toplevel_custom_page(custom, tab='videos'):
+ return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)