Diffstat (limited to 'youtube/comments.py')
-rw-r--r--    youtube/comments.py    344
1 file changed, 152 insertions(+), 192 deletions(-)
diff --git a/youtube/comments.py b/youtube/comments.py
index 3b1ef86..1ff1a21 100644
--- a/youtube/comments.py
+++ b/youtube/comments.py
@@ -1,11 +1,13 @@
-from youtube import proto, util, yt_data_extract, accounts
+from youtube import proto, util, yt_data_extract
+from youtube.util import (
+ concat_or_none,
+ strip_non_ascii
+)
from youtube import yt_app
import settings
import json
import base64
-import urllib
-import re
import flask
from flask import request
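
The two helpers now imported from youtube.util are used throughout the rewritten code below. A minimal sketch of their behavior, inferred from the call sites in this diff (the real implementations live in youtube/util.py and may differ):

def concat_or_none(*strings):
    # Concatenate the pieces, but propagate None: if any piece is
    # missing, the whole result (usually a URL) is treated as absent.
    result = ''
    for s in strings:
        if s is None:
            return None
        result += s
    return result

def strip_non_ascii(string):
    # Drop any characters outside the ASCII range (assumed behavior).
    return string.encode('ascii', errors='ignore').decode('ascii')
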
@@ -23,12 +25,13 @@ from flask import request
# *Old ASJNs continue to work, and start at the same comment even if new comments have been posted since
# *The ASJN has no relation with any of the data in the response it came from
+
def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
video_id = proto.as_bytes(video_id)
secret_key = proto.as_bytes(secret_key)
-
- page_info = proto.string(4,video_id) + proto.uint(6, sort)
+
+ page_info = proto.string(4, video_id) + proto.uint(6, sort)
offset_information = proto.nested(4, page_info) + proto.uint(5, offset)
if secret_key:
offset_information = proto.string(1, secret_key) + offset_information
@@ -37,235 +40,192 @@ def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
if lc:
page_params += proto.string(6, proto.percent_b64encode(proto.string(15, lc)))
- result = proto.nested(2, page_params) + proto.uint(3,6) + proto.nested(6, offset_information)
- return base64.urlsafe_b64encode(result).decode('ascii')
-
-def comment_replies_ctoken(video_id, comment_id, max_results=500):
-
- params = proto.string(2, comment_id) + proto.uint(9, max_results)
- params = proto.nested(3, params)
-
- result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params)
+ result = proto.nested(2, page_params) + proto.uint(3, 6) + proto.nested(6, offset_information)
return base64.urlsafe_b64encode(result).decode('ascii')
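
make_comment_ctoken assembles the continuation token as a protobuf message. For reference, a minimal sketch of field encoders of this kind, based on the standard protobuf wire format (a hypothetical reimplementation; the project's proto module may differ in details such as its percent-encoding helpers):

def _varint(n):
    # Protobuf varint: 7 bits per byte, high bit set on all but the last.
    out = b''
    while True:
        byte = n & 0x7f
        n >>= 7
        if n:
            out += bytes((byte | 0x80,))
        else:
            return out + bytes((byte,))

def uint(field_number, value):
    # Wire type 0 (varint): key byte is (field_number << 3) | 0.
    return _varint(field_number << 3) + _varint(value)

def string(field_number, data):
    # Wire type 2 (length-delimited): key, payload length, payload.
    if isinstance(data, str):
        data = data.encode('utf-8')
    return _varint((field_number << 3) | 2) + _varint(len(data)) + data

def nested(field_number, message):
    # A nested message is just a length-delimited field.
    return string(field_number, message)
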
-def ctoken_metadata(ctoken):
- result = dict()
- params = proto.parse(proto.b64_to_bytes(ctoken))
- result['video_id'] = proto.parse(params[2])[2].decode('ascii')
- offset_information = proto.parse(params[6])
- result['offset'] = offset_information.get(5, 0)
-
- result['is_replies'] = False
- if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
- result['is_replies'] = True
- result['sort'] = None
- else:
- try:
- result['sort'] = proto.parse(offset_information[4])[6]
- except KeyError:
- result['sort'] = 0
- return result
-
-
-mobile_headers = {
- 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1',
- 'Accept': '*/*',
- 'Accept-Language': 'en-US,en;q=0.5',
- 'X-YouTube-Client-Name': '2',
- 'X-YouTube-Client-Version': '2.20180823',
-}
def request_comments(ctoken, replies=False):
- if replies: # let's make it use different urls for no reason despite all the data being encoded
- base_url = "https://m.youtube.com/watch_comment?action_get_comment_replies=1&ctoken="
- else:
- base_url = "https://m.youtube.com/watch_comment?action_get_comments=1&ctoken="
- url = base_url + ctoken.replace("=", "%3D") + "&pbj=1"
-
- for i in range(0,8): # don't retry more than 8 times
- content = util.fetch_url(url, headers=mobile_headers, report_text="Retrieved comments", debug_name='request_comments')
- if content[0:4] == b")]}'": # random closing characters included at beginning of response for some reason
- content = content[4:]
- elif content[0:10] == b'\n<!DOCTYPE': # occasionally returns html instead of json for no reason
- content = b''
- print("got <!DOCTYPE>, retrying")
- continue
- break
- return content
+ url = 'https://m.youtube.com/youtubei/v1/next'
+ url += '?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
+ data = json.dumps({
+ 'context': {
+ 'client': {
+ 'hl': 'en',
+ 'gl': 'US',
+ 'clientName': 'MWEB',
+ 'clientVersion': '2.20210804.02.00',
+ },
+ },
+ 'continuation': ctoken.replace('=', '%3D'),
+ })
+
+ content = util.fetch_url(
+ url, headers=util.mobile_xhr_headers + util.json_header, data=data,
+ report_text='Retrieved comments', debug_name='request_comments')
+ content = content.decode('utf-8')
+
+ polymer_json = json.loads(content)
+ return polymer_json
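
The rewritten request_comments targets the innertube API instead of scraping the mobile watch page. For illustration, the same request made with only the standard library (a sketch; the real code goes through util.fetch_url so that Tor routing and header handling stay centralized):

import json
import urllib.request

def fetch_comments_raw(ctoken):
    # POST the continuation to the innertube 'next' endpoint,
    # identifying as the mobile web client, and decode the JSON reply.
    url = ('https://m.youtube.com/youtubei/v1/next'
           '?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8')
    body = json.dumps({
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'MWEB',
                'clientVersion': '2.20210804.02.00',
            },
        },
        'continuation': ctoken.replace('=', '%3D'),
    }).encode('utf-8')
    request = urllib.request.Request(
        url, data=body, headers={'Content-Type': 'application/json'})
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode('utf-8'))
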
def single_comment_ctoken(video_id, comment_id):
- page_params = proto.string(2, video_id) + proto.string(6, proto.percent_b64encode(proto.string(15, comment_id)))
+ page_params = proto.string(2, video_id) + proto.string(
+ 6, proto.percent_b64encode(proto.string(15, comment_id)))
- result = proto.nested(2, page_params) + proto.uint(3,6)
+ result = proto.nested(2, page_params) + proto.uint(3, 6)
return base64.urlsafe_b64encode(result).decode('ascii')
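
Usage sketch (ids are hypothetical): single_comment_ctoken encodes the comment id at field 15 inside field 6, the same structure the lc parameter of make_comment_ctoken produces, so this is the token behind a '&lc=' permalink that loads one highlighted comment.

token = single_comment_ctoken('abc123XYZ00', 'UgzExampleCommentId')
polymer_json = request_comments(token)
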
-
-def parse_comments_polymer(content):
- try:
- video_title = ''
- content = json.loads(util.uppercase_escape(content.decode('utf-8')))
- url = content[1]['url']
- ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
- metadata = ctoken_metadata(ctoken)
-
- try:
- comments_raw = content[1]['response']['continuationContents']['commentSectionContinuation']['items']
- except KeyError:
- comments_raw = content[1]['response']['continuationContents']['commentRepliesContinuation']['contents']
-
- ctoken = util.default_multi_get(content, 1, 'response', 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
-
- comments = []
- for comment_json in comments_raw:
- number_of_replies = 0
- try:
- comment_thread = comment_json['commentThreadRenderer']
- except KeyError:
- comment_renderer = comment_json['commentRenderer']
- else:
- if 'commentTargetTitle' in comment_thread:
- video_title = comment_thread['commentTargetTitle']['runs'][0]['text']
-
- if 'replies' in comment_thread:
- view_replies_text = yt_data_extract.get_plain_text(comment_thread['replies']['commentRepliesRenderer']['moreText'])
- view_replies_text = view_replies_text.replace(',', '')
- match = re.search(r'(\d+)', view_replies_text)
- if match is None:
- number_of_replies = 1
- else:
- number_of_replies = int(match.group(1))
- comment_renderer = comment_thread['comment']['commentRenderer']
-
- comment = {
- 'author_id': comment_renderer.get('authorId', ''),
- 'author_avatar': comment_renderer['authorThumbnail']['thumbnails'][0]['url'],
- 'likes': comment_renderer['likeCount'],
- 'published': yt_data_extract.get_plain_text(comment_renderer['publishedTimeText']),
- 'text': comment_renderer['contentText'].get('runs', ''),
- 'number_of_replies': number_of_replies,
- 'comment_id': comment_renderer['commentId'],
- }
-
- if 'authorText' in comment_renderer: # deleted channels have no name or channel link
- comment['author'] = yt_data_extract.get_plain_text(comment_renderer['authorText'])
- comment['author_url'] = comment_renderer['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
- comment['author_channel_id'] = comment_renderer['authorEndpoint']['browseEndpoint']['browseId']
- else:
- comment['author'] = ''
- comment['author_url'] = ''
- comment['author_channel_id'] = ''
-
- comments.append(comment)
- except Exception as e:
- print('Error parsing comments: ' + str(e))
- comments = ()
- ctoken = ''
-
- return {
- 'ctoken': ctoken,
- 'comments': comments,
- 'video_title': video_title,
- 'video_id': metadata['video_id'],
- 'offset': metadata['offset'],
- 'is_replies': metadata['is_replies'],
- 'sort': metadata['sort'],
- }
-
def post_process_comments_info(comments_info):
for comment in comments_info['comments']:
- comment['author_url'] = util.URL_ORIGIN + comment['author_url']
- comment['author_avatar'] = '/' + comment['author_avatar']
-
- comment['permalink'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id'] + '&lc=' + comment['comment_id']
-
- if comment['author_channel_id'] in accounts.accounts:
- comment['delete_url'] = (util.URL_ORIGIN + '/delete_comment?video_id='
- + comments_info['video_id']
- + '&channel_id='+ comment['author_channel_id']
- + '&author_id=' + comment['author_id']
- + '&comment_id=' + comment['comment_id'])
-
- num_replies = comment['number_of_replies']
- if num_replies == 0:
- comment['replies_url'] = util.URL_ORIGIN + '/post_comment?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
- else:
- comment['replies_url'] = util.URL_ORIGIN + '/comments?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
-
- if num_replies == 0:
+ comment['author'] = strip_non_ascii(comment['author']) if comment.get('author') else ""
+ comment['author_url'] = concat_or_none(
+ '/', comment['author_url'])
+ comment['author_avatar'] = concat_or_none(
+ settings.img_prefix, comment['author_avatar'])
+
+ comment['permalink'] = concat_or_none(
+ util.URL_ORIGIN, '/watch?v=',
+ comments_info['video_id'],
+ '&lc=', comment['id']
+ )
+
+ reply_count = comment['reply_count']
+ comment['replies_url'] = None
+ if comment['reply_ctoken']:
+ # change max_replies field to 200 in ctoken
+ ctoken = comment['reply_ctoken']
+ ctoken, err = proto.set_protobuf_value(
+ ctoken,
+ 'base64p', 6, 3, 9, value=200)
+ if err:
+ print('Error setting ctoken value:')
+ print(err)
+ comment['replies_url'] = None
+ comment['replies_url'] = concat_or_none(
+ util.URL_ORIGIN,
+ '/comments?replies=1&ctoken=' + ctoken)
+
+ if reply_count == 0:
comment['view_replies_text'] = 'Reply'
- elif num_replies == 1:
+ elif reply_count == 1:
comment['view_replies_text'] = '1 reply'
else:
- comment['view_replies_text'] = str(num_replies) + ' replies'
-
+ comment['view_replies_text'] = str(reply_count) + ' replies'
- if comment['likes'] == 1:
+ if comment['approx_like_count'] == '1':
comment['likes_text'] = '1 like'
else:
- comment['likes_text'] = str(comment['likes']) + ' likes'
+ comment['likes_text'] = (str(comment['approx_like_count'])
+ + ' likes')
comments_info['include_avatars'] = settings.enable_comment_avatars
- if comments_info['ctoken'] != '':
- comments_info['more_comments_url'] = util.URL_ORIGIN + '/comments?ctoken=' + comments_info['ctoken']
+ if comments_info['ctoken']:
+ ctoken = comments_info['ctoken']
+ if comments_info['is_replies']:
+ replies_param = '&replies=1'
+ # change max_replies field to 200 in ctoken
+ new_ctoken, err = proto.set_protobuf_value(
+ ctoken,
+ 'base64p', 6, 3, 9, value=200)
+ if err:
+ print('Error setting ctoken value:')
+ print(err)
+ else:
+ ctoken = new_ctoken
+ else:
+ replies_param = ''
+ comments_info['more_comments_url'] = concat_or_none(
+ util.URL_ORIGIN, '/comments?ctoken=', ctoken, replies_param)
- comments_info['page_number'] = page_number = str(int(comments_info['offset']/20) + 1)
+ if comments_info['offset'] is None:
+ comments_info['page_number'] = None
+ else:
+ comments_info['page_number'] = int(comments_info['offset']/20) + 1
if not comments_info['is_replies']:
comments_info['sort_text'] = 'top' if comments_info['sort'] == 0 else 'newest'
-
- comments_info['video_url'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id']
- comments_info['video_thumbnail'] = '/i.ytimg.com/vi/'+ comments_info['video_id'] + '/mqdefault.jpg'
+ comments_info['video_url'] = concat_or_none(
+ util.URL_ORIGIN, '/watch?v=', comments_info['video_id'])
+ comments_info['video_thumbnail'] = concat_or_none(
+ settings.img_prefix, 'https://i.ytimg.com/vi/',
+ comments_info['video_id'], '/hqdefault.jpg'
+ )
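
The (6, 3, 9) path handed to proto.set_protobuf_value above mirrors the layout the removed comment_replies_ctoken built by hand: the request params sit in field 6 of the token, the reply params in field 3 of that, and the max-results count in field 9. Reconstructed from the removed code (ids hypothetical):

from youtube import proto

video_id = 'abc123XYZ00'            # hypothetical
comment_id = 'UgzExampleCommentId'  # hypothetical

reply_params = proto.string(2, comment_id) + proto.uint(9, 200)  # field 9
params = proto.nested(3, reply_params)                           # field 3
token = (proto.nested(2, proto.string(2, video_id))
         + proto.uint(3, 6)
         + proto.nested(6, params))                              # field 6
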
def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
- if settings.comments_mode:
- comments_info = parse_comments_polymer(request_comments(make_comment_ctoken(video_id, sort, offset, lc, secret_key)))
- post_process_comments_info(comments_info)
-
- post_comment_url = util.URL_ORIGIN + "/post_comment?video_id=" + video_id
- other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
- other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
- comments_info['comment_links'] = [('Post comment', post_comment_url), (other_sort_text, other_sort_url)]
+ try:
+ if settings.comments_mode:
+ comments_info = {'error': None}
+ other_sort_url = (
+ util.URL_ORIGIN + '/comments?ctoken='
+ + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
+ )
+ other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
+
+ this_sort_url = (util.URL_ORIGIN
+ + '/comments?ctoken='
+ + make_comment_ctoken(video_id, sort=sort, lc=lc))
+
+ comments_info['comment_links'] = [
+ (other_sort_text, other_sort_url),
+ ('Direct link', this_sort_url)
+ ]
+
+ ctoken = make_comment_ctoken(video_id, sort, offset, lc)
+ comments_info.update(yt_data_extract.extract_comments_info(
+ request_comments(ctoken), ctoken=ctoken
+ ))
+ post_process_comments_info(comments_info)
+
+ return comments_info
+ else:
+ return {}
+ except util.FetchError as e:
+ if e.code == '429' and settings.route_tor:
+ comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
+ if e.error_message:
+ comments_info['error'] += '\n\n' + e.error_message
+ comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
+ else:
+ comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
- return comments_info
+ except Exception as e:
+ comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
- return {}
+ if comments_info.get('error'):
+ print('Error retrieving comments for ' + str(video_id) + ':\n' +
+ comments_info['error'])
+ return comments_info
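
A typical call from the watch-page handler might look like this (video id hypothetical); note that on failure the returned dict carries the error text rather than raising:

info = video_comments('abc123XYZ00', sort=0, offset=0)
if info.get('error'):
    print(info['error'])
else:
    for comment in info['comments']:
        print(comment['permalink'], comment['view_replies_text'])
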
@yt_app.route('/comments')
def get_comments_page():
ctoken = request.args.get('ctoken', '')
- replies = False
- if not ctoken:
- video_id = request.args['video_id']
- parent_id = request.args['parent_id']
+ replies = request.args.get('replies', '0') == '1'
- ctoken = comment_replies_ctoken(video_id, parent_id)
- replies = True
-
- comments_info = parse_comments_polymer(request_comments(ctoken, replies))
+ comments_info = yt_data_extract.extract_comments_info(
+ request_comments(ctoken, replies), ctoken=ctoken
+ )
post_process_comments_info(comments_info)
if not replies:
- other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(comments_info['video_id'], sort=1 - comments_info['sort'])
+ if comments_info['sort'] is None or comments_info['video_id'] is None:
+ other_sort_url = None
+ else:
+ other_sort_url = (
+ util.URL_ORIGIN
+ + '/comments?ctoken='
+ + make_comment_ctoken(comments_info['video_id'],
+ sort=1-comments_info['sort'])
+ )
other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
-
- comment_posting_box_info = {
- 'form_action': '' if replies else util.URL_ORIGIN + '/post_comment',
- 'video_id': comments_info['video_id'],
- 'accounts': accounts.account_list_data(),
- 'include_video_id_input': not replies,
- 'replying': replies,
- }
-
- return flask.render_template('comments_page.html',
- comments_info = comments_info,
- comment_posting_box_info = comment_posting_box_info,
+ return flask.render_template(
+ 'comments_page.html',
+ comments_info=comments_info,
+ slim=request.args.get('slim', False)
)
-