diff options
Diffstat (limited to 'youtube/channel.py')
-rw-r--r-- | youtube/channel.py | 528 |
1 files changed, 197 insertions, 331 deletions
diff --git a/youtube/channel.py b/youtube/channel.py index 1b345b5..4c7d380 100644 --- a/youtube/channel.py +++ b/youtube/channel.py @@ -1,7 +1,7 @@ import base64 -from youtube import util, yt_data_extract, html_common, subscriptions +from youtube import util, yt_data_extract, local_playlist +from youtube import yt_app -import http_errors import urllib import json from string import Template @@ -12,11 +12,8 @@ import gevent import re import functools -with open("yt_channel_items_template.html", "r") as file: - yt_channel_items_template = Template(file.read()) - -with open("yt_channel_about_template.html", "r") as file: - yt_channel_about_template = Template(file.read()) +import flask +from flask import request '''continuation = Proto( Field('optional', 'continuation', 80226972, Proto( @@ -91,16 +88,10 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1): url = "https://www.youtube.com/browse_ajax?ctoken=" + ctoken print("Sending channel tab ajax request") - content = util.fetch_url(url, util.desktop_ua + headers_1) + content = util.fetch_url(url, util.desktop_ua + headers_1, debug_name='channel_tab') print("Finished recieving channel tab response") - '''with open('debug/channel_debug', 'wb') as f: - f.write(content)''' - info = json.loads(content) - return info - - - + return content def get_number_of_videos(channel_id): # Uploads playlist @@ -110,15 +101,13 @@ def get_number_of_videos(channel_id): # Sometimes retrieving playlist info fails with 403 for no discernable reason try: - response = util.fetch_url(url, util.mobile_ua + headers_pbj) + response = util.fetch_url(url, util.mobile_ua + headers_pbj, debug_name='number_of_videos') except urllib.error.HTTPError as e: if e.code != 403: raise print("Couldn't retrieve number of videos") return 1000 - '''with open('debug/playlist_debug_metadata', 'wb') as f: - f.write(response)''' response = response.decode('utf-8') print("Got response for number of videos") @@ -136,71 +125,20 @@ def get_channel_id(username): response = util.fetch_url(url, util.mobile_ua + headers_1).decode('utf-8') return re.search(r'"channel_id":\s*"([a-zA-Z0-9_-]*)"', response).group(1) -def grid_items_html(items, additional_info={}): - result = ''' <nav class="item-grid">\n''' - for item in items: - result += html_common.renderer_html(item, additional_info) - result += '''\n</nav>''' - return result - -def list_items_html(items, additional_info={}): - result = ''' <nav class="item-list">''' - for item in items: - result += html_common.renderer_html(item, additional_info) - result += '''\n</nav>''' - return result - -channel_tab_template = Template('''\n<a class="tab page-button"$href_attribute>$tab_name</a>''') -channel_search_template = Template(''' - <form class="channel-search" action="$action"> - <input type="search" name="query" class="search-box" value="$search_box_value"> - <button type="submit" value="Search" class="search-button">Search</button> - </form>''') - -tabs = ('Videos', 'Playlists', 'About') -def channel_tabs_html(channel_id, current_tab, search_box_value=''): - result = '' - for tab_name in tabs: - if tab_name == current_tab: - result += channel_tab_template.substitute( - href_attribute = '', - tab_name = tab_name, - ) - else: - result += channel_tab_template.substitute( - href_attribute = ' href="' + util.URL_ORIGIN + '/channel/' + channel_id + '/' + tab_name.lower() + '"', - tab_name = tab_name, - ) - result += channel_search_template.substitute( - action = util.URL_ORIGIN + "/channel/" + channel_id + "/search", - search_box_value = html.escape(search_box_value), - ) - return result - -channel_sort_button_template = Template('''\n<a class="sort-button"$href_attribute>$text</a>''') -sorts = { - "videos": (('1', 'views'), ('2', 'oldest'), ('3', 'newest'),), - "playlists": (('2', 'oldest'), ('3', 'newest'), ('4', 'last video added'),), -} -def channel_sort_buttons_html(channel_id, tab, current_sort): - result = '' - for sort_number, sort_name in sorts[tab]: - if sort_number == str(current_sort): - result += channel_sort_button_template.substitute( - href_attribute='', - text = 'Sorted by ' + sort_name - ) - else: - result += channel_sort_button_template.substitute( - href_attribute=' href="' + util.URL_ORIGIN + '/channel/' + channel_id + '/' + tab + '?sort=' + sort_number + '"', - text = 'Sort by ' + sort_name - ) - return result +def get_channel_search_json(channel_id, query, page): + params = proto.string(2, 'search') + proto.string(15, str(page)) + params = proto.percent_b64encode(params) + ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query) + ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii') + + polymer_json = util.fetch_url("https://www.youtube.com/browse_ajax?ctoken=" + ctoken, util.desktop_ua + headers_1, debug_name='channel_search') + return polymer_json -def get_microformat(response): +def extract_info(polymer_json, tab): + response = polymer_json[1]['response'] try: - return response['microformat']['microformatDataRenderer'] + microformat = response['microformat']['microformatDataRenderer'] # channel doesn't exist or was terminated # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org @@ -209,227 +147,136 @@ def get_microformat(response): result = '' for alert in response['alerts']: result += alert['alertRenderer']['text']['simpleText'] + '\n' - raise http_errors.Code200(result) + flask.abort(200, result) elif 'errors' in response['responseContext']: for error in response['responseContext']['errors']['error']: if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id': - raise http_errors.Error404('This channel does not exist') + flask.abort(404, 'This channel does not exist') raise -# example channel with no videos: https://www.youtube.com/user/jungleace -def get_grid_items(response): - try: - return response['continuationContents']['gridContinuation']['items'] - except KeyError: - try: - contents = response['contents'] - except KeyError: - return [] - - item_section = tab_with_content(contents['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0] - try: - return item_section['gridRenderer']['items'] - except KeyError: - if "messageRenderer" in item_section: - return [] - else: - raise + info = {} + info['current_tab'] = tab -def channel_videos_html(polymer_json, current_page=1, current_sort=3, number_of_videos = 1000, current_query_string=''): - response = polymer_json[1]['response'] - microformat = get_microformat(response) + + # stuff from microformat (info given by youtube for every page on channel) + info['short_description'] = microformat['description'] + info['channel_name'] = microformat['title'] + info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url'] channel_url = microformat['urlCanonical'].rstrip('/') channel_id = channel_url[channel_url.rfind('/')+1:] - if subscriptions.is_subscribed(channel_id): - action_name = 'Unsubscribe' - action = 'unsubscribe' - else: - action_name = 'Subscribe' - action = 'subscribe' + info['channel_id'] = channel_id + info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id + + info['items'] = [] + + # empty channel + if 'contents' not in response and 'continuationContents' not in response: + return info + + + # find the tab with content + # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg + # TODO: maybe use the 'selected' attribute for this? + if 'continuationContents' not in response: + tab_renderer = None + tab_content = None + for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']: + try: + tab_renderer = tab_json['tabRenderer'] + except KeyError: + tab_renderer = tab_json['expandableTabRenderer'] + try: + tab_content = tab_renderer['content'] + break + except KeyError: + pass + else: # didn't break + raise Exception("No tabs found with content") + assert tab == tab_renderer['title'].lower() + + + # extract tab-specific info + if tab in ('videos', 'playlists', 'search'): # find the list of items + if 'continuationContents' in response: + try: + items = response['continuationContents']['gridContinuation']['items'] + except KeyError: + items = response['continuationContents']['sectionListContinuation']['contents'] # for search + else: + contents = tab_content['sectionListRenderer']['contents'] + if 'itemSectionRenderer' in contents[0]: + item_section = contents[0]['itemSectionRenderer']['contents'][0] + try: + items = item_section['gridRenderer']['items'] + except KeyError: + if "messageRenderer" in item_section: + items = [] + else: + raise Exception('gridRenderer missing but messageRenderer not found') + else: + items = contents # for search - items = get_grid_items(response) - items_html = grid_items_html(items, {'author': microformat['title']}) - - return yt_channel_items_template.substitute( - header = html_common.get_header(), - channel_title = microformat['title'], - channel_id = channel_id, - channel_tabs = channel_tabs_html(channel_id, 'Videos'), - sort_buttons = channel_sort_buttons_html(channel_id, 'videos', current_sort), - avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'], - page_title = microformat['title'] + ' - Channel', - items = items_html, - page_buttons = html_common.page_buttons_html(current_page, math.ceil(number_of_videos/30), util.URL_ORIGIN + "/channel/" + channel_id + "/videos", current_query_string), - number_of_results = '{:,}'.format(number_of_videos) + " videos", - action_name = action_name, - action = action, - ) + # TODO: Fix this URL prefixing shit + additional_info = {'author': info['channel_name'], 'author_url': '/channel/' + channel_id} + info['items'] = [yt_data_extract.renderer_info(renderer, additional_info) for renderer in items] -def channel_playlists_html(polymer_json, current_sort=3): - response = polymer_json[1]['response'] - microformat = get_microformat(response) - channel_url = microformat['urlCanonical'].rstrip('/') - channel_id = channel_url[channel_url.rfind('/')+1:] + elif tab == 'about': + channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer'] - if subscriptions.is_subscribed(channel_id): - action_name = 'Unsubscribe' - action = 'unsubscribe' - else: - action_name = 'Subscribe' - action = 'subscribe' - items = get_grid_items(response) - items_html = grid_items_html(items, {'author': microformat['title']}) - - return yt_channel_items_template.substitute( - header = html_common.get_header(), - channel_title = microformat['title'], - channel_id = channel_id, - channel_tabs = channel_tabs_html(channel_id, 'Playlists'), - sort_buttons = channel_sort_buttons_html(channel_id, 'playlists', current_sort), - avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'], - page_title = microformat['title'] + ' - Channel', - items = items_html, - page_buttons = '', - number_of_results = '', - action_name = action_name, - action = action, - ) + info['links'] = [] + for link_json in channel_metadata.get('primaryLinks', ()): + url = link_json['navigationEndpoint']['urlEndpoint']['url'] + if url.startswith('/redirect'): # youtube puts these on external links to do tracking + query_string = url[url.find('?')+1: ] + url = urllib.parse.parse_qs(query_string)['q'][0] -# Example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg -def tab_with_content(tabs): - for tab in tabs: - try: - renderer = tab['tabRenderer'] - except KeyError: - renderer = tab['expandableTabRenderer'] - try: - return renderer['content'] - except KeyError: - pass - - raise Exception("No tabs found with content") - -channel_link_template = Template(''' -<li><a href="$url">$text</a></li>''') -stat_template = Template(''' -<li>$stat_value</li>''') -def channel_about_page(polymer_json): - microformat = get_microformat(polymer_json[1]['response']) - avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'] - # my goodness... - channel_metadata = tab_with_content(polymer_json[1]['response']['contents']['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer'] - channel_links = '' - for link_json in channel_metadata.get('primaryLinks', ()): - url = link_json['navigationEndpoint']['urlEndpoint']['url'] - if url.startswith("/redirect"): - query_string = url[url.find('?')+1: ] - url = urllib.parse.parse_qs(query_string)['q'][0] - - channel_links += channel_link_template.substitute( - url = html.escape(url), - text = yt_data_extract.get_plain_text(link_json['title']), - ) + text = yt_data_extract.get_plain_text(link_json['title']) + + info['links'].append( (text, url) ) - stats = '' - for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'): - try: - stat_value = yt_data_extract.get_plain_text(channel_metadata[stat_name]) - except KeyError: - continue - else: - stats += stat_template.substitute(stat_value=stat_value) + info['stats'] = [] + for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'): + try: + stat = channel_metadata[stat_name] + except KeyError: + continue + info['stats'].append(yt_data_extract.get_plain_text(stat)) + + if 'description' in channel_metadata: + info['description'] = yt_data_extract.get_text(channel_metadata['description']) + else: + info['description'] = '' - channel_id = channel_metadata['channelId'] - if subscriptions.is_subscribed(channel_id): - action_name = 'Unsubscribe' - action = 'unsubscribe' else: - action_name = 'Subscribe' - action = 'subscribe' + raise NotImplementedError('Unknown or unsupported channel tab: ' + tab) - try: - description = yt_data_extract.format_text_runs(yt_data_extract.get_formatted_text(channel_metadata['description'])) - except KeyError: - description = '' - return yt_channel_about_template.substitute( - header = html_common.get_header(), - page_title = yt_data_extract.get_plain_text(channel_metadata['title']) + ' - About', - channel_title = yt_data_extract.get_plain_text(channel_metadata['title']), - avatar = html.escape(avatar), - description = description, - links = channel_links, - stats = stats, - channel_id = channel_id, - channel_tabs = channel_tabs_html(channel_metadata['channelId'], 'About'), - action_name = action_name, - action = action, - ) + return info -def channel_search_page(polymer_json, query, current_page=1, number_of_videos = 1000, current_query_string=''): - response = polymer_json[1]['response'] - microformat = get_microformat(response) - channel_url = microformat['urlCanonical'].rstrip('/') - channel_id = channel_url[channel_url.rfind('/')+1:] +def post_process_channel_info(info): + info['avatar'] = '/' + info['avatar'] + info['channel_url'] = '/' + info['channel_url'] + for item in info['items']: + yt_data_extract.prefix_urls(item) + yt_data_extract.add_extra_html_info(item) - if subscriptions.is_subscribed(channel_id): - action_name = 'Unsubscribe' - action = 'unsubscribe' - else: - action_name = 'Subscribe' - action = 'subscribe' - try: - items = tab_with_content(response['contents']['twoColumnBrowseResultsRenderer']['tabs'])['sectionListRenderer']['contents'] - except KeyError: - items = response['continuationContents']['sectionListContinuation']['contents'] - - items_html = list_items_html(items) - - return yt_channel_items_template.substitute( - header = html_common.get_header(), - channel_title = html.escape(microformat['title']), - channel_id = channel_id, - channel_tabs = channel_tabs_html(channel_id, '', query), - avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'], - page_title = html.escape(query + ' - Channel search'), - items = items_html, - page_buttons = html_common.page_buttons_html(current_page, math.ceil(number_of_videos/29), util.URL_ORIGIN + "/channel/" + channel_id + "/search", current_query_string), - number_of_results = '', - sort_buttons = '', - action_name = action_name, - action = action, - ) -def get_channel_search_json(channel_id, query, page): - params = proto.string(2, 'search') + proto.string(15, str(page)) - params = proto.percent_b64encode(params) - ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query) - ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii') - polymer_json = util.fetch_url("https://www.youtube.com/browse_ajax?ctoken=" + ctoken, util.desktop_ua + headers_1) - '''with open('debug/channel_search_debug', 'wb') as f: - f.write(polymer_json)''' - polymer_json = json.loads(polymer_json) - return polymer_json - playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"} -def get_channel_page(env, start_response): - path_parts = env['path_parts'] - channel_id = path_parts[1] - try: - tab = path_parts[2] - except IndexError: - tab = 'videos' - - parameters = env['parameters'] - page_number = int(util.default_multi_get(parameters, 'page', 0, default='1')) - sort = util.default_multi_get(parameters, 'sort', 0, default='3') - view = util.default_multi_get(parameters, 'view', 0, default='1') - query = util.default_multi_get(parameters, 'query', 0, default='') + +@yt_app.route('/channel/<channel_id>/') +@yt_app.route('/channel/<channel_id>/<tab>') +def get_channel_page(channel_id, tab='videos'): + + page_number = int(request.args.get('page', 1)) + sort = request.args.get('sort', '3') + view = request.args.get('view', '1') + query = request.args.get('query', '') + if tab == 'videos': tasks = ( @@ -439,17 +286,10 @@ def get_channel_page(env, start_response): gevent.joinall(tasks) number_of_videos, polymer_json = tasks[0].value, tasks[1].value - result = channel_videos_html(polymer_json, page_number, sort, number_of_videos, env['QUERY_STRING']) elif tab == 'about': - polymer_json = util.fetch_url('https://www.youtube.com/channel/' + channel_id + '/about?pbj=1', util.desktop_ua + headers_1) - polymer_json = json.loads(polymer_json) - result = channel_about_page(polymer_json) + polymer_json = util.fetch_url('https://www.youtube.com/channel/' + channel_id + '/about?pbj=1', util.desktop_ua + headers_1, debug_name='channel_about') elif tab == 'playlists': - polymer_json = util.fetch_url('https://www.youtube.com/channel/' + channel_id + '/playlists?pbj=1&view=1&sort=' + playlist_sort_codes[sort], util.desktop_ua + headers_1) - '''with open('debug/channel_playlists_debug', 'wb') as f: - f.write(polymer_json)''' - polymer_json = json.loads(polymer_json) - result = channel_playlists_html(polymer_json, sort) + polymer_json = util.fetch_url('https://www.youtube.com/channel/' + channel_id + '/playlists?pbj=1&view=1&sort=' + playlist_sort_codes[sort], util.desktop_ua + headers_1, debug_name='channel_playlists') elif tab == 'search': tasks = ( gevent.spawn(get_number_of_videos, channel_id ), @@ -458,54 +298,80 @@ def get_channel_page(env, start_response): gevent.joinall(tasks) number_of_videos, polymer_json = tasks[0].value, tasks[1].value - result = channel_search_page(polymer_json, query, page_number, number_of_videos, env['QUERY_STRING']) - else: - start_response('404 Not Found', [('Content-type', 'text/plain'),]) - return b'Unknown channel tab: ' + tab.encode('utf-8') - - start_response('200 OK', [('Content-type','text/html'),]) - return result.encode('utf-8') - -# youtube.com/user/[username]/[page] -# youtube.com/c/[custom]/[page] -# youtube.com/[custom]/[page] -def get_channel_page_general_url(env, start_response): - path_parts = env['path_parts'] - - is_toplevel = not path_parts[0] in ('user', 'c') - - if len(path_parts) + int(is_toplevel) == 3: # has /[page] after it - page = path_parts[2] - base_url = 'https://www.youtube.com/' + '/'.join(path_parts[0:-1]) - elif len(path_parts) + int(is_toplevel) == 2: # does not have /[page] after it, use /videos by default - page = 'videos' - base_url = 'https://www.youtube.com/' + '/'.join(path_parts) else: - start_response('404 Not Found', [('Content-type', 'text/plain'),]) - return b'Invalid channel url' - - if page == 'videos': - polymer_json = util.fetch_url(base_url + '/videos?pbj=1&view=0', util.desktop_ua + headers_1) - '''with open('debug/user_page_videos', 'wb') as f: - f.write(polymer_json)''' - polymer_json = json.loads(polymer_json) - result = channel_videos_html(polymer_json) - elif page == 'about': - polymer_json = util.fetch_url(base_url + '/about?pbj=1', util.desktop_ua + headers_1) - polymer_json = json.loads(polymer_json) - result = channel_about_page(polymer_json) - elif page == 'playlists': - polymer_json = util.fetch_url(base_url+ '/playlists?pbj=1&view=1', util.desktop_ua + headers_1) - polymer_json = json.loads(polymer_json) - result = channel_playlists_html(polymer_json) - elif page == 'search': + flask.abort(404, 'Unknown channel tab: ' + tab) + + + info = extract_info(json.loads(polymer_json), tab) + post_process_channel_info(info) + if tab in ('videos', 'search'): + info['number_of_videos'] = number_of_videos + info['number_of_pages'] = math.ceil(number_of_videos/30) + info['header_playlist_names'] = local_playlist.get_playlist_names() + if tab in ('videos', 'playlists'): + info['current_sort'] = sort + elif tab == 'search': + info['search_box_value'] = query + + + return flask.render_template('channel.html', + parameters_dictionary = request.args, + **info + ) + + +# youtube.com/user/[username]/[tab] +# youtube.com/c/[custom]/[tab] +# youtube.com/[custom]/[tab] +def get_channel_page_general_url(base_url, tab, request): + + page_number = int(request.args.get('page', 1)) + sort = request.args.get('sort', '3') + view = request.args.get('view', '1') + query = request.args.get('query', '') + + if tab == 'videos': + polymer_json = util.fetch_url(base_url + '/videos?pbj=1&view=0', util.desktop_ua + headers_1, debug_name='gen_channel_videos') + elif tab == 'about': + polymer_json = util.fetch_url(base_url + '/about?pbj=1', util.desktop_ua + headers_1, debug_name='gen_channel_about') + elif tab == 'playlists': + polymer_json = util.fetch_url(base_url+ '/playlists?pbj=1&view=1', util.desktop_ua + headers_1, debug_name='gen_channel_playlists') + elif tab == 'search': raise NotImplementedError() - '''polymer_json = util.fetch_url('https://www.youtube.com/user' + username + '/search?pbj=1&' + query_string, util.desktop_ua + headers_1) - polymer_json = json.loads(polymer_json) - return channel_search_page(''' else: - start_response('404 Not Found', [('Content-type', 'text/plain'),]) - return b'Unknown channel page: ' + page.encode('utf-8') + flask.abort(404, 'Unknown channel tab: ' + tab) + + + info = extract_info(json.loads(polymer_json), tab) + post_process_channel_info(info) + if tab in ('videos', 'search'): + info['number_of_videos'] = 1000 + info['number_of_pages'] = math.ceil(1000/30) + info['header_playlist_names'] = local_playlist.get_playlist_names() + if tab in ('videos', 'playlists'): + info['current_sort'] = sort + elif tab == 'search': + info['search_box_value'] = query + + + return flask.render_template('channel.html', + parameters_dictionary = request.args, + **info + ) + + +@yt_app.route('/user/<username>/') +@yt_app.route('/user/<username>/<tab>') +def get_user_page(username, tab='videos'): + return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request) + +@yt_app.route('/c/<custom>/') +@yt_app.route('/c/<custom>/<tab>') +def get_custom_c_page(custom, tab='videos'): + return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request) + +@yt_app.route('/<custom>') +@yt_app.route('/<custom>/<tab>') +def get_toplevel_custom_page(custom, tab='videos'): + return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request) - start_response('200 OK', [('Content-type','text/html'),]) - return result.encode('utf-8') |