from youtube import proto, util, html_common, yt_data_extract, accounts import settings import json import base64 from string import Template import urllib.request import urllib import html import re comment_area_template = Template('''
$video-metadata $comment-links $comment-box $comments $more-comments-button
''') comment_template = Template('''
$avatar
$author
$text
$replies $action_buttons
''') comment_avatar_template = Template(''' ''') reply_link_template = Template(''' $view_replies_text ''') with open("yt_comments_template.html", "r") as file: yt_comments_template = Template(file.read()) # $replies_link_text # Here's what I know about the secret key (starting with ASJN_i) # *The secret key definitely contains the following information (or perhaps the information is stored at youtube's servers): # -Video id # -Offset # -Sort # *If the video id or sort in the ctoken contradicts the ASJN, the response is an error. The offset encoded outside the ASJN is ignored entirely. # *The ASJN is base64 encoded data, indicated by the fact that the character after "ASJN_i" is one of ("0", "1", "2", "3") # *The encoded data is not valid protobuf # *The encoded data (after the 5 or so bytes that are always the same) is indistinguishable from random data according to a battery of randomness tests # *The ASJN in the ctoken provided by a response changes in regular intervals of about a second or two. # *Old ASJN's continue to work, and start at the same comment even if new comments have been posted since # *The ASJN has no relation with any of the data in the response it came from def make_comment_ctoken(video_id, sort=0, offset=0, lc='', secret_key=''): video_id = proto.as_bytes(video_id) secret_key = proto.as_bytes(secret_key) page_info = proto.string(4,video_id) + proto.uint(6, sort) offset_information = proto.nested(4, page_info) + proto.uint(5, offset) if secret_key: offset_information = proto.string(1, secret_key) + offset_information page_params = proto.string(2, video_id) if lc: page_params += proto.string(6, proto.percent_b64encode(proto.string(15, lc))) result = proto.nested(2, page_params) + proto.uint(3,6) + proto.nested(6, offset_information) return base64.urlsafe_b64encode(result).decode('ascii') def comment_replies_ctoken(video_id, comment_id, max_results=500): params = proto.string(2, comment_id) + proto.uint(9, max_results) params = proto.nested(3, params) result = proto.nested(2, proto.string(2, video_id)) + proto.uint(3,6) + proto.nested(6, params) return base64.urlsafe_b64encode(result).decode('ascii') def ctoken_metadata(ctoken): result = dict() params = proto.parse(proto.b64_to_bytes(ctoken)) result['video_id'] = proto.parse(params[2])[2].decode('ascii') offset_information = proto.parse(params[6]) result['offset'] = offset_information.get(5, 0) result['is_replies'] = False if (3 in offset_information) and (2 in proto.parse(offset_information[3])): result['is_replies'] = True else: try: result['sort'] = proto.parse(offset_information[4])[6] except KeyError: result['sort'] = 0 return result def get_ids(ctoken): params = proto.parse(proto.b64_to_bytes(ctoken)) video_id = proto.parse(params[2])[2] params = proto.parse(params[6]) params = proto.parse(params[3]) return params[2].decode('ascii'), video_id.decode('ascii') mobile_headers = { 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1', 'Accept': '*/*', 'Accept-Language': 'en-US,en;q=0.5', 'X-YouTube-Client-Name': '2', 'X-YouTube-Client-Version': '2.20180823', } def request_comments(ctoken, replies=False): if replies: # let's make it use different urls for no reason despite all the data being encoded base_url = "https://m.youtube.com/watch_comment?action_get_comment_replies=1&ctoken=" else: base_url = "https://m.youtube.com/watch_comment?action_get_comments=1&ctoken=" url = base_url + ctoken.replace("=", "%3D") + "&pbj=1" for i in range(0,8): # don't retry more than 8 times content = util.fetch_url(url, headers=mobile_headers, report_text="Retrieved comments") if content[0:4] == b")]}'": # random closing characters included at beginning of response for some reason content = content[4:] elif content[0:10] == b'\n, retrying") continue break '''with open('debug/comments_debug', 'wb') as f: f.write(content)''' return content def single_comment_ctoken(video_id, comment_id): page_params = proto.string(2, video_id) + proto.string(6, proto.percent_b64encode(proto.string(15, comment_id))) result = proto.nested(2, page_params) + proto.uint(3,6) return base64.urlsafe_b64encode(result).decode('ascii') def parse_comments_ajax(content, replies=False): try: content = json.loads(util.uppercase_escape(content.decode('utf-8'))) #print(content) comments_raw = content['content']['continuation_contents']['contents'] ctoken = util.default_multi_get(content, 'content', 'continuation_contents', 'continuations', 0, 'continuation', default='') comments = [] for comment_raw in comments_raw: replies_url = '' if not replies: if comment_raw['replies'] is not None: reply_ctoken = comment_raw['replies']['continuations'][0]['continuation'] comment_id, video_id = get_ids(reply_ctoken) replies_url = util.URL_ORIGIN + '/comments?parent_id=' + comment_id + "&video_id=" + video_id comment_raw = comment_raw['comment'] comment = { 'author': comment_raw['author']['runs'][0]['text'], 'author_url': comment_raw['author_endpoint']['url'], 'author_channel_id': '', 'author_id': '', 'author_avatar': comment_raw['author_thumbnail']['url'], 'likes': comment_raw['like_count'], 'published': comment_raw['published_time']['runs'][0]['text'], 'text': comment_raw['content']['runs'], 'reply_count': '', 'replies_url': replies_url, } comments.append(comment) except Exception as e: print('Error parsing comments: ' + str(e)) comments = () ctoken = '' return {'ctoken': ctoken, 'comments': comments} reply_count_regex = re.compile(r'(\d+)') def parse_comments_polymer(content, replies=False): try: video_title = '' content = json.loads(util.uppercase_escape(content.decode('utf-8'))) url = content[1]['url'] ctoken = urllib.parse.parse_qs(url[url.find('?')+1:])['ctoken'][0] video_id = ctoken_metadata(ctoken)['video_id'] #print(content) try: comments_raw = content[1]['response']['continuationContents']['commentSectionContinuation']['items'] except KeyError: comments_raw = content[1]['response']['continuationContents']['commentRepliesContinuation']['contents'] replies = True ctoken = util.default_multi_get(content, 1, 'response', 'continuationContents', 'commentSectionContinuation', 'continuations', 0, 'nextContinuationData', 'continuation', default='') comments = [] for comment_raw in comments_raw: replies_url = '' view_replies_text = '' try: comment_raw = comment_raw['commentThreadRenderer'] except KeyError: pass else: if 'commentTargetTitle' in comment_raw: video_title = comment_raw['commentTargetTitle']['runs'][0]['text'] parent_id = comment_raw['comment']['commentRenderer']['commentId'] # TODO: move this stuff into the comments_html function if 'replies' in comment_raw: #reply_ctoken = comment_raw['replies']['commentRepliesRenderer']['continuations'][0]['nextContinuationData']['continuation'] #comment_id, video_id = get_ids(reply_ctoken) replies_url = util.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id view_replies_text = yt_data_extract.get_plain_text(comment_raw['replies']['commentRepliesRenderer']['moreText']) match = reply_count_regex.search(view_replies_text) if match is None: view_replies_text = '1 reply' else: view_replies_text = match.group(1) + " replies" elif not replies: view_replies_text = "Reply" replies_url = util.URL_ORIGIN + '/post_comment?parent_id=' + parent_id + "&video_id=" + video_id comment_raw = comment_raw['comment'] comment_raw = comment_raw['commentRenderer'] comment = { 'author': yt_data_extract.get_plain_text(comment_raw['authorText']), 'author_url': comment_raw['authorEndpoint']['commandMetadata']['webCommandMetadata']['url'], 'author_channel_id': comment_raw['authorEndpoint']['browseEndpoint']['browseId'], 'author_id': comment_raw['authorId'], 'author_avatar': comment_raw['authorThumbnail']['thumbnails'][0]['url'], 'likes': comment_raw['likeCount'], 'published': yt_data_extract.get_plain_text(comment_raw['publishedTimeText']), 'text': comment_raw['contentText'].get('runs', ''), 'view_replies_text': view_replies_text, 'replies_url': replies_url, 'video_id': video_id, 'comment_id': comment_raw['commentId'], } comments.append(comment) except Exception as e: print('Error parsing comments: ' + str(e)) comments = () ctoken = '' return {'ctoken': ctoken, 'comments': comments, 'video_title': video_title} def get_comments_html(comments): html_result = '' for comment in comments: replies = '' if comment['replies_url']: replies = reply_link_template.substitute(url=comment['replies_url'], view_replies_text=html.escape(comment['view_replies_text'])) if settings.enable_comment_avatars: avatar = comment_avatar_template.substitute( author_url = util.URL_ORIGIN + comment['author_url'], author_avatar = '/' + comment['author_avatar'], ) else: avatar = '' if comment['author_channel_id'] in accounts.accounts: delete_url = (util.URL_ORIGIN + '/delete_comment?video_id=' + comment['video_id'] + '&channel_id='+ comment['author_channel_id'] + '&author_id=' + comment['author_id'] + '&comment_id=' + comment['comment_id']) action_buttons = '''Delete''' else: action_buttons = '' permalink = util.URL_ORIGIN + '/watch?v=' + comment['video_id'] + '&lc=' + comment['comment_id'] html_result += comment_template.substitute( author=comment['author'], author_url = util.URL_ORIGIN + comment['author_url'], avatar = avatar, likes = str(comment['likes']) + ' likes' if str(comment['likes']) != '0' else '', published = comment['published'], text = yt_data_extract.format_text_runs(comment['text']), datetime = '', #TODO replies = replies, action_buttons = action_buttons, permalink = permalink, ) return html_result def video_comments(video_id, sort=0, offset=0, lc='', secret_key=''): if settings.enable_comments: post_comment_url = util.URL_ORIGIN + "/post_comment?video_id=" + video_id post_comment_link = '''Post comment''' other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(video_id, sort=1 - sort, lc=lc) other_sort_name = 'newest' if sort == 0 else 'top' other_sort_link = '''Sort by ''' + other_sort_name + '''''' comment_links = '''''' comment_info = parse_comments_polymer(request_comments(make_comment_ctoken(video_id, sort, offset, lc, secret_key))) ctoken = comment_info['ctoken'] if ctoken == '': more_comments_button = '' else: more_comments_button = more_comments_template.substitute(url = util.URL_ORIGIN + '/comments?ctoken=' + ctoken) result = '''
\n''' result += comment_links + '\n' result += '
\n' result += get_comments_html(comment_info['comments']) + '\n' result += '
\n' result += more_comments_button + '\n' result += '''
''' return result return '' more_comments_template = Template('''More comments''') video_metadata_template = Template('''
$title

