aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--youtube/channel.py122
-rw-r--r--youtube/subscriptions.py2
-rw-r--r--youtube/yt_data_extract.py121
3 files changed, 124 insertions, 121 deletions
diff --git a/youtube/channel.py b/youtube/channel.py
index 79b7c9b..16d0a3f 100644
--- a/youtube/channel.py
+++ b/youtube/channel.py
@@ -137,124 +137,6 @@ def get_channel_search_json(channel_id, query, page):
return polymer_json
-def extract_info(polymer_json, tab):
- response = polymer_json[1]['response']
- try:
- microformat = response['microformat']['microformatDataRenderer']
-
- # channel doesn't exist or was terminated
- # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
- except KeyError:
- if 'alerts' in response and len(response['alerts']) > 0:
- result = ''
- for alert in response['alerts']:
- result += alert['alertRenderer']['text']['simpleText'] + '\n'
- flask.abort(200, result)
- elif 'errors' in response['responseContext']:
- for error in response['responseContext']['errors']['error']:
- if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
- flask.abort(404, 'This channel does not exist')
- raise
-
-
- info = {}
- info['current_tab'] = tab
-
-
- # stuff from microformat (info given by youtube for every page on channel)
- info['short_description'] = microformat['description']
- info['channel_name'] = microformat['title']
- info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
- channel_url = microformat['urlCanonical'].rstrip('/')
- channel_id = channel_url[channel_url.rfind('/')+1:]
- info['channel_id'] = channel_id
- info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
-
- info['items'] = []
-
- # empty channel
- if 'contents' not in response and 'continuationContents' not in response:
- return info
-
-
- # find the tab with content
- # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg
- # TODO: maybe use the 'selected' attribute for this?
- if 'continuationContents' not in response:
- tab_renderer = None
- tab_content = None
- for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']:
- try:
- tab_renderer = tab_json['tabRenderer']
- except KeyError:
- tab_renderer = tab_json['expandableTabRenderer']
- try:
- tab_content = tab_renderer['content']
- break
- except KeyError:
- pass
- else: # didn't break
- raise Exception("No tabs found with content")
- assert tab == tab_renderer['title'].lower()
-
-
- # extract tab-specific info
- if tab in ('videos', 'playlists', 'search'): # find the list of items
- if 'continuationContents' in response:
- try:
- items = response['continuationContents']['gridContinuation']['items']
- except KeyError:
- items = response['continuationContents']['sectionListContinuation']['contents'] # for search
- else:
- contents = tab_content['sectionListRenderer']['contents']
- if 'itemSectionRenderer' in contents[0]:
- item_section = contents[0]['itemSectionRenderer']['contents'][0]
- try:
- items = item_section['gridRenderer']['items']
- except KeyError:
- if "messageRenderer" in item_section:
- items = []
- else:
- raise Exception('gridRenderer missing but messageRenderer not found')
- else:
- items = contents # for search
-
- additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
- info['items'] = [yt_data_extract.renderer_info(renderer, additional_info) for renderer in items]
-
- elif tab == 'about':
- channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer']
-
-
- info['links'] = []
- for link_json in channel_metadata.get('primaryLinks', ()):
- url = link_json['navigationEndpoint']['urlEndpoint']['url']
- if url.startswith('/redirect'): # youtube puts these on external links to do tracking
- query_string = url[url.find('?')+1: ]
- url = urllib.parse.parse_qs(query_string)['q'][0]
-
- text = yt_data_extract.get_plain_text(link_json['title'])
-
- info['links'].append( (text, url) )
-
-
- info['stats'] = []
- for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
- try:
- stat = channel_metadata[stat_name]
- except KeyError:
- continue
- info['stats'].append(yt_data_extract.get_plain_text(stat))
-
- if 'description' in channel_metadata:
- info['description'] = yt_data_extract.get_text(channel_metadata['description'])
- else:
- info['description'] = ''
-
- else:
- raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
-
- return info
def post_process_channel_info(info):
info['avatar'] = util.prefix_url(info['avatar'])
@@ -303,7 +185,7 @@ def get_channel_page(channel_id, tab='videos'):
flask.abort(404, 'Unknown channel tab: ' + tab)
- info = extract_info(json.loads(polymer_json), tab)
+ info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab)
post_process_channel_info(info)
if tab in ('videos', 'search'):
info['number_of_videos'] = number_of_videos
@@ -343,7 +225,7 @@ def get_channel_page_general_url(base_url, tab, request):
flask.abort(404, 'Unknown channel tab: ' + tab)
- info = extract_info(json.loads(polymer_json), tab)
+ info = yt_data_extract.extract_channel_info(json.loads(polymer_json), tab)
post_process_channel_info(info)
if tab in ('videos', 'search'):
info['number_of_videos'] = 1000
diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py
index 56bdf93..175622f 100644
--- a/youtube/subscriptions.py
+++ b/youtube/subscriptions.py
@@ -455,7 +455,7 @@ def _get_upstream_videos(channel_id):
print('Failed to read atoma feed for ' + channel_status_name)
traceback.print_exc()
- videos = channel.extract_info(json.loads(channel_tab), 'videos')['items']
+ videos = yt_data_extract.extract_channel_info(json.loads(channel_tab), 'videos')['items']
for i, video_item in enumerate(videos):
if 'description' not in video_item:
video_item['description'] = ''
diff --git a/youtube/yt_data_extract.py b/youtube/yt_data_extract.py
index 663edc4..c666ede 100644
--- a/youtube/yt_data_extract.py
+++ b/youtube/yt_data_extract.py
@@ -3,6 +3,7 @@ from youtube import util
import html
import json
import re
+import urllib
# videos (all of type str):
@@ -279,3 +280,123 @@ def parse_info_prepare_for_html(renderer, additional_info={}):
return item
+def extract_channel_info(polymer_json, tab):
+ response = polymer_json[1]['response']
+ try:
+ microformat = response['microformat']['microformatDataRenderer']
+
+ # channel doesn't exist or was terminated
+ # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
+ except KeyError:
+ if 'alerts' in response and len(response['alerts']) > 0:
+ result = ''
+ for alert in response['alerts']:
+ result += alert['alertRenderer']['text']['simpleText'] + '\n'
+ flask.abort(200, result)
+ elif 'errors' in response['responseContext']:
+ for error in response['responseContext']['errors']['error']:
+ if error['code'] == 'INVALID_VALUE' and error['location'] == 'browse_id':
+ flask.abort(404, 'This channel does not exist')
+ raise
+
+
+ info = {}
+ info['current_tab'] = tab
+
+
+ # stuff from microformat (info given by youtube for every page on channel)
+ info['short_description'] = microformat['description']
+ info['channel_name'] = microformat['title']
+ info['avatar'] = microformat['thumbnail']['thumbnails'][0]['url']
+ channel_url = microformat['urlCanonical'].rstrip('/')
+ channel_id = channel_url[channel_url.rfind('/')+1:]
+ info['channel_id'] = channel_id
+ info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
+
+ info['items'] = []
+
+ # empty channel
+ if 'contents' not in response and 'continuationContents' not in response:
+ return info
+
+
+ # find the tab with content
+ # example channel where tabs do not have definite index: https://www.youtube.com/channel/UC4gQ8i3FD7YbhOgqUkeQEJg
+ # TODO: maybe use the 'selected' attribute for this?
+ if 'continuationContents' not in response:
+ tab_renderer = None
+ tab_content = None
+ for tab_json in response['contents']['twoColumnBrowseResultsRenderer']['tabs']:
+ try:
+ tab_renderer = tab_json['tabRenderer']
+ except KeyError:
+ tab_renderer = tab_json['expandableTabRenderer']
+ try:
+ tab_content = tab_renderer['content']
+ break
+ except KeyError:
+ pass
+ else: # didn't break
+ raise Exception("No tabs found with content")
+ assert tab == tab_renderer['title'].lower()
+
+
+ # extract tab-specific info
+ if tab in ('videos', 'playlists', 'search'): # find the list of items
+ if 'continuationContents' in response:
+ try:
+ items = response['continuationContents']['gridContinuation']['items']
+ except KeyError:
+ items = response['continuationContents']['sectionListContinuation']['contents'] # for search
+ else:
+ contents = tab_content['sectionListRenderer']['contents']
+ if 'itemSectionRenderer' in contents[0]:
+ item_section = contents[0]['itemSectionRenderer']['contents'][0]
+ try:
+ items = item_section['gridRenderer']['items']
+ except KeyError:
+ if "messageRenderer" in item_section:
+ items = []
+ else:
+ raise Exception('gridRenderer missing but messageRenderer not found')
+ else:
+ items = contents # for search
+
+ additional_info = {'author': info['channel_name'], 'author_url': 'https://www.youtube.com/channel/' + channel_id}
+ info['items'] = [renderer_info(renderer, additional_info) for renderer in items]
+
+ elif tab == 'about':
+ channel_metadata = tab_content['sectionListRenderer']['contents'][0]['itemSectionRenderer']['contents'][0]['channelAboutFullMetadataRenderer']
+
+
+ info['links'] = []
+ for link_json in channel_metadata.get('primaryLinks', ()):
+ url = link_json['navigationEndpoint']['urlEndpoint']['url']
+ if url.startswith('/redirect'): # youtube puts these on external links to do tracking
+ query_string = url[url.find('?')+1: ]
+ url = urllib.parse.parse_qs(query_string)['q'][0]
+
+ text = get_plain_text(link_json['title'])
+
+ info['links'].append( (text, url) )
+
+
+ info['stats'] = []
+ for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
+ try:
+ stat = channel_metadata[stat_name]
+ except KeyError:
+ continue
+ info['stats'].append(get_plain_text(stat))
+
+ if 'description' in channel_metadata:
+ info['description'] = get_text(channel_metadata['description'])
+ else:
+ info['description'] = ''
+
+ else:
+ raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
+
+ return info
+
+