Diffstat (limited to 'youtube/comments.py')
-rw-r--r-- | youtube/comments.py | 344
1 file changed, 152 insertions(+), 192 deletions(-)
diff --git a/youtube/comments.py b/youtube/comments.py
index 3b1ef86..1ff1a21 100644
--- a/youtube/comments.py
+++ b/youtube/comments.py
@@ -1,11 +1,13 @@
-from youtube import proto, util, yt_data_extract, accounts
+from youtube import proto, util, yt_data_extract
+from youtube.util import (
+    concat_or_none,
+    strip_non_ascii
+)
 from youtube import yt_app
 import settings
 
 import json
 import base64
-import urllib
-import re
 
 import flask
 from flask import request
@@ -23,12 +25,13 @@
 # *Old ASJN's continue to work, and start at the same comment even if new comments have been posted since
 # *The ASJN has no relation with any of the data in the response it came from
 
+
 def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
     video_id = proto.as_bytes(video_id)
     secret_key = proto.as_bytes(secret_key)
-
-    page_info = proto.string(4,video_id) + proto.uint(6, sort)
+
+    page_info = proto.string(4, video_id) + proto.uint(6, sort)
     offset_information = proto.nested(4, page_info) + proto.uint(5, offset)
     if secret_key:
         offset_information = proto.string(1, secret_key) + offset_information
 
@@ -37,235 +40,192 @@ def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''):
     if lc:
         page_params += proto.string(6, proto.percent_b64encode(proto.string(15, lc)))
 
-    result = proto.nested(2, page_params) + proto.uint(3,6) + proto.nested(6, offset_information)
-    return base64.urlsafe_b64encode(result).decode('ascii')
-
-def comment_replies_ctoken(video_id, comment_id, max_results=500):
-
-    params = proto.string(2, comment_id) + proto.uint(9, max_results)
-    params = proto.nested(3, params)
-
-    result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params)
+    result = proto.nested(2, page_params) + proto.uint(3, 6) + proto.nested(6, offset_information)
     return base64.urlsafe_b64encode(result).decode('ascii')
 
-def ctoken_metadata(ctoken):
-    result = dict()
-    params = proto.parse(proto.b64_to_bytes(ctoken))
-    result['video_id'] = proto.parse(params[2])[2].decode('ascii')
-    offset_information = proto.parse(params[6])
-    result['offset'] = offset_information.get(5, 0)
-
-    result['is_replies'] = False
-    if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
-        result['is_replies'] = True
-        result['sort'] = None
-    else:
-        try:
-            result['sort'] = proto.parse(offset_information[4])[6]
-        except KeyError:
-            result['sort'] = 0
-    return result
-
-
-mobile_headers = {
-    'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1',
-    'Accept': '*/*',
-    'Accept-Language': 'en-US,en;q=0.5',
-    'X-YouTube-Client-Name': '2',
-    'X-YouTube-Client-Version': '2.20180823',
-}
 
 def request_comments(ctoken, replies=False):
-    if replies: # let's make it use different urls for no reason despite all the data being encoded
-        base_url = "https://m.youtube.com/watch_comment?action_get_comment_replies=1&ctoken="
-    else:
-        base_url = "https://m.youtube.com/watch_comment?action_get_comments=1&ctoken="
-    url = base_url + ctoken.replace("=", "%3D") + "&pbj=1"
-
-    for i in range(0,8):    # don't retry more than 8 times
-        content = util.fetch_url(url, headers=mobile_headers, report_text="Retrieved comments", debug_name='request_comments')
-        if content[0:4] == b")]}'":    # random closing characters included at beginning of response for some reason
-            content = content[4:]
-        elif content[0:10] == b'\n<!DOCTYPE':    # occasionally returns html instead of json for no reason
-            content = b''
-            print("got <!DOCTYPE>, retrying")
-            continue
-        break
-    return content
+    url = 'https://m.youtube.com/youtubei/v1/next'
+    url += '?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
+    data = json.dumps({
+        'context': {
+            'client': {
+                'hl': 'en',
+                'gl': 'US',
+                'clientName': 'MWEB',
+                'clientVersion': '2.20210804.02.00',
+            },
+        },
+        'continuation': ctoken.replace('=', '%3D'),
+    })
+
+    content = util.fetch_url(
+        url, headers=util.mobile_xhr_headers + util.json_header, data=data,
+        report_text='Retrieved comments', debug_name='request_comments')
+    content = content.decode('utf-8')
+
+    polymer_json = json.loads(content)
+    return polymer_json
 
 
 def single_comment_ctoken(video_id, comment_id):
-    page_params = proto.string(2, video_id) + proto.string(6, proto.percent_b64encode(proto.string(15, comment_id)))
+    page_params = proto.string(2, video_id) + proto.string(
+        6, proto.percent_b64encode(proto.string(15, comment_id)))
 
-    result = proto.nested(2, page_params) + proto.uint(3,6)
+    result = proto.nested(2, page_params) + proto.uint(3, 6)
     return base64.urlsafe_b64encode(result).decode('ascii')
 
-
-def parse_comments_polymer(content):
-    try:
-        video_title = ''
-        content = json.loads(util.uppercase_escape(content.decode('utf-8')))
-        url = content[1]['url']
-        ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0]
-        metadata = ctoken_metadata(ctoken)
-
-        try:
-            comments_raw = content[1]['response']['continuationContents']['commentSectionContinuation']['items']
-        except KeyError:
-            comments_raw = content[1]['response']['continuationContents']['commentRepliesContinuation']['contents']
-
-        ctoken = util.default_multi_get(content, 1, 'response', 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='')
-
-        comments = []
-        for comment_json in comments_raw:
-            number_of_replies = 0
-            try:
-                comment_thread = comment_json['commentThreadRenderer']
-            except KeyError:
-                comment_renderer = comment_json['commentRenderer']
-            else:
-                if 'commentTargetTitle' in comment_thread:
-                    video_title = comment_thread['commentTargetTitle']['runs'][0]['text']
-
-                if 'replies' in comment_thread:
-                    view_replies_text = yt_data_extract.get_plain_text(comment_thread['replies']['commentRepliesRenderer']['moreText'])
-                    view_replies_text = view_replies_text.replace(',', '')
-                    match = re.search(r'(\d+)', view_replies_text)
-                    if match is None:
-                        number_of_replies = 1
-                    else:
-                        number_of_replies = int(match.group(1))
-                comment_renderer = comment_thread['comment']['commentRenderer']
-
-            comment = {
-                'author_id': comment_renderer.get('authorId', ''),
-                'author_avatar': comment_renderer['authorThumbnail']['thumbnails'][0]['url'],
-                'likes': comment_renderer['likeCount'],
-                'published': yt_data_extract.get_plain_text(comment_renderer['publishedTimeText']),
-                'text': comment_renderer['contentText'].get('runs', ''),
-                'number_of_replies': number_of_replies,
-                'comment_id': comment_renderer['commentId'],
-            }
-
-            if 'authorText' in comment_renderer:    # deleted channels have no name or channel link
-                comment['author'] = yt_data_extract.get_plain_text(comment_renderer['authorText'])
-                comment['author_url'] = comment_renderer['authorEndpoint']['commandMetadata']['webCommandMetadata']['url']
-                comment['author_channel_id'] = comment_renderer['authorEndpoint']['browseEndpoint']['browseId']
-            else:
-                comment['author'] = ''
-                comment['author_url'] = ''
-                comment['author_channel_id'] = ''
-
-            comments.append(comment)
-    except Exception as e:
-        print('Error parsing comments: ' + str(e))
-        comments = ()
-        ctoken = ''
-
-    return {
-        'ctoken': ctoken,
-        'comments': comments,
-        'video_title': video_title,
-        'video_id': metadata['video_id'],
-        'offset': metadata['offset'],
-        'is_replies': metadata['is_replies'],
-        'sort': metadata['sort'],
-    }
-
 
 def post_process_comments_info(comments_info):
     for comment in comments_info['comments']:
-        comment['author_url'] = util.URL_ORIGIN + comment['author_url']
-        comment['author_avatar'] = '/' + comment['author_avatar']
-
-        comment['permalink'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id'] + '&lc=' + comment['comment_id']
-
-        if comment['author_channel_id'] in accounts.accounts:
-            comment['delete_url'] = (util.URL_ORIGIN + '/delete_comment?video_id='
-                + comments_info['video_id']
-                + '&channel_id='+ comment['author_channel_id']
-                + '&author_id=' + comment['author_id']
-                + '&comment_id=' + comment['comment_id'])
-
-        num_replies = comment['number_of_replies']
-        if num_replies == 0:
-            comment['replies_url'] = util.URL_ORIGIN + '/post_comment?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
-        else:
-            comment['replies_url'] = util.URL_ORIGIN + '/comments?parent_id=' + comment['comment_id'] + "&video_id=" + comments_info['video_id']
-
-        if num_replies == 0:
+        comment['author'] = strip_non_ascii(comment['author']) if comment.get('author') else ""
+        comment['author_url'] = concat_or_none(
+            '/', comment['author_url'])
+        comment['author_avatar'] = concat_or_none(
+            settings.img_prefix, comment['author_avatar'])
+
+        comment['permalink'] = concat_or_none(
+            util.URL_ORIGIN, '/watch?v=',
+            comments_info['video_id'],
+            '&lc=', comment['id']
+        )
+
+        reply_count = comment['reply_count']
+        comment['replies_url'] = None
+        if comment['reply_ctoken']:
+            # change max_replies field to 250 in ctoken
+            ctoken = comment['reply_ctoken']
+            ctoken, err = proto.set_protobuf_value(
+                ctoken,
+                'base64p', 6, 3, 9, value=200)
+            if err:
+                print('Error setting ctoken value:')
+                print(err)
+                comment['replies_url'] = None
+            comment['replies_url'] = concat_or_none(
+                util.URL_ORIGIN,
+                '/comments?replies=1&ctoken=' + ctoken)
+
+        if reply_count == 0:
             comment['view_replies_text'] = 'Reply'
-        elif num_replies == 1:
+        elif reply_count == 1:
             comment['view_replies_text'] = '1 reply'
         else:
-            comment['view_replies_text'] = str(num_replies) + ' replies'
-
+            comment['view_replies_text'] = str(reply_count) + ' replies'
 
-        if comment['likes'] == 1:
+        if comment['approx_like_count'] == '1':
             comment['likes_text'] = '1 like'
         else:
-            comment['likes_text'] = str(comment['likes']) + ' likes'
+            comment['likes_text'] = (str(comment['approx_like_count'])
+                                     + ' likes')
 
     comments_info['include_avatars'] = settings.enable_comment_avatars
-    if comments_info['ctoken'] != '':
-        comments_info['more_comments_url'] = util.URL_ORIGIN + '/comments?ctoken=' + comments_info['ctoken']
+    if comments_info['ctoken']:
+        ctoken = comments_info['ctoken']
+        if comments_info['is_replies']:
+            replies_param = '&replies=1'
+            # change max_replies field to 250 in ctoken
+            new_ctoken, err = proto.set_protobuf_value(
+                ctoken,
+                'base64p', 6, 3, 9, value=200)
+            if err:
+                print('Error setting ctoken value:')
+                print(err)
+            else:
+                ctoken = new_ctoken
+        else:
+            replies_param = ''
+        comments_info['more_comments_url'] = concat_or_none(
+            util.URL_ORIGIN, '/comments?ctoken=', ctoken, replies_param)
 
-    comments_info['page_number'] = page_number = str(int(comments_info['offset']/20) + 1)
+    if comments_info['offset'] is None:
+        comments_info['page_number'] = None
+    else:
+        comments_info['page_number'] = int(comments_info['offset']/20) + 1
 
     if not comments_info['is_replies']:
         comments_info['sort_text'] = 'top' if comments_info['sort'] == 0 else 'newest'
-
-    comments_info['video_url'] = util.URL_ORIGIN + '/watch?v=' + comments_info['video_id']
-    comments_info['video_thumbnail'] = '/i.ytimg.com/vi/'+ comments_info['video_id'] + '/mqdefault.jpg'
+    comments_info['video_url'] = concat_or_none(
+        util.URL_ORIGIN, '/watch?v=', comments_info['video_id'])
+    comments_info['video_thumbnail'] = concat_or_none(
+        settings.img_prefix, 'https://i.ytimg.com/vi/',
+        comments_info['video_id'], '/hqdefault.jpg'
+    )
 
 
 def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''):
-    if settings.comments_mode:
-        comments_info = parse_comments_polymer(request_comments(make_comment_ctoken(video_id, sort, offset, lc, secret_key)))
-        post_process_comments_info(comments_info)
-
-        post_comment_url = util.URL_ORIGIN + "/post_comment?video_id=" + video_id
-        other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
-        other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
-        comments_info['comment_links'] = [('Post comment', post_comment_url), (other_sort_text, other_sort_url)]
+    try:
+        if settings.comments_mode:
+            comments_info = {'error': None}
+            other_sort_url = (
+                util.URL_ORIGIN + '/comments?ctoken='
+                + make_comment_ctoken(video_id, sort=1 - sort, lc=lc)
+            )
+            other_sort_text = 'Sort by ' + ('newest' if sort == 0 else 'top')
+
+            this_sort_url = (util.URL_ORIGIN
+                             + '/comments?ctoken='
+                             + make_comment_ctoken(video_id, sort=sort, lc=lc))
+
+            comments_info['comment_links'] = [
+                (other_sort_text, other_sort_url),
+                ('Direct link', this_sort_url)
+            ]
+
+            ctoken = make_comment_ctoken(video_id, sort, offset, lc)
+            comments_info.update(yt_data_extract.extract_comments_info(
+                request_comments(ctoken), ctoken=ctoken
+            ))
+            post_process_comments_info(comments_info)
+
+            return comments_info
+        else:
+            return {}
+    except util.FetchError as e:
+        if e.code == '429' and settings.route_tor:
+            comments_info['error'] = 'Error: YouTube blocked the request because the Tor exit node is overutilized.'
+            if e.error_message:
+                comments_info['error'] += '\n\n' + e.error_message
+            comments_info['error'] += '\n\nExit node IP address: %s' % e.ip
+        else:
+            comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
 
-        return comments_info
+    except Exception as e:
+        comments_info['error'] = 'YouTube blocked the request. Error: %s' % str(e)
 
-    return {}
+    if comments_info.get('error'):
+        print('Error retrieving comments for ' + str(video_id) + ':\n'
+              + comments_info['error'])
 
+    return comments_info
 
 
 @yt_app.route('/comments')
 def get_comments_page():
     ctoken = request.args.get('ctoken', '')
-    replies = False
-    if not ctoken:
-        video_id = request.args['video_id']
-        parent_id = request.args['parent_id']
+    replies = request.args.get('replies', '0') == '1'
 
-        ctoken = comment_replies_ctoken(video_id, parent_id)
-        replies = True
-
-    comments_info = parse_comments_polymer(request_comments(ctoken, replies))
+    comments_info = yt_data_extract.extract_comments_info(
+        request_comments(ctoken, replies), ctoken=ctoken
+    )
     post_process_comments_info(comments_info)
 
     if not replies:
-        other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(comments_info['video_id'], sort=1 - comments_info['sort'])
+        if comments_info['sort'] is None or comments_info['video_id'] is None:
+            other_sort_url = None
+        else:
+            other_sort_url = (
+                util.URL_ORIGIN
+                + '/comments?ctoken='
+                + make_comment_ctoken(comments_info['video_id'],
+                                      sort=1-comments_info['sort'])
+            )
         other_sort_text = 'Sort by ' + ('newest' if comments_info['sort'] == 0 else 'top')
         comments_info['comment_links'] = [(other_sort_text, other_sort_url)]
 
-
-    comment_posting_box_info = {
-        'form_action': '' if replies else util.URL_ORIGIN + '/post_comment',
-        'video_id': comments_info['video_id'],
-        'accounts': accounts.account_list_data(),
-        'include_video_id_input': not replies,
-        'replying': replies,
-    }
-
-    return flask.render_template('comments_page.html',
-        comments_info = comments_info,
-        comment_posting_box_info = comment_posting_box_info,
+    return flask.render_template(
+        'comments_page.html',
+        comments_info=comments_info,
+        slim=request.args.get('slim', False)
     )
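For reference, the continuation token that make_comment_ctoken() assembles in this file is ordinary protobuf wire format, base64url-encoded. The sketch below is a standalone, stdlib-only illustration of that layout: the _varint/uint/string helpers are simplified stand-ins for the project's youtube/proto.py (whose implementation is not part of this diff), while the field numbers and nesting mirror the code above.

import base64

def _varint(n):
    # Protobuf base-128 varint encoding (least significant group first).
    out = b''
    while True:
        byte = n & 0x7f
        n >>= 7
        if n:
            out += bytes((byte | 0x80,))
        else:
            return out + bytes((byte,))

def uint(field_number, value):
    # Wire type 0 (varint): key byte is (field_number << 3) | 0.
    return _varint(field_number << 3) + _varint(value)

def string(field_number, data):
    # Wire type 2 (length-delimited): key is (field_number << 3) | 2.
    data = data.encode('utf-8') if isinstance(data, str) else data
    return _varint((field_number << 3) | 2) + _varint(len(data)) + data

nested = string  # an embedded message is also length-delimited

def make_comment_ctoken(video_id, sort=0, offset=0):
    # Same field layout as the function in the diff (lc/secret_key omitted):
    #   field 2: page params (field 2: video id)
    #   field 3: constant 6
    #   field 6: offset info (field 4: page info, field 5: offset)
    page_info = string(4, video_id) + uint(6, sort)
    offset_information = nested(4, page_info) + uint(5, offset)
    page_params = string(2, video_id)
    result = nested(2, page_params) + uint(3, 6) + nested(6, offset_information)
    return base64.urlsafe_b64encode(result).decode('ascii')

print(make_comment_ctoken('dQw4w9WgXcQ', sort=1, offset=20))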
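The rewritten request_comments() goes through util.fetch_url with util.mobile_xhr_headers and util.json_header, which handle Tor routing, retries, and header assembly and do not appear in this diff. Below is a minimal standalone approximation of the same youtubei/v1/next call using plain urllib; the API key, client fields, and the '=' to '%3D' substitution are taken verbatim from the diff, while the header values are assumptions standing in for the util helpers.

import json
import urllib.request

def request_comments(ctoken):
    # Innertube endpoint and key, as used by the patched function above.
    url = ('https://m.youtube.com/youtubei/v1/next'
           '?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8')
    data = json.dumps({
        'context': {
            'client': {
                'hl': 'en',
                'gl': 'US',
                'clientName': 'MWEB',
                'clientVersion': '2.20210804.02.00',
            },
        },
        # The diff escapes '=' in the continuation; reproduced as-is.
        'continuation': ctoken.replace('=', '%3D'),
    }).encode('utf-8')
    # Assumed stand-ins for util.mobile_xhr_headers + util.json_header;
    # the project's exact header values are not shown in this diff.
    req = urllib.request.Request(url, data=data, headers={
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_4 like Mac OS X)',
        'X-Requested-With': 'XMLHttpRequest',
    })
    with urllib.request.urlopen(req, timeout=15) as response:
        return json.loads(response.read().decode('utf-8'))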