Comments page $page_number

Sorted by $sort
''') account_option_template = Template(''' ''') def comment_box_account_options(): return ''.join(account_option_template.substitute(channel_id=channel_id, display_name=display_name) for channel_id, display_name in accounts.account_list_data()) comment_box_template = Template('''
Add account
$video_id_input
''') def get_comments_page(env, start_response): start_response('200 OK', [('Content-type','text/html'),] ) parameters = env['parameters'] ctoken = util.default_multi_get(parameters, 'ctoken', 0, default='') replies = False if not ctoken: video_id = parameters['video_id'][0] parent_id = parameters['parent_id'][0] ctoken = comment_replies_ctoken(video_id, parent_id) replies = True comment_info = parse_comments_polymer(request_comments(ctoken, replies), replies) metadata = ctoken_metadata(ctoken) if replies: page_title = 'Replies' video_metadata = '' comment_box = comment_box_template.substitute(form_action='', video_id_input='', post_text='Post reply', options=comment_box_account_options()) comment_links = '' else: page_number = str(int(metadata['offset']/20) + 1) page_title = 'Comments page ' + page_number video_metadata = video_metadata_template.substitute( page_number = page_number, sort = 'top' if metadata['sort'] == 0 else 'newest', title = html.escape(comment_info['video_title']), url = util.URL_ORIGIN + '/watch?v=' + metadata['video_id'], thumbnail = '/i.ytimg.com/vi/'+ metadata['video_id'] + '/mqdefault.jpg', ) comment_box = comment_box_template.substitute( form_action= util.URL_ORIGIN + '/post_comment', video_id_input='''''', post_text='Post comment', options=comment_box_account_options(), ) other_sort_url = util.URL_ORIGIN + '/comments?ctoken=' + make_comment_ctoken(metadata['video_id'], sort=1 - metadata['sort']) other_sort_name = 'newest' if metadata['sort'] == 0 else 'top' other_sort_link = '''Sort by ''' + other_sort_name + '''''' comment_links = '''''' comments_html = get_comments_html(comment_info['comments']) ctoken = comment_info['ctoken'] if ctoken == '': more_comments_button = '' else: more_comments_button = more_comments_template.substitute(url = util.URL_ORIGIN + '/comments?ctoken=' + ctoken) comments_area = '
\n' comments_area += video_metadata + comment_box + comment_links + '\n' comments_area += '
\n' comments_area += comments_html + '\n' comments_area += '
\n' comments_area += more_comments_button + '\n' comments_area += '
\n' return yt_comments_template.substitute( header = html_common.get_header(), comments_area = comments_area, page_title = page_title, ).encode('utf-8')