From 59c0d76424b4df6a402a1da2d5de908c9ebd552c Mon Sep 17 00:00:00 2001 From: James Taylor Date: Mon, 24 Dec 2018 03:00:42 -0800 Subject: Initial port of _login method --- youtube/account_functions_.py | 201 +++++++++++++++++++++++++++--------------- 1 file changed, 132 insertions(+), 69 deletions(-) diff --git a/youtube/account_functions_.py b/youtube/account_functions_.py index 3a64d51..414f9fe 100644 --- a/youtube/account_functions_.py +++ b/youtube/account_functions_.py @@ -6,6 +6,7 @@ from youtube import common, proto, comments import re import traceback import settings +import http.cookiejar def _post_comment(text, video_id, session_token, cookie): headers = { @@ -149,6 +150,115 @@ def post_comment(query_string, fields): return b'Refreshing comment page yielded error 500 Internal Server Error.\nPost comment status code: ' + code.encode('ascii') return response +def get_post_comment_page(query_string): + parameters = urllib.parse.parse_qs(query_string) + video_id = parameters['video_id'][0] + parent_id = common.default_multi_get(parameters, 'parent_id', 0, default='') + + style = ''' main{ + display: grid; + grid-template-columns: 3fr 2fr; +} +.left{ + display:grid; + grid-template-columns: 1fr 640px; +} +textarea{ + width: 460px; + height: 85px; +} +.comment-form{ + grid-column:2; +}''' + if parent_id: # comment reply + comment_box = comments.comment_box_template.substitute( + form_action = common.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id, + video_id_input = '', + post_text = "Post reply", + ) + else: + comment_box = comments.comment_box_template.substitute( + form_action = common.URL_ORIGIN + '/comments?ctoken=' + comments.make_comment_ctoken(video_id, sort=1).replace("=", "%3D"), + video_id_input = '''''', + post_text = "Post comment", + ) + + page = '''
\n''' + comment_box + '''
\n''' + return common.yt_basic_template.substitute( + page_title = "Post comment reply" if parent_id else "Post a comment", + style = style, + header = common.get_header(), + page = page, + ) + + + +# --------------------------------- +# Code ported from youtube-dl +# --------------------------------- +from html.parser import HTMLParser as compat_HTMLParser +import http.client as compat_http_client + +class HTMLAttributeParser(compat_HTMLParser): + """Trivial HTML parser to gather the attributes for a single element""" + def __init__(self): + self.attrs = {} + compat_HTMLParser.__init__(self) + + def handle_starttag(self, tag, attrs): + self.attrs = dict(attrs) + +def extract_attributes(html_element): + """Given a string for an HTML element such as + + Decode and return a dictionary of attributes. + { + 'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz', + 'empty': '', 'noval': None, 'entity': '&', + 'sq': '"', 'dq': '\'' + }. + NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions, + but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5. + """ + parser = HTMLAttributeParser() + parser.feed(html_element) + parser.close() + + return parser.attrs + +def _hidden_inputs(html): + html = re.sub(r'', '', html) + hidden_inputs = {} + for input in re.findall(r'(?i)(]+>)', html): + attrs = extract_attributes(input) + if not input: + continue + if attrs.get('type') not in ('hidden', 'submit'): + continue + name = attrs.get('name') or attrs.get('id') + value = attrs.get('value') + if name and value is not None: + hidden_inputs[name] = value + return hidden_inputs + +def try_get(src, getter, expected_type=None): + if not isinstance(getter, (list, tuple)): + getter = [getter] + for get in getter: + try: + v = get(src) + except (AttributeError, KeyError, TypeError, IndexError): + pass + else: + if expected_type is None or isinstance(v, expected_type): + return v + +def remove_start(s, start): + return s[len(start):] if s is not None and s.startswith(start) else s _LOGIN_URL = 'https://accounts.google.com/ServiceLogin' _TWOFACTOR_URL = 'https://accounts.google.com/signin/challenge' @@ -156,7 +266,7 @@ _TWOFACTOR_URL = 'https://accounts.google.com/signin/challenge' _LOOKUP_URL = 'https://accounts.google.com/_/signin/sl/lookup' _CHALLENGE_URL = 'https://accounts.google.com/_/signin/sl/challenge' _TFA_URL = 'https://accounts.google.com/_/signin/challenge?hl=en&TL={0}' -def _login(username, password): +def _login(username, password, cookie_jar): """ Attempt to log in to YouTube. True is returned if successful or skipped. @@ -164,15 +274,12 @@ def _login(username, password): Taken from youtube-dl """ + login_page = common.fetch_url(_LOGIN_URL, report_text='Downloaded login page', cookie_jar_receive=cookie_jar).decode('utf-8') - login_page = self._download_webpage( - _LOGIN_URL, None, - note='Downloading login page', - errnote='unable to fetch login page', fatal=False) if login_page is False: return - login_form = self._hidden_inputs(login_page) + login_form = _hidden_inputs(login_page) def req(url, f_req, note, errnote): data = login_form.copy() @@ -186,14 +293,13 @@ def _login(username, password): 'flowName': 'GlifWebSignIn', 'flowEntry': 'ServiceLogin', }) - return self._download_json( - url, None, note=note, errnote=errnote, - transform_source=lambda s: re.sub(r'^[^[]*', '', s), - fatal=False, - data=urlencode_postdata(data), headers={ - 'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8', - 'Google-Accounts-XSRF': 1, - }) + headers={ + 'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8', + 'Google-Accounts-XSRF': 1, + } + result = common.fetch_url(url, headers, report_text=note, data=data, cookie_jar_send=cookie_jar, cookie_jar_receive=cookie_jar) + result = re.sub(r'^[^\[]*', '', result) + return json.loads(result) def warn(message): print("Login: " + message) @@ -218,7 +324,7 @@ def _login(username, password): if lookup_results is False: return False - user_hash = try_get(lookup_results, lambda x: x[0][2], compat_str) + user_hash = try_get(lookup_results, lambda x: x[0][2], str) if not user_hash: warn('Unable to extract user hash') return False @@ -240,7 +346,7 @@ def _login(username, password): login_res = try_get(challenge_results, lambda x: x[0][5], list) if login_res: - login_msg = try_get(login_res, lambda x: x[5], compat_str) + login_msg = try_get(login_res, lambda x: x[5], str) warn( 'Unable to login: %s' % 'Invalid password' if login_msg == 'INCORRECT_ANSWER_ENTERED' else login_msg) @@ -253,16 +359,16 @@ def _login(username, password): login_challenge = try_get(res, lambda x: x[0][0], list) if login_challenge: - challenge_str = try_get(login_challenge, lambda x: x[2], compat_str) + challenge_str = try_get(login_challenge, lambda x: x[2], str) if challenge_str == 'TWO_STEP_VERIFICATION': # SEND_SUCCESS - TFA code has been successfully sent to phone # QUOTA_EXCEEDED - reached the limit of TFA codes - status = try_get(login_challenge, lambda x: x[5], compat_str) + status = try_get(login_challenge, lambda x: x[5], str) if status == 'QUOTA_EXCEEDED': warn('Exceeded the limit of TFA codes, try later') return False - tl = try_get(challenge_results, lambda x: x[1][2], compat_str) + tl = try_get(challenge_results, lambda x: x[1][2], str) if not tl: warn('Unable to extract TL') return False @@ -293,14 +399,14 @@ def _login(username, password): tfa_res = try_get(tfa_results, lambda x: x[0][5], list) if tfa_res: - tfa_msg = try_get(tfa_res, lambda x: x[5], compat_str) + tfa_msg = try_get(tfa_res, lambda x: x[5], str) warn( 'Unable to finish TFA: %s' % 'Invalid TFA code' if tfa_msg == 'INCORRECT_ANSWER_ENTERED' else tfa_msg) return False check_cookie_url = try_get( - tfa_results, lambda x: x[0][-1][2], compat_str) + tfa_results, lambda x: x[0][-1][2], str) else: CHALLENGES = { 'LOGIN_CHALLENGE': "This device isn't recognized. For your security, Google wants to make sure it's really you.", @@ -309,20 +415,19 @@ def _login(username, password): } challenge = CHALLENGES.get( challenge_str, - '%s returned error %s.' % (IE_NAME, challenge_str)) + '%s returned error %s.' % ('youtube', challenge_str)) warn('%s\nGo to https://accounts.google.com/, login and solve a challenge.' % challenge) return False else: - check_cookie_url = try_get(res, lambda x: x[2], compat_str) + check_cookie_url = try_get(res, lambda x: x[2], str) if not check_cookie_url: warn('Unable to extract CheckCookie URL') return False - check_cookie_results = self._download_webpage( - check_cookie_url, None, 'Checking cookie', fatal=False) - - if check_cookie_results is False: + try: + check_cookie_results = common.fetch_url(check_cookie_url, report-text="Checked cookie", cookie_jar_send=cookie_jar, cookie_jar_receive=cookie_jar).decode('utf-8') + except (urllib.error.URLError, compat_http_client.HTTPException, socket.error) as err: return False if 'https://myaccount.google.com/' not in check_cookie_results: @@ -330,45 +435,3 @@ def _login(username, password): return False return True - - -def get_post_comment_page(query_string): - parameters = urllib.parse.parse_qs(query_string) - video_id = parameters['video_id'][0] - parent_id = common.default_multi_get(parameters, 'parent_id', 0, default='') - - style = ''' main{ - display: grid; - grid-template-columns: 3fr 2fr; -} -.left{ - display:grid; - grid-template-columns: 1fr 640px; -} -textarea{ - width: 460px; - height: 85px; -} -.comment-form{ - grid-column:2; -}''' - if parent_id: # comment reply - comment_box = comments.comment_box_template.substitute( - form_action = common.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id, - video_id_input = '', - post_text = "Post reply", - ) - else: - comment_box = comments.comment_box_template.substitute( - form_action = common.URL_ORIGIN + '/comments?ctoken=' + comments.make_comment_ctoken(video_id, sort=1).replace("=", "%3D"), - video_id_input = '''''', - post_text = "Post comment", - ) - - page = '''
\n''' + comment_box + '''
\n''' - return common.yt_basic_template.substitute( - page_title = "Post comment reply" if parent_id else "Post a comment", - style = style, - header = common.get_header(), - page = page, - ) \ No newline at end of file -- cgit v1.2.3