aboutsummaryrefslogtreecommitdiffstats
path: root/youtube
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2019-06-01 23:23:18 -0700
committerJames Taylor <user234683@users.noreply.github.com>2019-06-02 02:25:39 -0700
commitaf9c4e0554c3475d959014e9e7cef78eff88afa5 (patch)
treeced7a2ccd6d0ab8e9d251dcd61bba09f3bb87074 /youtube
parent3905e7e64059b45479894ba1fdfb0ef9cef64475 (diff)
parent9f93b9429c77e631972186049fbc7518e2cf5d4b (diff)
downloadyt-local-af9c4e0554c3475d959014e9e7cef78eff88afa5.tar.lz
yt-local-af9c4e0554c3475d959014e9e7cef78eff88afa5.tar.xz
yt-local-af9c4e0554c3475d959014e9e7cef78eff88afa5.zip
Bring up to date with master
Diffstat (limited to 'youtube')
-rw-r--r--youtube/accounts.py18
-rw-r--r--youtube/channel.py64
-rw-r--r--youtube/comments.py72
-rw-r--r--youtube/html_common.py (renamed from youtube/common.py)395
-rw-r--r--youtube/local_playlist.py23
-rw-r--r--youtube/playlist.py30
-rw-r--r--youtube/post_comment.py36
-rw-r--r--youtube/proto.py2
-rw-r--r--youtube/search.py25
-rw-r--r--youtube/subscriptions.py12
-rw-r--r--youtube/util.py229
-rw-r--r--youtube/watch.py30
-rw-r--r--youtube/youtube.py4
-rw-r--r--youtube/yt_data_extract.py205
14 files changed, 612 insertions, 533 deletions
diff --git a/youtube/accounts.py b/youtube/accounts.py
index bde9852..375bf2a 100644
--- a/youtube/accounts.py
+++ b/youtube/accounts.py
@@ -1,10 +1,10 @@
# Contains functions having to do with logging in
+from youtube import util, html_common
+import settings
import urllib
import json
-from youtube import common
import re
-import settings
import http.cookiejar
import io
import os
@@ -106,7 +106,7 @@ def get_account_login_page(env, start_response):
'''
page = '''
- <form action="''' + common.URL_ORIGIN + '''/login" method="POST">
+ <form action="''' + util.URL_ORIGIN + '''/login" method="POST">
<div class="form-field">
<label for="username">Username:</label>
<input type="text" id="username" name="username">
@@ -130,10 +130,10 @@ Using Tor to log in should only be done if the account was created using a proxy
</div>
'''
- return common.yt_basic_template.substitute(
+ return html_common.yt_basic_template.substitute(
page_title = "Login",
style = style,
- header = common.get_header(),
+ header = html_common.get_header(),
page = page,
).encode('utf-8')
@@ -229,7 +229,7 @@ def _login(username, password, cookiejar, use_tor):
Taken from youtube-dl
"""
- login_page = common.fetch_url(_LOGIN_URL, yt_dl_headers, report_text='Downloaded login page', cookiejar_receive=cookiejar, use_tor=use_tor).decode('utf-8')
+ login_page = util.fetch_url(_LOGIN_URL, yt_dl_headers, report_text='Downloaded login page', cookiejar_receive=cookiejar, use_tor=use_tor).decode('utf-8')
'''with open('debug/login_page', 'w', encoding='utf-8') as f:
f.write(login_page)'''
#print(cookiejar.as_lwp_str())
@@ -255,7 +255,7 @@ def _login(username, password, cookiejar, use_tor):
'Google-Accounts-XSRF': 1,
}
headers.update(yt_dl_headers)
- result = common.fetch_url(url, headers, report_text=note, data=data, cookiejar_send=cookiejar, cookiejar_receive=cookiejar, use_tor=use_tor).decode('utf-8')
+ result = util.fetch_url(url, headers, report_text=note, data=data, cookiejar_send=cookiejar, cookiejar_receive=cookiejar, use_tor=use_tor).decode('utf-8')
#print(cookiejar.as_lwp_str())
'''with open('debug/' + note, 'w', encoding='utf-8') as f:
f.write(result)'''
@@ -387,7 +387,7 @@ def _login(username, password, cookiejar, use_tor):
return False
try:
- check_cookie_results = common.fetch_url(check_cookie_url, headers=yt_dl_headers, report_text="Checked cookie", cookiejar_send=cookiejar, cookiejar_receive=cookiejar, use_tor=use_tor).decode('utf-8')
+ check_cookie_results = util.fetch_url(check_cookie_url, headers=yt_dl_headers, report_text="Checked cookie", cookiejar_send=cookiejar, cookiejar_receive=cookiejar, use_tor=use_tor).decode('utf-8')
except (urllib.error.URLError, compat_http_client.HTTPException, socket.error) as err:
return False
@@ -398,7 +398,7 @@ def _login(username, password, cookiejar, use_tor):
warn('Unable to log in')
return False
- select_site_page = common.fetch_url('https://m.youtube.com/select_site', headers=common.mobile_ua, report_text="Retrieved page for channel id", cookiejar_send=cookiejar, use_tor=use_tor).decode('utf-8')
+ select_site_page = util.fetch_url('https://m.youtube.com/select_site', headers=util.mobile_ua, report_text="Retrieved page for channel id", cookiejar_send=cookiejar, use_tor=use_tor).decode('utf-8')
match = _CHANNEL_ID_RE.search(select_site_page)
if match is None:
warn('Failed to find channel id')
diff --git a/youtube/channel.py b/youtube/channel.py
index c83d7d1..55316e2 100644
--- a/youtube/channel.py
+++ b/youtube/channel.py
@@ -1,6 +1,6 @@
import base64
-import youtube.common as common
-from youtube.common import default_multi_get, URL_ORIGIN, get_thumbnail_url, video_id
+from youtube import util, yt_data_extract, html_common
+
import http_errors
import urllib
import json
@@ -91,7 +91,7 @@ def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1):
url = "https://www.youtube.com/browse_ajax?ctoken=" + ctoken
print("Sending channel tab ajax request")
- content = common.fetch_url(url, common.desktop_ua + headers_1)
+ content = util.fetch_url(url, util.desktop_ua + headers_1)
print("Finished recieving channel tab response")
'''with open('debug/channel_debug', 'wb') as f:
@@ -110,7 +110,7 @@ def get_number_of_videos(channel_id):
# Sometimes retrieving playlist info fails with 403 for no discernable reason
try:
- response = common.fetch_url(url, common.mobile_ua + headers_pbj)
+ response = util.fetch_url(url, util.mobile_ua + headers_pbj)
except urllib.error.HTTPError as e:
if e.code != 403:
raise
@@ -133,20 +133,20 @@ def get_channel_id(username):
# method that gives the smallest possible response at ~10 kb
# needs to be as fast as possible
url = 'https://m.youtube.com/user/' + username + '/about?ajax=1&disable_polymer=true'
- response = common.fetch_url(url, common.mobile_ua + headers_1).decode('utf-8')
+ response = util.fetch_url(url, util.mobile_ua + headers_1).decode('utf-8')
return re.search(r'"channel_id":\s*"([a-zA-Z0-9_-]*)"', response).group(1)
def grid_items_html(items, additional_info={}):
result = ''' <nav class="item-grid">\n'''
for item in items:
- result += common.renderer_html(item, additional_info)
+ result += html_common.renderer_html(item, additional_info)
result += '''\n</nav>'''
return result
def list_items_html(items, additional_info={}):
result = ''' <nav class="item-list">'''
for item in items:
- result += common.renderer_html(item, additional_info)
+ result += html_common.renderer_html(item, additional_info)
result += '''\n</nav>'''
return result
@@ -168,11 +168,11 @@ def channel_tabs_html(channel_id, current_tab, search_box_value=''):
)
else:
result += channel_tab_template.substitute(
- href_attribute = ' href="' + URL_ORIGIN + '/channel/' + channel_id + '/' + tab_name.lower() + '"',
+ href_attribute = ' href="' + util.URL_ORIGIN + '/channel/' + channel_id + '/' + tab_name.lower() + '"',
tab_name = tab_name,
)
result += channel_search_template.substitute(
- action = URL_ORIGIN + "/channel/" + channel_id + "/search",
+ action = util.URL_ORIGIN + "/channel/" + channel_id + "/search",
search_box_value = html.escape(search_box_value),
)
return result
@@ -192,7 +192,7 @@ def channel_sort_buttons_html(channel_id, tab, current_sort):
)
else:
result += channel_sort_button_template.substitute(
- href_attribute=' href="' + URL_ORIGIN + '/channel/' + channel_id + '/' + tab + '?sort=' + sort_number + '"',
+ href_attribute=' href="' + util.URL_ORIGIN + '/channel/' + channel_id + '/' + tab + '?sort=' + sort_number + '"',
text = 'Sort by ' + sort_name
)
return result
@@ -246,7 +246,7 @@ def channel_videos_html(polymer_json, current_page=1, current_sort=3, number_of_
items_html = grid_items_html(items, {'author': microformat['title']})
return yt_channel_items_template.substitute(
- header = common.get_header(),
+ header = html_common.get_header(),
channel_title = microformat['title'],
channel_id = channel_id,
channel_tabs = channel_tabs_html(channel_id, 'Videos'),
@@ -254,7 +254,7 @@ def channel_videos_html(polymer_json, current_page=1, current_sort=3, number_of_
avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'],
page_title = microformat['title'] + ' - Channel',
items = items_html,
- page_buttons = common.page_buttons_html(current_page, math.ceil(number_of_videos/30), URL_ORIGIN + "/channel/" + channel_id + "/videos", current_query_string),
+ page_buttons = html_common.page_buttons_html(current_page, math.ceil(number_of_videos/30), util.URL_ORIGIN + "/channel/" + channel_id + "/videos", current_query_string),
number_of_results = '{:,}'.format(number_of_videos) + " videos",
)
@@ -268,7 +268,7 @@ def channel_playlists_html(polymer_json, current_sort=3):
items_html = grid_items_html(items, {'author': microformat['title']})
return yt_channel_items_template.substitute(
- header = common.get_header(),
+ header = html_common.get_header(),
channel_title = microformat['title'],
channel_id = channel_id,
channel_tabs = channel_tabs_html(channel_id, 'Playlists'),
@@ -312,25 +312,25 @@ def channel_about_page(polymer_json):
channel_links += channel_link_template.substitute(
url = html.escape(url),
- text = common.get_plain_text(link_json['title']),
+ text = yt_data_extract.get_plain_text(link_json['title']),
)
stats = ''
for stat_name in ('subscriberCountText', 'joinedDateText', 'viewCountText', 'country'):
try:
- stat_value = common.get_plain_text(channel_metadata[stat_name])
+ stat_value = yt_data_extract.get_plain_text(channel_metadata[stat_name])
except KeyError:
continue
else:
stats += stat_template.substitute(stat_value=stat_value)
try:
- description = common.format_text_runs(common.get_formatted_text(channel_metadata['description']))
+ description = yt_data_extract.format_text_runs(yt_data_extract.get_formatted_text(channel_metadata['description']))
except KeyError:
description = ''
return yt_channel_about_template.substitute(
- header = common.get_header(),
- page_title = common.get_plain_text(channel_metadata['title']) + ' - About',
- channel_title = common.get_plain_text(channel_metadata['title']),
+ header = html_common.get_header(),
+ page_title = yt_data_extract.get_plain_text(channel_metadata['title']) + ' - About',
+ channel_title = yt_data_extract.get_plain_text(channel_metadata['title']),
avatar = html.escape(avatar),
description = description,
links = channel_links,
@@ -354,14 +354,14 @@ def channel_search_page(polymer_json, query, current_page=1, number_of_videos =
items_html = list_items_html(items)
return yt_channel_items_template.substitute(
- header = common.get_header(),
+ header = html_common.get_header(),
channel_title = html.escape(microformat['title']),
channel_id = channel_id,
channel_tabs = channel_tabs_html(channel_id, '', query),
avatar = '/' + microformat['thumbnail']['thumbnails'][0]['url'],
page_title = html.escape(query + ' - Channel search'),
items = items_html,
- page_buttons = common.page_buttons_html(current_page, math.ceil(number_of_videos/29), URL_ORIGIN + "/channel/" + channel_id + "/search", current_query_string),
+ page_buttons = html_common.page_buttons_html(current_page, math.ceil(number_of_videos/29), util.URL_ORIGIN + "/channel/" + channel_id + "/search", current_query_string),
number_of_results = '',
sort_buttons = '',
)
@@ -371,7 +371,7 @@ def get_channel_search_json(channel_id, query, page):
ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query)
ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
- polymer_json = common.fetch_url("https://www.youtube.com/browse_ajax?ctoken=" + ctoken, common.desktop_ua + headers_1)
+ polymer_json = util.fetch_url("https://www.youtube.com/browse_ajax?ctoken=" + ctoken, util.desktop_ua + headers_1)
'''with open('debug/channel_search_debug', 'wb') as f:
f.write(polymer_json)'''
polymer_json = json.loads(polymer_json)
@@ -388,10 +388,10 @@ def get_channel_page(env, start_response):
tab = 'videos'
parameters = env['parameters']
- page_number = int(common.default_multi_get(parameters, 'page', 0, default='1'))
- sort = common.default_multi_get(parameters, 'sort', 0, default='3')
- view = common.default_multi_get(parameters, 'view', 0, default='1')
- query = common.default_multi_get(parameters, 'query', 0, default='')
+ page_number = int(util.default_multi_get(parameters, 'page', 0, default='1'))
+ sort = util.default_multi_get(parameters, 'sort', 0, default='3')
+ view = util.default_multi_get(parameters, 'view', 0, default='1')
+ query = util.default_multi_get(parameters, 'query', 0, default='')
if tab == 'videos':
tasks = (
@@ -403,11 +403,11 @@ def get_channel_page(env, start_response):
result = channel_videos_html(polymer_json, page_number, sort, number_of_videos, env['QUERY_STRING'])
elif tab == 'about':
- polymer_json = common.fetch_url('https://www.youtube.com/channel/' + channel_id + '/about?pbj=1', common.desktop_ua + headers_1)
+ polymer_json = util.fetch_url('https://www.youtube.com/channel/' + channel_id + '/about?pbj=1', util.desktop_ua + headers_1)
polymer_json = json.loads(polymer_json)
result = channel_about_page(polymer_json)
elif tab == 'playlists':
- polymer_json = common.fetch_url('https://www.youtube.com/channel/' + channel_id + '/playlists?pbj=1&view=1&sort=' + playlist_sort_codes[sort], common.desktop_ua + headers_1)
+ polymer_json = util.fetch_url('https://www.youtube.com/channel/' + channel_id + '/playlists?pbj=1&view=1&sort=' + playlist_sort_codes[sort], util.desktop_ua + headers_1)
'''with open('debug/channel_playlists_debug', 'wb') as f:
f.write(polymer_json)'''
polymer_json = json.loads(polymer_json)
@@ -447,22 +447,22 @@ def get_channel_page_general_url(env, start_response):
return b'Invalid channel url'
if page == 'videos':
- polymer_json = common.fetch_url(base_url + '/videos?pbj=1&view=0', common.desktop_ua + headers_1)
+ polymer_json = util.fetch_url(base_url + '/videos?pbj=1&view=0', util.desktop_ua + headers_1)
'''with open('debug/user_page_videos', 'wb') as f:
f.write(polymer_json)'''
polymer_json = json.loads(polymer_json)
result = channel_videos_html(polymer_json)
elif page == 'about':
- polymer_json = common.fetch_url(base_url + '/about?pbj=1', common.desktop_ua + headers_1)
+ polymer_json = util.fetch_url(base_url + '/about?pbj=1', util.desktop_ua + headers_1)
polymer_json = json.loads(polymer_json)
result = channel_about_page(polymer_json)
elif page == 'playlists':
- polymer_json = common.fetch_url(base_url+ '/playlists?pbj=1&view=1', common.desktop_ua + headers_1)
+ polymer_json = util.fetch_url(base_url+ '/playlists?pbj=1&view=1', util.desktop_ua + headers_1)
polymer_json = json.loads(polymer_json)
result = channel_playlists_html(polymer_json)
elif page == 'search':
raise NotImplementedError()
- '''polymer_json = common.fetch_url('https://www.youtube.com/user' + username + '/search?pbj=1&' + query_string, common.desktop_ua + headers_1)
+ '''polymer_json = util.fetch_url('https://www.youtube.com/user' + username + '/search?pbj=1&' + query_string, util.desktop_ua + headers_1)
polymer_json = json.loads(polymer_json)
return channel_search_page('''
else:
diff --git a/youtube/comments.py b/youtube/comments.py
index 10209e7..94b086e 100644
--- a/youtube/comments.py
+++ b/youtube/comments.py
@@ -1,13 +1,14 @@
+from youtube import proto, util, html_common, yt_data_extract, accounts
+import settings
+
import json
-from youtube import proto, common, accounts
import base64
-from youtube.common import uppercase_escape, default_multi_get, format_text_runs, URL_ORIGIN, fetch_url
from string import Template
import urllib.request
import urllib
import html
-import settings
import re
+
comment_area_template = Template('''
<section class="comment-area">
$video-metadata
@@ -130,7 +131,7 @@ def request_comments(ctoken, replies=False):
url = base_url + ctoken.replace("=", "%3D") + "&pbj=1"
for i in range(0,8): # don't retry more than 8 times
- content = fetch_url(url, headers=mobile_headers, report_text="Retrieved comments")
+ content = util.fetch_url(url, headers=mobile_headers, report_text="Retrieved comments")
if content[0:4] == b")]}'": # random closing characters included at beginning of response for some reason
content = content[4:]
elif content[0:10] == b'\n<!DOCTYPE': # occasionally returns html instead of json for no reason
@@ -151,10 +152,10 @@ def single_comment_ctoken(video_id, comment_id):
def parse_comments_ajax(content, replies=False):
try:
- content = json.loads(uppercase_escape(content.decode('utf-8')))
+ content = json.loads(util.uppercase_escape(content.decode('utf-8')))
#print(content)
comments_raw = content['content']['continuation_contents']['contents']
- ctoken = default_multi_get(content, 'content', 'continuation_contents', 'continuations', 0, 'continuation', default='')
+ ctoken = util.default_multi_get(content, 'content', 'continuation_contents', 'continuations', 0, 'continuation', default='')
comments = []
for comment_raw in comments_raw:
@@ -163,7 +164,7 @@ def parse_comments_ajax(content, replies=False):
if comment_raw['replies'] is not None:
reply_ctoken = comment_raw['replies']['continuations'][0]['continuation']
comment_id, video_id = get_ids(reply_ctoken)
- replies_url = URL_ORIGIN + '/comments?parent_id=' + comment_id + "&video_id=" + video_id
+ replies_url = util.URL_ORIGIN + '/comments?parent_id=' + comment_id + "&video_id=" + video_id
comment_raw = comment_raw['comment']
comment = {
'author': comment_raw['author']['runs'][0]['text'],
@@ -189,7 +190,7 @@ reply_count_regex = re.compile(r'(\d+)')
def parse_comments_polymer(content, replies=False):
try:
video_title = ''
- content = json.loads(uppercase_escape(content.decode('utf-8')))
+ content = json.loads(util.uppercase_escape(content.decode('utf-8')))
url = content[1]['url']
ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
video_id = ctoken_metadata(ctoken)['video_id']
@@ -200,7 +201,7 @@ def parse_comments_polymer(content, replies=False):
comments_raw = content[1]['response']['continuationContents']['commentRepliesContinuation']['contents']
replies = True
- ctoken = default_multi_get(content, 1, 'response', 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
+ ctoken = util.default_multi_get(content, 1, 'response', 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
comments = []
for comment_raw in comments_raw:
@@ -219,8 +220,8 @@ def parse_comments_polymer(content, replies=False):
if 'replies' in comment_raw:
#reply_ctoken = comment_raw['replies']['commentRepliesRenderer']['continuations'][0]['nextContinuationData']['continuation']
#comment_id, video_id = get_ids(reply_ctoken)
- replies_url = URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id
- view_replies_text = common.get_plain_text(comment_raw['replies']['commentRepliesRenderer']['moreText'])
+ replies_url = util.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id
+ view_replies_text = yt_data_extract.get_plain_text(comment_raw['replies']['commentRepliesRenderer']['moreText'])
match = reply_count_regex.search(view_replies_text)
if match is None:
view_replies_text = '1 reply'
@@ -228,24 +229,31 @@ def parse_comments_polymer(content, replies=False):
view_replies_text = match.group(1) + " replies"
elif not replies:
view_replies_text = "Reply"
- replies_url = URL_ORIGIN + '/post_comment?parent_id=' + parent_id + "&video_id=" + video_id
+ replies_url = util.URL_ORIGIN + '/post_comment?parent_id=' + parent_id + "&video_id=" + video_id
comment_raw = comment_raw['comment']
comment_raw = comment_raw['commentRenderer']
comment = {
- 'author': common.get_plain_text(comment_raw['authorText']),
- 'author_url': comment_raw['authorEndpoint']['commandMetadata']['webCommandMetadata']['url'],
- 'author_channel_id': comment_raw['authorEndpoint']['browseEndpoint']['browseId'],
- 'author_id': comment_raw['authorId'],
+ 'author_id': comment_raw.get('authorId', ''),
'author_avatar': comment_raw['authorThumbnail']['thumbnails'][0]['url'],
'likes': comment_raw['likeCount'],
- 'published': common.get_plain_text(comment_raw['publishedTimeText']),
+ 'published': yt_data_extract.get_plain_text(comment_raw['publishedTimeText']),
'text': comment_raw['contentText'].get('runs', ''),
'view_replies_text': view_replies_text,
'replies_url': replies_url,
'video_id': video_id,
'comment_id': comment_raw['commentId'],
}
+
+ if 'authorText' in comment_raw: # deleted channels have no name or channel link
+ comment['author'] = yt_data_extract.get_plain_text(comment_raw['authorText'])
+ comment['author_url'] = comment_raw['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
+ comment['author_channel_id'] = comment_raw['authorEndpoint']['browseEndpoint']['browseId']
+ else:
+ comment['author'] = ''
+ comment['author_url'] = ''
+ comment['author_channel_id'] = ''
+
comments.append(comment)
except Exception as e:
print('Error parsing comments: ' + str(e))
@@ -264,13 +272,13 @@ def get_comments_html(comments):
replies = reply_link_template.substitute(url=comment['replies_url'], view_replies_text=html.escape(comment['view_replies_text']))
if settings.enable_comment_avatars:
avatar = comment_avatar_template.substitute(
- author_url = URL_ORIGIN + comment['author_url'],
+ author_url = util.URL_ORIGIN + comment['author_url'],
author_avatar = '/' + comment['author_avatar'],
)
else:
avatar = ''
if comment['author_channel_id'] in accounts.accounts:
- delete_url = (URL_ORIGIN + '/delete_comment?video_id='
+ delete_url = (util.URL_ORIGIN + '/delete_comment?video_id='
+ comment['video_id']
+ '&channel_id='+ comment['author_channel_id']
+ '&author_id=' + comment['author_id']
@@ -280,14 +288,14 @@ def get_comments_html(comments):
else:
action_buttons = ''
- permalink = URL_ORIGIN + '/watch?v=' + comment['video_id'] + '&lc=' + comment['comment_id']
+ permalink = util.URL_ORIGIN + '/watch?v=' + comment['video_id'] + '&lc=' + comment['comment_id']
html_result += comment_template.substitute(
author=comment['author'],
- author_url = URL_ORIGIN + comment['author_url'],
+ author_url = util.URL_ORIGIN + comment['author_url'],
avatar = avatar,
likes = str(comment['likes']) + ' likes' if str(comment['likes']) != '0' else '',
published = comment['published'],
- text = format_text_runs(comment['text']),
+ text = yt_data_extract.format_text_runs(comment['text']),
datetime = '', #TODO
replies = replies,
action_buttons = action_buttons,
@@ -297,10 +305,10 @@ def get_comments_html(comments):
def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if settings.enable_comments:
- post_comment_url = common.URL_ORIGIN + "/post_comment?video_id=" + video_id
+ post_comment_url = util.URL_ORIGIN + "/post_comment?video_id=" + video_id
post_comment_link = '''<a class="sort-button" href="''' + post_comment_url + '''">Post comment</a>'''
- other_sort_url = common.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
+ other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
other_sort_name = 'newest' if sort == 0 else 'top'
other_sort_link = '''<a class="sort-button" href="''' + other_sort_url + '''">Sort by ''' + other_sort_name + '''</a>'''
@@ -314,7 +322,7 @@ def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
if ctoken == '':
more_comments_button = ''
else:
- more_comments_button = more_comments_template.substitute(url = common.URL_ORIGIN + '/comments?ctoken=' + ctoken)
+ more_comments_button = more_comments_template.substitute(url = util.URL_ORIGIN + '/comments?ctoken=' + ctoken)
result = '''<section class="comments-area">\n'''
result += comment_links + '\n'
@@ -350,7 +358,7 @@ comment_box_template = Template('''
<select id="account-selection" name="channel_id">
$options
</select>
- <a href="''' + common.URL_ORIGIN + '''/login" target="_blank">Add account</a>
+ <a href="''' + util.URL_ORIGIN + '''/login" target="_blank">Add account</a>
</div>
<textarea name="comment_text"></textarea>
$video_id_input
@@ -359,7 +367,7 @@ $options
def get_comments_page(env, start_response):
start_response('200 OK', [('Content-type','text/html'),] )
parameters = env['parameters']
- ctoken = default_multi_get(parameters, 'ctoken', 0, default='')
+ ctoken = util.default_multi_get(parameters, 'ctoken', 0, default='')
replies = False
if not ctoken:
video_id = parameters['video_id'][0]
@@ -384,17 +392,17 @@ def get_comments_page(env, start_response):
page_number = page_number,
sort = 'top' if metadata['sort'] == 0 else 'newest',
title = html.escape(comment_info['video_title']),
- url = common.URL_ORIGIN + '/watch?v=' + metadata['video_id'],
+ url = util.URL_ORIGIN + '/watch?v=' + metadata['video_id'],
thumbnail = '/i.ytimg.com/vi/'+ metadata['video_id'] + '/mqdefault.jpg',
)
comment_box = comment_box_template.substitute(
- form_action= common.URL_ORIGIN + '/post_comment',
+ form_action= util.URL_ORIGIN + '/post_comment',
video_id_input='''<input type="hidden" name="video_id" value="''' + metadata['video_id'] + '''">''',
post_text='Post comment',
options=comment_box_account_options(),
)
- other_sort_url = common.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(metadata['video_id'], sort=1 - metadata['sort'])
+ other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(metadata['video_id'], sort=1 - metadata['sort'])
other_sort_name = 'newest' if metadata['sort'] == 0 else 'top'
other_sort_link = '''<a class="sort-button" href="''' + other_sort_url + '''">Sort by ''' + other_sort_name + '''</a>'''
@@ -408,7 +416,7 @@ def get_comments_page(env, start_response):
if ctoken == '':
more_comments_button = ''
else:
- more_comments_button = more_comments_template.substitute(url = URL_ORIGIN + '/comments?ctoken=' + ctoken)
+ more_comments_button = more_comments_template.substitute(url = util.URL_ORIGIN + '/comments?ctoken=' + ctoken)
comments_area = '<section class="comments-area">\n'
comments_area += video_metadata + comment_box + comment_links + '\n'
comments_area += '<div class="comments">\n'
@@ -417,7 +425,7 @@ def get_comments_page(env, start_response):
comments_area += more_comments_button + '\n'
comments_area += '</section>\n'
return yt_comments_template.substitute(
- header = common.get_header(),
+ header = html_common.get_header(),
comments_area = comments_area,
page_title = page_title,
).encode('utf-8')
diff --git a/youtube/common.py b/youtube/html_common.py
index cb963ce..8e65a1f 100644
--- a/youtube/common.py
+++ b/youtube/html_common.py
@@ -1,46 +1,8 @@
from youtube.template import Template
-from youtube import local_playlist
-import settings
-import html
+from youtube import local_playlist, yt_data_extract, util
+
import json
-import re
-import urllib.parse
-import gzip
-import brotli
-import time
-import socks, sockshandler
-
-URL_ORIGIN = "/https://www.youtube.com"
-
-
-# videos (all of type str):
-
-# id
-# title
-# url
-# author
-# author_url
-# thumbnail
-# description
-# published
-# duration
-# likes
-# dislikes
-# views
-# playlist_index
-
-# playlists:
-
-# id
-# title
-# url
-# author
-# author_url
-# thumbnail
-# description
-# updated
-# size
-# first_video_id
+import html
with open('yt_basic_template.html', 'r', encoding='utf-8') as file:
@@ -139,205 +101,8 @@ medium_channel_item_template = Template('''
''')
-class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
- '''Separate cookiejars for receiving and sending'''
- def __init__(self, cookiejar_send=None, cookiejar_receive=None):
- import http.cookiejar
- self.cookiejar_send = cookiejar_send
- self.cookiejar_receive = cookiejar_receive
-
- def http_request(self, request):
- if self.cookiejar_send is not None:
- self.cookiejar_send.add_cookie_header(request)
- return request
-
- def http_response(self, request, response):
- if self.cookiejar_receive is not None:
- self.cookiejar_receive.extract_cookies(response, request)
- return response
-
- https_request = http_request
- https_response = http_response
-
-
-def decode_content(content, encoding_header):
- encodings = encoding_header.replace(' ', '').split(',')
- for encoding in reversed(encodings):
- if encoding == 'identity':
- continue
- if encoding == 'br':
- content = brotli.decompress(content)
- elif encoding == 'gzip':
- content = gzip.decompress(content)
- return content
-
-def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True):
- '''
- When cookiejar_send is set to a CookieJar object,
- those cookies will be sent in the request (but cookies in response will not be merged into it)
- When cookiejar_receive is set to a CookieJar object,
- cookies received in the response will be merged into the object (nothing will be sent from it)
- When both are set to the same object, cookies will be sent from the object,
- and response cookies will be merged into it.
- '''
- headers = dict(headers) # Note: Calling dict() on a dict will make a copy
- headers['Accept-Encoding'] = 'gzip, br'
-
- # prevent python version being leaked by urllib if User-Agent isn't provided
- # (urllib will use ex. Python-urllib/3.6 otherwise)
- if 'User-Agent' not in headers and 'user-agent' not in headers and 'User-agent' not in headers:
- headers['User-Agent'] = 'Python-urllib'
-
- if data is not None:
- if isinstance(data, str):
- data = data.encode('ascii')
- elif not isinstance(data, bytes):
- data = urllib.parse.urlencode(data).encode('ascii')
-
- start_time = time.time()
-
-
- req = urllib.request.Request(url, data=data, headers=headers)
-
- cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
-
- if use_tor and settings.route_tor:
- opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor)
- else:
- opener = urllib.request.build_opener(cookie_processor)
-
- response = opener.open(req, timeout=timeout)
- response_time = time.time()
-
-
- content = response.read()
- read_finish = time.time()
- if report_text:
- print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
- content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
- return content
-
-mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
-mobile_ua = (('User-Agent', mobile_user_agent),)
-desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
-desktop_ua = (('User-Agent', desktop_user_agent),)
-def dict_add(*dicts):
- for dictionary in dicts[1:]:
- dicts[0].update(dictionary)
- return dicts[0]
-def video_id(url):
- url_parts = urllib.parse.urlparse(url)
- return urllib.parse.parse_qs(url_parts.query)['v'][0]
-
-def uppercase_escape(s):
- return re.sub(
- r'\\U([0-9a-fA-F]{8})',
- lambda m: chr(int(m.group(1), base=16)), s)
-
-def default_multi_get(object, *keys, default):
- ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
- try:
- for key in keys:
- object = object[key]
- return object
- except (IndexError, KeyError):
- return default
-
-def get_plain_text(node):
- try:
- return html.escape(node['simpleText'])
- except KeyError:
- return unformmated_text_runs(node['runs'])
-
-def unformmated_text_runs(runs):
- result = ''
- for text_run in runs:
- result += html.escape(text_run["text"])
- return result
-
-def format_text_runs(runs):
- if isinstance(runs, str):
- return runs
- result = ''
- for text_run in runs:
- if text_run.get("bold", False):
- result += "<b>" + html.escape(text_run["text"]) + "</b>"
- elif text_run.get('italics', False):
- result += "<i>" + html.escape(text_run["text"]) + "</i>"
- else:
- result += html.escape(text_run["text"])
- return result
-
-# default, sddefault, mqdefault, hqdefault, hq720
-def get_thumbnail_url(video_id):
- return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
-
-def seconds_to_timestamp(seconds):
- seconds = int(seconds)
- hours, seconds = divmod(seconds,3600)
- minutes, seconds = divmod(seconds,60)
- if hours != 0:
- timestamp = str(hours) + ":"
- timestamp += str(minutes).zfill(2) # zfill pads with zeros
- else:
- timestamp = str(minutes)
-
- timestamp += ":" + str(seconds).zfill(2)
- return timestamp
-
-
-# -----
-# HTML
-# -----
-
-def small_video_item_html(item):
- video_info = json.dumps({key: item[key] for key in ('id', 'title', 'author', 'duration')})
- return small_video_item_template.substitute(
- title = html.escape(item["title"]),
- views = item["views"],
- author = html.escape(item["author"]),
- duration = item["duration"],
- url = URL_ORIGIN + "/watch?v=" + item["id"],
- thumbnail = get_thumbnail_url(item['id']),
- video_info = html.escape(video_info),
- )
-
-def small_playlist_item_html(item):
- return small_playlist_item_template.substitute(
- title=html.escape(item["title"]),
- size = item['size'],
- author="",
- url = URL_ORIGIN + "/playlist?list=" + item["id"],
- thumbnail= get_thumbnail_url(item['first_video_id']),
- )
-
-def medium_playlist_item_html(item):
- return medium_playlist_item_template.substitute(
- title=html.escape(item["title"]),
- size = item['size'],
- author=item['author'],
- author_url= URL_ORIGIN + item['author_url'],
- url = URL_ORIGIN + "/playlist?list=" + item["id"],
- thumbnail= item['thumbnail'],
- )
-
-def medium_video_item_html(medium_video_info):
- info = medium_video_info
-
- return medium_video_item_template.substitute(
- title=html.escape(info["title"]),
- views=info["views"],
- published = info["published"],
- description = format_text_runs(info["description"]),
- author=html.escape(info["author"]),
- author_url=info["author_url"],
- duration=info["duration"],
- url = URL_ORIGIN + "/watch?v=" + info["id"],
- thumbnail=info['thumbnail'],
- datetime='', # TODO
- )
header_template = Template('''
@@ -440,158 +205,28 @@ def get_header(search_box_value=""):
-def get_url(node):
- try:
- return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
- except KeyError:
- return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
-def get_text(node):
- try:
- return node['simpleText']
- except KeyError:
- pass
- try:
- return node['runs'][0]['text']
- except IndexError: # empty text runs
- return ''
-def get_formatted_text(node):
- try:
- return node['runs']
- except KeyError:
- return node['simpleText']
-def get_badges(node):
- badges = []
- for badge_node in node:
- badge = badge_node['metadataBadgeRenderer']['label']
- if badge.lower() != 'new':
- badges.append(badge)
- return badges
-def get_thumbnail(node):
- try:
- return node['thumbnails'][0]['url'] # polymer format
- except KeyError:
- return node['url'] # ajax format
-
-dispatch = {
-
-# polymer format
- 'title': ('title', get_text),
- 'publishedTimeText': ('published', get_text),
- 'videoId': ('id', lambda node: node),
- 'descriptionSnippet': ('description', get_formatted_text),
- 'lengthText': ('duration', get_text),
- 'thumbnail': ('thumbnail', get_thumbnail),
- 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']),
-
- 'viewCountText': ('views', get_text),
- 'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos"
- 'videoCountText': ('size', get_text),
- 'playlistId': ('id', lambda node: node),
- 'descriptionText': ('description', get_formatted_text),
-
- 'subscriberCountText': ('subscriber_count', get_text),
- 'channelId': ('id', lambda node: node),
- 'badges': ('badges', get_badges),
-
-# ajax format
- 'view_count_text': ('views', get_text),
- 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]),
- 'owner_text': ('author', get_text),
- 'owner_endpoint': ('author_url', lambda node: node['url']),
- 'description': ('description', get_formatted_text),
- 'index': ('playlist_index', get_text),
- 'short_byline': ('author', get_text),
- 'length': ('duration', get_text),
- 'video_id': ('id', lambda node: node),
-}
-def renderer_info(renderer):
- try:
- info = {}
- if 'viewCountText' in renderer: # prefer this one as it contains all the digits
- info['views'] = get_text(renderer['viewCountText'])
- elif 'shortViewCountText' in renderer:
- info['views'] = get_text(renderer['shortViewCountText'])
-
- if 'ownerText' in renderer:
- info['author'] = renderer['ownerText']['runs'][0]['text']
- info['author_url'] = renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
- try:
- overlays = renderer['thumbnailOverlays']
- except KeyError:
- pass
- else:
- for overlay in overlays:
- if 'thumbnailOverlayTimeStatusRenderer' in overlay:
- info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text'])
- # show renderers don't have videoCountText
- elif 'thumbnailOverlayBottomPanelRenderer' in overlay:
- info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text'])
-
- # show renderers don't have playlistId, have to dig into the url to get it
- try:
- info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId']
- except KeyError:
- pass
- for key, node in renderer.items():
- if key in ('longBylineText', 'shortBylineText'):
- info['author'] = get_text(node)
- try:
- info['author_url'] = get_url(node)
- except KeyError:
- pass
-
- # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer
- elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node:
- info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url']
- else:
- try:
- simple_key, function = dispatch[key]
- except KeyError:
- continue
- info[simple_key] = function(node)
- return info
- except KeyError:
- print(renderer)
- raise
-
-def ajax_info(item_json):
- try:
- info = {}
- for key, node in item_json.items():
- try:
- simple_key, function = dispatch[key]
- except KeyError:
- continue
- info[simple_key] = function(node)
- return info
- except KeyError:
- print(item_json)
- raise
-
+
def badges_html(badges):
return ' | '.join(map(html.escape, badges))
-
-
-
html_transform_dispatch = {
'title': html.escape,
'published': html.escape,
'id': html.escape,
- 'description': format_text_runs,
+ 'description': yt_data_extract.format_text_runs,
'duration': html.escape,
'thumbnail': lambda url: html.escape('/' + url.lstrip('/')),
'size': html.escape,
'author': html.escape,
- 'author_url': lambda url: html.escape(URL_ORIGIN + url),
+ 'author_url': lambda url: html.escape(util.URL_ORIGIN + url),
'views': html.escape,
'subscriber_count': html.escape,
'badges': badges_html,
@@ -645,7 +280,7 @@ def video_item_html(item, template, html_exclude=set()):
html_ready = get_html_ready(item)
html_ready['video_info'] = html.escape(json.dumps(video_info) )
- html_ready['url'] = URL_ORIGIN + "/watch?v=" + html_ready['id']
+ html_ready['url'] = util.URL_ORIGIN + "/watch?v=" + html_ready['id']
html_ready['datetime'] = '' #TODO
for key in html_exclude:
@@ -658,7 +293,7 @@ def video_item_html(item, template, html_exclude=set()):
def playlist_item_html(item, template, html_exclude=set()):
html_ready = get_html_ready(item)
- html_ready['url'] = URL_ORIGIN + "/playlist?list=" + html_ready['id']
+ html_ready['url'] = util.URL_ORIGIN + "/playlist?list=" + html_ready['id']
html_ready['datetime'] = '' #TODO
for key in html_exclude:
@@ -672,10 +307,6 @@ def playlist_item_html(item, template, html_exclude=set()):
-def update_query_string(query_string, items):
- parameters = urllib.parse.parse_qs(query_string)
- parameters.update(items)
- return urllib.parse.urlencode(parameters, doseq=True)
page_button_template = Template('''<a class="page-button" href="$href">$page</a>''')
current_page_button_template = Template('''<div class="page-button">$page</div>''')
@@ -694,7 +325,7 @@ def page_buttons_html(current_page, estimated_pages, url, current_query_string):
template = current_page_button_template
else:
template = page_button_template
- result += template.substitute(page=page, href = url + "?" + update_query_string(current_query_string, {'page': [str(page)]}) )
+ result += template.substitute(page=page, href = url + "?" + util.update_query_string(current_query_string, {'page': [str(page)]}) )
return result
@@ -723,15 +354,15 @@ def renderer_html(renderer, additional_info={}, current_query_string=''):
return renderer_html(renderer['contents'][0], additional_info, current_query_string)
if type == 'channelRenderer':
- info = renderer_info(renderer)
+ info = yt_data_extract.renderer_info(renderer)
html_ready = get_html_ready(info)
- html_ready['url'] = URL_ORIGIN + "/channel/" + html_ready['id']
+ html_ready['url'] = util.URL_ORIGIN + "/channel/" + html_ready['id']
return medium_channel_item_template.substitute(html_ready)
if type in ('movieRenderer', 'clarificationRenderer'):
return ''
- info = renderer_info(renderer)
+ info = yt_data_extract.renderer_info(renderer)
info.update(additional_info)
html_exclude = set(additional_info.keys())
if type == 'compactVideoRenderer':
@@ -745,4 +376,4 @@ def renderer_html(renderer, additional_info={}, current_query_string=''):
#print(renderer)
#raise NotImplementedError('Unknown renderer type: ' + type)
- return ''
+ return '' \ No newline at end of file
diff --git a/youtube/local_playlist.py b/youtube/local_playlist.py
index 0375040..e354013 100644
--- a/youtube/local_playlist.py
+++ b/youtube/local_playlist.py
@@ -1,11 +1,12 @@
+from youtube.template import Template
+from youtube import util, html_common
+import settings
+
import os
import json
-from youtube.template import Template
-from youtube import common
import html
import gevent
import urllib
-import settings
playlists_directory = os.path.join(settings.data_dir, "playlists")
thumbnails_directory = os.path.join(settings.data_dir, "playlist_thumbnails")
@@ -38,7 +39,7 @@ def download_thumbnail(playlist_name, video_id):
url = "https://i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
save_location = os.path.join(thumbnails_directory, playlist_name, video_id + ".jpg")
try:
- thumbnail = common.fetch_url(url, report_text="Saved local playlist thumbnail: " + video_id)
+ thumbnail = util.fetch_url(url, report_text="Saved local playlist thumbnail: " + video_id)
except urllib.error.HTTPError as e:
print("Failed to download thumbnail for " + video_id + ": " + str(e))
return
@@ -78,15 +79,15 @@ def get_local_playlist_page(name):
if info['id'] + ".jpg" in thumbnails:
info['thumbnail'] = "/youtube.com/data/playlist_thumbnails/" + name + "/" + info['id'] + ".jpg"
else:
- info['thumbnail'] = common.get_thumbnail_url(info['id'])
+ info['thumbnail'] = util.get_thumbnail_url(info['id'])
missing_thumbnails.append(info['id'])
- videos_html += common.video_item_html(info, common.small_video_item_template)
+ videos_html += html_common.video_item_html(info, html_common.small_video_item_template)
except json.decoder.JSONDecodeError:
pass
gevent.spawn(download_thumbnails, name, missing_thumbnails)
return local_playlist_template.substitute(
page_title = name + ' - Local playlist',
- header = common.get_header(),
+ header = html_common.get_header(),
videos = videos_html,
title = name,
page_buttons = ''
@@ -127,11 +128,11 @@ def get_playlists_list_page():
page = '''<ul>\n'''
list_item_template = Template(''' <li><a href="$url">$name</a></li>\n''')
for name in get_playlist_names():
- page += list_item_template.substitute(url = html.escape(common.URL_ORIGIN + '/playlists/' + name), name = html.escape(name))
+ page += list_item_template.substitute(url = html.escape(util.URL_ORIGIN + '/playlists/' + name), name = html.escape(name))
page += '''</ul>\n'''
- return common.yt_basic_template.substitute(
+ return html_common.yt_basic_template.substitute(
page_title = "Local playlists",
- header = common.get_header(),
+ header = html_common.get_header(),
style = '',
page = page,
)
@@ -151,7 +152,7 @@ def path_edit_playlist(env, start_response):
if parameters['action'][0] == 'remove':
playlist_name = env['path_parts'][1]
remove_from_playlist(playlist_name, parameters['video_info_list'])
- start_response('303 See Other', [('Location', common.URL_ORIGIN + env['PATH_INFO']),] )
+ start_response('303 See Other', [('Location', util.URL_ORIGIN + env['PATH_INFO']),] )
return b''
else:
diff --git a/youtube/playlist.py b/youtube/playlist.py
index cc0da33..fbe6448 100644
--- a/youtube/playlist.py
+++ b/youtube/playlist.py
@@ -1,14 +1,14 @@
+from youtube import util, yt_data_extract, html_common, template, proto
+
import base64
-import youtube.common as common
import urllib
import json
-from string import Template
-import youtube.proto as proto
+import string
import gevent
import math
with open("yt_playlist_template.html", "r") as file:
- yt_playlist_template = Template(file.read())
+ yt_playlist_template = template.Template(file.read())
@@ -48,10 +48,10 @@ headers_1 = (
def playlist_first_page(playlist_id, report_text = "Retrieved playlist"):
url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
- content = common.fetch_url(url, common.mobile_ua + headers_1, report_text=report_text)
+ content = util.fetch_url(url, util.mobile_ua + headers_1, report_text=report_text)
'''with open('debug/playlist_debug', 'wb') as f:
f.write(content)'''
- content = json.loads(common.uppercase_escape(content.decode('utf-8')))
+ content = json.loads(util.uppercase_escape(content.decode('utf-8')))
return content
@@ -68,15 +68,15 @@ def get_videos(playlist_id, page):
'X-YouTube-Client-Version': '2.20180508',
}
- content = common.fetch_url(url, headers, report_text="Retrieved playlist")
+ content = util.fetch_url(url, headers, report_text="Retrieved playlist")
'''with open('debug/playlist_debug', 'wb') as f:
f.write(content)'''
- info = json.loads(common.uppercase_escape(content.decode('utf-8')))
+ info = json.loads(util.uppercase_escape(content.decode('utf-8')))
return info
-playlist_stat_template = Template('''
+playlist_stat_template = string.Template('''
<div>$stat</div>''')
def get_playlist_page(env, start_response):
start_response('200 OK', [('Content-type','text/html'),])
@@ -100,22 +100,22 @@ def get_playlist_page(env, start_response):
video_list = this_page_json['response']['continuationContents']['playlistVideoListContinuation']['contents']
videos_html = ''
for video_json in video_list:
- info = common.renderer_info(video_json['playlistVideoRenderer'])
- videos_html += common.video_item_html(info, common.small_video_item_template)
+ info = yt_data_extract.renderer_info(video_json['playlistVideoRenderer'])
+ videos_html += html_common.video_item_html(info, html_common.small_video_item_template)
- metadata = common.renderer_info(first_page_json['response']['header']['playlistHeaderRenderer'])
+ metadata = yt_data_extract.renderer_info(first_page_json['response']['header']['playlistHeaderRenderer'])
video_count = int(metadata['size'].replace(',', ''))
- page_buttons = common.page_buttons_html(int(page), math.ceil(video_count/20), common.URL_ORIGIN + "/playlist", env['QUERY_STRING'])
+ page_buttons = html_common.page_buttons_html(int(page), math.ceil(video_count/20), util.URL_ORIGIN + "/playlist", env['QUERY_STRING'])
- html_ready = common.get_html_ready(metadata)
+ html_ready = html_common.get_html_ready(metadata)
html_ready['page_title'] = html_ready['title'] + ' - Page ' + str(page)
stats = ''
stats += playlist_stat_template.substitute(stat=html_ready['size'] + ' videos')
stats += playlist_stat_template.substitute(stat=html_ready['views'])
return yt_playlist_template.substitute(
- header = common.get_header(),
+ header = html_common.get_header(),
videos = videos_html,
page_buttons = page_buttons,
stats = stats,
diff --git a/youtube/post_comment.py b/youtube/post_comment.py
index 92c45e1..876a1c0 100644
--- a/youtube/post_comment.py
+++ b/youtube/post_comment.py
@@ -1,11 +1,11 @@
# Contains functions having to do with posting/editing/deleting comments
+from youtube import util, html_common, proto, comments, accounts
+import settings
import urllib
import json
-from youtube import common, proto, comments, accounts
import re
import traceback
-import settings
import os
def _post_comment(text, video_id, session_token, cookiejar):
@@ -31,7 +31,7 @@ def _post_comment(text, video_id, session_token, cookiejar):
data = urllib.parse.urlencode(data_dict).encode()
- content = common.fetch_url("https://m.youtube.com/service_ajax?name=createCommentEndpoint", headers=headers, data=data, cookiejar_send=cookiejar)
+ content = util.fetch_url("https://m.youtube.com/service_ajax?name=createCommentEndpoint", headers=headers, data=data, cookiejar_send=cookiejar)
code = json.loads(content)['code']
print("Comment posting code: " + code)
@@ -62,7 +62,7 @@ def _post_comment_reply(text, video_id, parent_comment_id, session_token, cookie
}
data = urllib.parse.urlencode(data_dict).encode()
- content = common.fetch_url("https://m.youtube.com/service_ajax?name=createCommentReplyEndpoint", headers=headers, data=data, cookiejar_send=cookiejar)
+ content = util.fetch_url("https://m.youtube.com/service_ajax?name=createCommentReplyEndpoint", headers=headers, data=data, cookiejar_send=cookiejar)
code = json.loads(content)['code']
print("Comment posting code: " + code)
@@ -90,7 +90,7 @@ def _delete_comment(video_id, comment_id, author_id, session_token, cookiejar):
}
data = urllib.parse.urlencode(data_dict).encode()
- content = common.fetch_url("https://m.youtube.com/service_ajax?name=performCommentActionEndpoint", headers=headers, data=data, cookiejar_send=cookiejar)
+ content = util.fetch_url("https://m.youtube.com/service_ajax?name=performCommentActionEndpoint", headers=headers, data=data, cookiejar_send=cookiejar)
code = json.loads(content)['code']
print("Comment deletion code: " + code)
return code
@@ -101,8 +101,8 @@ def get_session_token(video_id, cookiejar):
# youtube-dl uses disable_polymer=1 which uses a different request format which has an obfuscated javascript algorithm to generate a parameter called "bgr"
# Tokens retrieved from disable_polymer pages only work with that format. Tokens retrieved on mobile only work using mobile requests
# Additionally, tokens retrieved without sending the same cookie won't work. So this is necessary even if the bgr and stuff was reverse engineered.
- headers = {'User-Agent': common.mobile_user_agent}
- mobile_page = common.fetch_url('https://m.youtube.com/watch?v=' + video_id, headers, report_text="Retrieved session token for comment", cookiejar_send=cookiejar, cookiejar_receive=cookiejar).decode()
+ headers = {'User-Agent': util.mobile_user_agent}
+ mobile_page = util.fetch_url('https://m.youtube.com/watch?v=' + video_id, headers, report_text="Retrieved session token for comment", cookiejar_send=cookiejar, cookiejar_receive=cookiejar).decode()
match = xsrf_token_regex.search(mobile_page)
if match:
return match.group(1).replace("%3D", "=")
@@ -118,9 +118,9 @@ def delete_comment(env, start_response):
code = _delete_comment(video_id, parameters['comment_id'][0], parameters['author_id'][0], token, cookiejar)
if code == "SUCCESS":
- start_response('303 See Other', [('Location', common.URL_ORIGIN + '/comment_delete_success'),] )
+ start_response('303 See Other', [('Location', util.URL_ORIGIN + '/comment_delete_success'),] )
else:
- start_response('303 See Other', [('Location', common.URL_ORIGIN + '/comment_delete_fail'),] )
+ start_response('303 See Other', [('Location', util.URL_ORIGIN + '/comment_delete_fail'),] )
def post_comment(env, start_response):
parameters = env['parameters']
@@ -131,11 +131,11 @@ def post_comment(env, start_response):
if 'parent_id' in parameters:
code = _post_comment_reply(parameters['comment_text'][0], parameters['video_id'][0], parameters['parent_id'][0], token, cookiejar)
- start_response('303 See Other', (('Location', common.URL_ORIGIN + '/comments?' + env['QUERY_STRING']),) )
+ start_response('303 See Other', (('Location', util.URL_ORIGIN + '/comments?' + env['QUERY_STRING']),) )
else:
code = _post_comment(parameters['comment_text'][0], parameters['video_id'][0], token, cookiejar)
- start_response('303 See Other', (('Location', common.URL_ORIGIN + '/comments?ctoken=' + comments.make_comment_ctoken(video_id, sort=1)),) )
+ start_response('303 See Other', (('Location', util.URL_ORIGIN + '/comments?ctoken=' + comments.make_comment_ctoken(video_id, sort=1)),) )
return b''
@@ -163,10 +163,10 @@ def get_delete_comment_page(env, start_response):
page += '''
<input type="submit" value="Yes, delete it">
</form>'''
- return common.yt_basic_template.substitute(
+ return html_common.yt_basic_template.substitute(
page_title = "Delete comment?",
style = style,
- header = common.get_header(),
+ header = html_common.get_header(),
page = page,
).encode('utf-8')
@@ -174,7 +174,7 @@ def get_post_comment_page(env, start_response):
start_response('200 OK', [('Content-type','text/html'),])
parameters = env['parameters']
video_id = parameters['video_id'][0]
- parent_id = common.default_multi_get(parameters, 'parent_id', 0, default='')
+ parent_id = util.default_multi_get(parameters, 'parent_id', 0, default='')
style = ''' main{
display: grid;
@@ -194,23 +194,23 @@ textarea{
}'''
if parent_id: # comment reply
comment_box = comments.comment_box_template.substitute(
- form_action = common.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id,
+ form_action = util.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id,
video_id_input = '',
post_text = "Post reply",
options=comments.comment_box_account_options(),
)
else:
comment_box = comments.comment_box_template.substitute(
- form_action = common.URL_ORIGIN + '/post_comment',
+ form_action = util.URL_ORIGIN + '/post_comment',
video_id_input = '''<input type="hidden" name="video_id" value="''' + video_id + '''">''',
post_text = "Post comment",
options=comments.comment_box_account_options(),
)
page = '''<div class="left">\n''' + comment_box + '''</div>\n'''
- return common.yt_basic_template.substitute(
+ return html_common.yt_basic_template.substitute(
page_title = "Post comment reply" if parent_id else "Post a comment",
style = style,
- header = common.get_header(),
+ header = html_common.get_header(),
page = page,
).encode('utf-8')
diff --git a/youtube/proto.py b/youtube/proto.py
index 004375a..d966455 100644
--- a/youtube/proto.py
+++ b/youtube/proto.py
@@ -60,7 +60,7 @@ def unpadded_b64encode(data):
def as_bytes(value):
if isinstance(value, str):
- return value.encode('ascii')
+ return value.encode('utf-8')
return value
diff --git a/youtube/search.py b/youtube/search.py
index db65eaa..0cef0f3 100644
--- a/youtube/search.py
+++ b/youtube/search.py
@@ -1,11 +1,12 @@
+from youtube import util, html_common, yt_data_extract, proto
+
import json
import urllib
import html
from string import Template
import base64
from math import ceil
-from youtube.common import default_multi_get, get_thumbnail_url, URL_ORIGIN
-from youtube import common, proto
+
with open("yt_search_results_template.html", "r") as file:
yt_search_results_template = file.read()
@@ -54,7 +55,7 @@ def get_search_json(query, page, autocorrect, sort, filters):
'X-YouTube-Client-Version': '2.20180418',
}
url += "&pbj=1&sp=" + page_number_to_sp_parameter(page, autocorrect, sort, filters).replace("=", "%3D")
- content = common.fetch_url(url, headers=headers, report_text="Got search results")
+ content = util.fetch_url(url, headers=headers, report_text="Got search results")
info = json.loads(content)
return info
@@ -70,9 +71,9 @@ def get_search_page(env, start_response):
start_response('200 OK', [('Content-type','text/html'),])
parameters = env['parameters']
if len(parameters) == 0:
- return common.yt_basic_template.substitute(
+ return html_common.yt_basic_template.substitute(
page_title = "Search",
- header = common.get_header(),
+ header = html_common.get_header(),
style = '',
page = '',
).encode('utf-8')
@@ -100,24 +101,24 @@ def get_search_page(env, start_response):
renderer = renderer[type]
corrected_query_string = parameters.copy()
corrected_query_string['query'] = [renderer['correctedQueryEndpoint']['searchEndpoint']['query']]
- corrected_query_url = URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
+ corrected_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(corrected_query_string, doseq=True)
corrections = did_you_mean.substitute(
corrected_query_url = corrected_query_url,
- corrected_query = common.format_text_runs(renderer['correctedQuery']['runs']),
+ corrected_query = yt_data_extract.format_text_runs(renderer['correctedQuery']['runs']),
)
continue
if type == 'showingResultsForRenderer':
renderer = renderer[type]
no_autocorrect_query_string = parameters.copy()
no_autocorrect_query_string['autocorrect'] = ['0']
- no_autocorrect_query_url = URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
+ no_autocorrect_query_url = util.URL_ORIGIN + '/search?' + urllib.parse.urlencode(no_autocorrect_query_string, doseq=True)
corrections = showing_results_for.substitute(
- corrected_query = common.format_text_runs(renderer['correctedQuery']['runs']),
+ corrected_query = yt_data_extract.format_text_runs(renderer['correctedQuery']['runs']),
original_query_url = no_autocorrect_query_url,
original_query = html.escape(renderer['originalQuery']['simpleText']),
)
continue
- result_list_html += common.renderer_html(renderer, current_query_string=env['QUERY_STRING'])
+ result_list_html += html_common.renderer_html(renderer, current_query_string=env['QUERY_STRING'])
page = int(page)
if page <= 5:
@@ -129,13 +130,13 @@ def get_search_page(env, start_response):
result = Template(yt_search_results_template).substitute(
- header = common.get_header(query),
+ header = html_common.get_header(query),
results = result_list_html,
page_title = query + " - Search",
search_box_value = html.escape(query),
number_of_results = '{:,}'.format(estimated_results),
number_of_pages = '{:,}'.format(estimated_pages),
- page_buttons = common.page_buttons_html(page, estimated_pages, URL_ORIGIN + "/search", env['QUERY_STRING']),
+ page_buttons = html_common.page_buttons_html(page, estimated_pages, util.URL_ORIGIN + "/search", env['QUERY_STRING']),
corrections = corrections
)
return result.encode('utf-8')
diff --git a/youtube/subscriptions.py b/youtube/subscriptions.py
index ff7d0df..0c7e8a5 100644
--- a/youtube/subscriptions.py
+++ b/youtube/subscriptions.py
@@ -1,4 +1,4 @@
-from youtube import common, channel
+from youtube import util, yt_data_extract, html_common, channel
import settings
from string import Template
import sqlite3
@@ -169,7 +169,7 @@ def _get_upstream_videos(channel_id, time_last_checked):
content = response.read()
print('Retrieved videos for ' + channel_id)
- content = common.decode_content(content, response.getheader('Content-Encoding', default='identity'))
+ content = util.decode_content(content, response.getheader('Content-Encoding', default='identity'))
feed = atoma.parse_atom_bytes(content)
@@ -191,7 +191,7 @@ def _get_upstream_videos(channel_id, time_last_checked):
# Now check channel page to retrieve missing information for videos
json_channel_videos = channel.get_grid_items(channel.get_channel_tab(channel_id)[1]['response'])
for json_video in json_channel_videos:
- info = common.renderer_info(json_video['gridVideoRenderer'])
+ info = yt_data_extract.renderer_info(json_video['gridVideoRenderer'])
if 'description' not in info:
info['description'] = ''
if info['id'] in atom_videos:
@@ -205,12 +205,12 @@ def get_subscriptions_page(env, start_response):
items_html = '''<nav class="item-grid">\n'''
for item in _get_videos(30, 0):
- items_html += common.video_item_html(item, common.small_video_item_template)
+ items_html += html_common.video_item_html(item, html_common.small_video_item_template)
items_html += '''\n</nav>'''
start_response('200 OK', [('Content-type','text/html'),])
return subscriptions_template.substitute(
- header = common.get_header(),
+ header = html_common.get_header(),
items = items_html,
page_buttons = '',
).encode('utf-8')
@@ -243,7 +243,7 @@ def post_subscriptions_page(env, start_response):
finally:
connection.close()
- start_response('303 See Other', [('Location', common.URL_ORIGIN + '/subscriptions'),] )
+ start_response('303 See Other', [('Location', util.URL_ORIGIN + '/subscriptions'),] )
return b''
else:
start_response('400 Bad Request', ())
diff --git a/youtube/util.py b/youtube/util.py
new file mode 100644
index 0000000..9950815
--- /dev/null
+++ b/youtube/util.py
@@ -0,0 +1,229 @@
+import settings
+import socks, sockshandler
+import gzip
+import brotli
+import urllib.parse
+import re
+import time
+
+# The trouble with the requests library: It ships its own certificate bundle via certifi
+# instead of using the system certificate store, meaning self-signed certificates
+# configured by the user will not work. Some draconian networks block TLS unless a corporate
+# certificate is installed on the system. Additionally, some users install a self signed cert
+# in order to use programs to modify or monitor requests made by programs on the system.
+
+# Finally, certificates expire and need to be updated, or are sometimes revoked. Sometimes
+# certificate authorites go rogue and need to be untrusted. Since we are going through Tor exit nodes,
+# this becomes all the more important. A rogue CA could issue a fake certificate for accounts.google.com, and a
+# malicious exit node could use this to decrypt traffic when logging in and retrieve passwords. Examples:
+# https://www.engadget.com/2015/10/29/google-warns-symantec-over-certificates/
+# https://nakedsecurity.sophos.com/2013/12/09/serious-security-google-finds-fake-but-trusted-ssl-certificates-for-its-domains-made-in-france/
+
+# In the requests documentation it says:
+# "Before version 2.16, Requests bundled a set of root CAs that it trusted, sourced from the Mozilla trust store.
+# The certificates were only updated once for each Requests version. When certifi was not installed,
+# this led to extremely out-of-date certificate bundles when using significantly older versions of Requests.
+# For the sake of security we recommend upgrading certifi frequently!"
+# (http://docs.python-requests.org/en/master/user/advanced/#ca-certificates)
+
+# Expecting users to remember to manually update certifi on Linux isn't reasonable in my view.
+# On windows, this is even worse since I am distributing all dependencies. This program is not
+# updated frequently, and using requests would lead to outdated certificates. Certificates
+# should be updated with OS updates, instead of thousands of developers of different programs
+# being expected to do this correctly 100% of the time.
+
+# There is hope that this might be fixed eventually:
+# https://github.com/kennethreitz/requests/issues/2966
+
+# Until then, I will use a mix of urllib3 and urllib.
+import urllib3
+import urllib3.contrib.socks
+
+URL_ORIGIN = "/https://www.youtube.com"
+
+connection_pool = urllib3.PoolManager(cert_reqs = 'CERT_REQUIRED')
+
+old_tor_connection_pool = None
+tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:9150/', cert_reqs = 'CERT_REQUIRED')
+
+tor_pool_refresh_time = time.monotonic() # prevent problems due to clock changes
+
+def get_pool(use_tor):
+ global old_tor_connection_pool
+ global tor_connection_pool
+ global tor_pool_refresh_time
+
+ if not use_tor:
+ return connection_pool
+
+ # Tor changes circuits after 10 minutes: https://tor.stackexchange.com/questions/262/for-how-long-does-a-circuit-stay-alive
+ current_time = time.monotonic()
+ if current_time - tor_pool_refresh_time > 300: # close pool after 5 minutes
+ tor_connection_pool.clear()
+
+ # Keep a reference for 5 min to avoid it getting garbage collected while sockets still in use
+ old_tor_connection_pool = tor_connection_pool
+
+ tor_connection_pool = urllib3.contrib.socks.SOCKSProxyManager('socks5://127.0.0.1:9150/', cert_reqs = 'CERT_REQUIRED')
+ tor_pool_refresh_time = current_time
+
+ return tor_connection_pool
+
+
+
+class HTTPAsymmetricCookieProcessor(urllib.request.BaseHandler):
+ '''Separate cookiejars for receiving and sending'''
+ def __init__(self, cookiejar_send=None, cookiejar_receive=None):
+ import http.cookiejar
+ self.cookiejar_send = cookiejar_send
+ self.cookiejar_receive = cookiejar_receive
+
+ def http_request(self, request):
+ if self.cookiejar_send is not None:
+ self.cookiejar_send.add_cookie_header(request)
+ return request
+
+ def http_response(self, request, response):
+ if self.cookiejar_receive is not None:
+ self.cookiejar_receive.extract_cookies(response, request)
+ return response
+
+ https_request = http_request
+ https_response = http_response
+
+
+def decode_content(content, encoding_header):
+ encodings = encoding_header.replace(' ', '').split(',')
+ for encoding in reversed(encodings):
+ if encoding == 'identity':
+ continue
+ if encoding == 'br':
+ content = brotli.decompress(content)
+ elif encoding == 'gzip':
+ content = gzip.decompress(content)
+ return content
+
+def fetch_url(url, headers=(), timeout=15, report_text=None, data=None, cookiejar_send=None, cookiejar_receive=None, use_tor=True, return_response=False):
+ '''
+ When cookiejar_send is set to a CookieJar object,
+ those cookies will be sent in the request (but cookies in response will not be merged into it)
+ When cookiejar_receive is set to a CookieJar object,
+ cookies received in the response will be merged into the object (nothing will be sent from it)
+ When both are set to the same object, cookies will be sent from the object,
+ and response cookies will be merged into it.
+ '''
+ headers = dict(headers) # Note: Calling dict() on a dict will make a copy
+ headers['Accept-Encoding'] = 'gzip, br'
+
+ # prevent python version being leaked by urllib if User-Agent isn't provided
+ # (urllib will use ex. Python-urllib/3.6 otherwise)
+ if 'User-Agent' not in headers and 'user-agent' not in headers and 'User-agent' not in headers:
+ headers['User-Agent'] = 'Python-urllib'
+
+ method = "GET"
+ if data is not None:
+ method = "POST"
+ if isinstance(data, str):
+ data = data.encode('ascii')
+ elif not isinstance(data, bytes):
+ data = urllib.parse.urlencode(data).encode('ascii')
+
+ start_time = time.time()
+
+ if cookiejar_send is not None or cookiejar_receive is not None: # Use urllib
+ req = urllib.request.Request(url, data=data, headers=headers)
+
+ cookie_processor = HTTPAsymmetricCookieProcessor(cookiejar_send=cookiejar_send, cookiejar_receive=cookiejar_receive)
+
+ if use_tor and settings.route_tor:
+ opener = urllib.request.build_opener(sockshandler.SocksiPyHandler(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 9150), cookie_processor)
+ else:
+ opener = urllib.request.build_opener(cookie_processor)
+
+ response = opener.open(req, timeout=timeout)
+ response_time = time.time()
+
+
+ content = response.read()
+
+ else: # Use a urllib3 pool. Cookies can't be used since urllib3 doesn't have easy support for them.
+ pool = get_pool(use_tor and settings.route_tor)
+
+ response = pool.request(method, url, headers=headers, timeout=timeout, preload_content=False, decode_content=False)
+ response_time = time.time()
+
+ content = response.read()
+ response.release_conn()
+
+ read_finish = time.time()
+ if report_text:
+ print(report_text, ' Latency:', round(response_time - start_time,3), ' Read time:', round(read_finish - response_time,3))
+ content = decode_content(content, response.getheader('Content-Encoding', default='identity'))
+
+ if return_response:
+ return content, response
+ return content
+
+mobile_user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1'
+mobile_ua = (('User-Agent', mobile_user_agent),)
+desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
+desktop_ua = (('User-Agent', desktop_user_agent),)
+
+
+
+
+
+
+
+
+
+
+def dict_add(*dicts):
+ for dictionary in dicts[1:]:
+ dicts[0].update(dictionary)
+ return dicts[0]
+
+def video_id(url):
+ url_parts = urllib.parse.urlparse(url)
+ return urllib.parse.parse_qs(url_parts.query)['v'][0]
+
+def default_multi_get(object, *keys, default):
+ ''' Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. Last argument is the default value to use in case of any IndexErrors or KeyErrors '''
+ try:
+ for key in keys:
+ object = object[key]
+ return object
+ except (IndexError, KeyError):
+ return default
+
+
+# default, sddefault, mqdefault, hqdefault, hq720
+def get_thumbnail_url(video_id):
+ return "/i.ytimg.com/vi/" + video_id + "/mqdefault.jpg"
+
+def seconds_to_timestamp(seconds):
+ seconds = int(seconds)
+ hours, seconds = divmod(seconds,3600)
+ minutes, seconds = divmod(seconds,60)
+ if hours != 0:
+ timestamp = str(hours) + ":"
+ timestamp += str(minutes).zfill(2) # zfill pads with zeros
+ else:
+ timestamp = str(minutes)
+
+ timestamp += ":" + str(seconds).zfill(2)
+ return timestamp
+
+
+
+def update_query_string(query_string, items):
+ parameters = urllib.parse.parse_qs(query_string)
+ parameters.update(items)
+ return urllib.parse.urlencode(parameters, doseq=True)
+
+
+
+def uppercase_escape(s):
+ return re.sub(
+ r'\\U([0-9a-fA-F]{8})',
+ lambda m: chr(int(m.group(1), base=16)), s) \ No newline at end of file
diff --git a/youtube/watch.py b/youtube/watch.py
index 04a5b5d..06b525a 100644
--- a/youtube/watch.py
+++ b/youtube/watch.py
@@ -1,12 +1,12 @@
+from youtube import util, html_common, comments
+
from youtube_dl.YoutubeDL import YoutubeDL
from youtube_dl.extractor.youtube import YoutubeError
import json
import urllib
from string import Template
import html
-import youtube.common as common
-from youtube.common import default_multi_get, get_thumbnail_url, video_id, URL_ORIGIN
-import youtube.comments as comments
+
import gevent
import settings
import os
@@ -127,9 +127,11 @@ def get_related_items_html(info):
result = ""
for item in info['related_vids']:
if 'list' in item: # playlist:
- result += common.small_playlist_item_html(watch_page_related_playlist_info(item))
+ item = watch_page_related_playlist_info(item)
+ result += html_common.playlist_item_html(item, html_common.small_playlist_item_template)
else:
- result += common.small_video_item_html(watch_page_related_video_info(item))
+ item = watch_page_related_video_info(item)
+ result += html_common.video_item_html(item, html_common.small_video_item_template)
return result
@@ -137,11 +139,12 @@ def get_related_items_html(info):
# converts these to standard names
def watch_page_related_video_info(item):
result = {key: item[key] for key in ('id', 'title', 'author')}
- result['duration'] = common.seconds_to_timestamp(item['length_seconds'])
+ result['duration'] = util.seconds_to_timestamp(item['length_seconds'])
try:
result['views'] = item['short_view_count_text']
except KeyError:
result['views'] = ''
+ result['thumbnail'] = util.get_thumbnail_url(item['id'])
return result
def watch_page_related_playlist_info(item):
@@ -150,14 +153,15 @@ def watch_page_related_playlist_info(item):
'title': item['playlist_title'],
'id': item['list'],
'first_video_id': item['video_id'],
+ 'thumbnail': util.get_thumbnail_url(item['video_id']),
}
def sort_formats(info):
sorted_formats = info['formats'].copy()
- sorted_formats.sort(key=lambda x: default_multi_get(_formats, x['format_id'], 'height', default=0))
+ sorted_formats.sort(key=lambda x: util.default_multi_get(_formats, x['format_id'], 'height', default=0))
for index, format in enumerate(sorted_formats):
- if default_multi_get(_formats, format['format_id'], 'height', default=0) >= 360:
+ if util.default_multi_get(_formats, format['format_id'], 'height', default=0) >= 360:
break
sorted_formats = sorted_formats[index:] + sorted_formats[0:index]
sorted_formats = [format for format in info['formats'] if format['acodec'] != 'none' and format['vcodec'] != 'none']
@@ -236,7 +240,7 @@ def get_watch_page(env, start_response):
start_response('200 OK', [('Content-type','text/html'),])
- lc = common.default_multi_get(env['parameters'], 'lc', 0, default='')
+ lc = util.default_multi_get(env['parameters'], 'lc', 0, default='')
if settings.route_tor:
proxy = 'socks5://127.0.0.1:9150/'
else:
@@ -256,17 +260,17 @@ def get_watch_page(env, start_response):
#chosen_format = choose_format(info)
if isinstance(info, str): # youtube error
- return common.yt_basic_template.substitute(
+ return html_common.yt_basic_template.substitute(
page_title = "Error",
style = "",
- header = common.get_header(),
+ header = html_common.get_header(),
page = html.escape(info),
).encode('utf-8')
sorted_formats = sort_formats(info)
video_info = {
- "duration": common.seconds_to_timestamp(info["duration"]),
+ "duration": util.seconds_to_timestamp(info["duration"]),
"id": info['id'],
"title": info['title'],
"author": info['uploader'],
@@ -338,7 +342,7 @@ def get_watch_page(env, start_response):
page = yt_watch_template.substitute(
video_title = html.escape(info["title"]),
page_title = html.escape(info["title"]),
- header = common.get_header(),
+ header = html_common.get_header(),
uploader = html.escape(info["uploader"]),
uploader_channel_url = '/' + info["uploader_url"],
upload_date = upload_date,
diff --git a/youtube/youtube.py b/youtube/youtube.py
index 288f68b..4ec7962 100644
--- a/youtube/youtube.py
+++ b/youtube/youtube.py
@@ -1,7 +1,7 @@
import mimetypes
import urllib.parse
import os
-from youtube import local_playlist, watch, search, playlist, channel, comments, common, post_comment, accounts, subscriptions
+from youtube import local_playlist, watch, search, playlist, channel, comments, post_comment, accounts, util, subscriptions
import settings
YOUTUBE_FILES = (
"/shared.css",
@@ -68,7 +68,7 @@ def youtube(env, start_response):
elif path.startswith("/api/"):
start_response('200 OK', [('Content-type', 'text/vtt'),] )
- result = common.fetch_url('https://www.youtube.com' + path + ('?' + query_string if query_string else ''))
+ result = util.fetch_url('https://www.youtube.com' + path + ('?' + query_string if query_string else ''))
result = result.replace(b"align:start position:0%", b"")
return result
diff --git a/youtube/yt_data_extract.py b/youtube/yt_data_extract.py
new file mode 100644
index 0000000..5483911
--- /dev/null
+++ b/youtube/yt_data_extract.py
@@ -0,0 +1,205 @@
+import html
+
+# videos (all of type str):
+
+# id
+# title
+# url
+# author
+# author_url
+# thumbnail
+# description
+# published
+# duration
+# likes
+# dislikes
+# views
+# playlist_index
+
+# playlists:
+
+# id
+# title
+# url
+# author
+# author_url
+# thumbnail
+# description
+# updated
+# size
+# first_video_id
+
+
+
+
+
+
+
+def get_plain_text(node):
+ try:
+ return html.escape(node['simpleText'])
+ except KeyError:
+ return unformmated_text_runs(node['runs'])
+
+def unformmated_text_runs(runs):
+ result = ''
+ for text_run in runs:
+ result += html.escape(text_run["text"])
+ return result
+
+def format_text_runs(runs):
+ if isinstance(runs, str):
+ return runs
+ result = ''
+ for text_run in runs:
+ if text_run.get("bold", False):
+ result += "<b>" + html.escape(text_run["text"]) + "</b>"
+ elif text_run.get('italics', False):
+ result += "<i>" + html.escape(text_run["text"]) + "</i>"
+ else:
+ result += html.escape(text_run["text"])
+ return result
+
+
+
+
+
+
+
+
+def get_url(node):
+ try:
+ return node['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+ except KeyError:
+ return node['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+
+
+def get_text(node):
+ try:
+ return node['simpleText']
+ except KeyError:
+ pass
+ try:
+ return node['runs'][0]['text']
+ except IndexError: # empty text runs
+ return ''
+
+def get_formatted_text(node):
+ try:
+ return node['runs']
+ except KeyError:
+ return node['simpleText']
+
+def get_badges(node):
+ badges = []
+ for badge_node in node:
+ badge = badge_node['metadataBadgeRenderer']['label']
+ if badge.lower() != 'new':
+ badges.append(badge)
+ return badges
+
+def get_thumbnail(node):
+ try:
+ return node['thumbnails'][0]['url'] # polymer format
+ except KeyError:
+ return node['url'] # ajax format
+
+dispatch = {
+
+# polymer format
+ 'title': ('title', get_text),
+ 'publishedTimeText': ('published', get_text),
+ 'videoId': ('id', lambda node: node),
+ 'descriptionSnippet': ('description', get_formatted_text),
+ 'lengthText': ('duration', get_text),
+ 'thumbnail': ('thumbnail', get_thumbnail),
+ 'thumbnails': ('thumbnail', lambda node: node[0]['thumbnails'][0]['url']),
+
+ 'viewCountText': ('views', get_text),
+ 'numVideosText': ('size', lambda node: get_text(node).split(' ')[0]), # the format is "324 videos"
+ 'videoCountText': ('size', get_text),
+ 'playlistId': ('id', lambda node: node),
+ 'descriptionText': ('description', get_formatted_text),
+
+ 'subscriberCountText': ('subscriber_count', get_text),
+ 'channelId': ('id', lambda node: node),
+ 'badges': ('badges', get_badges),
+
+# ajax format
+ 'view_count_text': ('views', get_text),
+ 'num_videos_text': ('size', lambda node: get_text(node).split(' ')[0]),
+ 'owner_text': ('author', get_text),
+ 'owner_endpoint': ('author_url', lambda node: node['url']),
+ 'description': ('description', get_formatted_text),
+ 'index': ('playlist_index', get_text),
+ 'short_byline': ('author', get_text),
+ 'length': ('duration', get_text),
+ 'video_id': ('id', lambda node: node),
+
+}
+
+def renderer_info(renderer):
+ try:
+ info = {}
+ if 'viewCountText' in renderer: # prefer this one as it contains all the digits
+ info['views'] = get_text(renderer['viewCountText'])
+ elif 'shortViewCountText' in renderer:
+ info['views'] = get_text(renderer['shortViewCountText'])
+
+ if 'ownerText' in renderer:
+ info['author'] = renderer['ownerText']['runs'][0]['text']
+ info['author_url'] = renderer['ownerText']['runs'][0]['navigationEndpoint']['commandMetadata']['webCommandMetadata']['url']
+ try:
+ overlays = renderer['thumbnailOverlays']
+ except KeyError:
+ pass
+ else:
+ for overlay in overlays:
+ if 'thumbnailOverlayTimeStatusRenderer' in overlay:
+ info['duration'] = get_text(overlay['thumbnailOverlayTimeStatusRenderer']['text'])
+ # show renderers don't have videoCountText
+ elif 'thumbnailOverlayBottomPanelRenderer' in overlay:
+ info['size'] = get_text(overlay['thumbnailOverlayBottomPanelRenderer']['text'])
+
+ # show renderers don't have playlistId, have to dig into the url to get it
+ try:
+ info['id'] = renderer['navigationEndpoint']['watchEndpoint']['playlistId']
+ except KeyError:
+ pass
+ for key, node in renderer.items():
+ if key in ('longBylineText', 'shortBylineText'):
+ info['author'] = get_text(node)
+ try:
+ info['author_url'] = get_url(node)
+ except KeyError:
+ pass
+
+ # show renderers don't have thumbnail key at top level, dig into thumbnailRenderer
+ elif key == 'thumbnailRenderer' and 'showCustomThumbnailRenderer' in node:
+ info['thumbnail'] = node['showCustomThumbnailRenderer']['thumbnail']['thumbnails'][0]['url']
+ else:
+ try:
+ simple_key, function = dispatch[key]
+ except KeyError:
+ continue
+ info[simple_key] = function(node)
+ return info
+ except KeyError:
+ print(renderer)
+ raise
+
+def ajax_info(item_json):
+ try:
+ info = {}
+ for key, node in item_json.items():
+ try:
+ simple_key, function = dispatch[key]
+ except KeyError:
+ continue
+ info[simple_key] = function(node)
+ return info
+ except KeyError:
+ print(item_json)
+ raise
+
+