aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/account_functions_.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2018-12-24 03:00:42 -0800
committerJames Taylor <user234683@users.noreply.github.com>2018-12-24 03:00:42 -0800
commit59c0d76424b4df6a402a1da2d5de908c9ebd552c (patch)
tree40feaec81b503213ab62ccc4d18093930f6e93c3 /youtube/account_functions_.py
parentb61865beb11a1f42a85192c662298c43f53328c5 (diff)
downloadyt-local-59c0d76424b4df6a402a1da2d5de908c9ebd552c.tar.lz
yt-local-59c0d76424b4df6a402a1da2d5de908c9ebd552c.tar.xz
yt-local-59c0d76424b4df6a402a1da2d5de908c9ebd552c.zip
Initial port of _login method
Diffstat (limited to 'youtube/account_functions_.py')
-rw-r--r--youtube/account_functions_.py201
1 files changed, 132 insertions, 69 deletions
diff --git a/youtube/account_functions_.py b/youtube/account_functions_.py
index 3a64d51..414f9fe 100644
--- a/youtube/account_functions_.py
+++ b/youtube/account_functions_.py
@@ -6,6 +6,7 @@ from youtube import common, proto, comments
import re
import traceback
import settings
+import http.cookiejar
def _post_comment(text, video_id, session_token, cookie):
headers = {
@@ -149,6 +150,115 @@ def post_comment(query_string, fields):
return b'Refreshing comment page yielded error 500 Internal Server Error.\nPost comment status code: ' + code.encode('ascii')
return response
+def get_post_comment_page(query_string):
+ parameters = urllib.parse.parse_qs(query_string)
+ video_id = parameters['video_id'][0]
+ parent_id = common.default_multi_get(parameters, 'parent_id', 0, default='')
+
+ style = ''' main{
+ display: grid;
+ grid-template-columns: 3fr 2fr;
+}
+.left{
+ display:grid;
+ grid-template-columns: 1fr 640px;
+}
+textarea{
+ width: 460px;
+ height: 85px;
+}
+.comment-form{
+ grid-column:2;
+}'''
+ if parent_id: # comment reply
+ comment_box = comments.comment_box_template.substitute(
+ form_action = common.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id,
+ video_id_input = '',
+ post_text = "Post reply",
+ )
+ else:
+ comment_box = comments.comment_box_template.substitute(
+ form_action = common.URL_ORIGIN + '/comments?ctoken=' + comments.make_comment_ctoken(video_id, sort=1).replace("=", "%3D"),
+ video_id_input = '''<input type="hidden" name="video_id" value="''' + video_id + '''">''',
+ post_text = "Post comment",
+ )
+
+ page = '''<div class="left">\n''' + comment_box + '''</div>\n'''
+ return common.yt_basic_template.substitute(
+ page_title = "Post comment reply" if parent_id else "Post a comment",
+ style = style,
+ header = common.get_header(),
+ page = page,
+ )
+
+
+
+# ---------------------------------
+# Code ported from youtube-dl
+# ---------------------------------
+from html.parser import HTMLParser as compat_HTMLParser
+import http.client as compat_http_client
+
+class HTMLAttributeParser(compat_HTMLParser):
+ """Trivial HTML parser to gather the attributes for a single element"""
+ def __init__(self):
+ self.attrs = {}
+ compat_HTMLParser.__init__(self)
+
+ def handle_starttag(self, tag, attrs):
+ self.attrs = dict(attrs)
+
+def extract_attributes(html_element):
+ """Given a string for an HTML element such as
+ <el
+ a="foo" B="bar" c="&98;az" d=boz
+ empty= noval entity="&amp;"
+ sq='"' dq="'"
+ >
+ Decode and return a dictionary of attributes.
+ {
+ 'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz',
+ 'empty': '', 'noval': None, 'entity': '&',
+ 'sq': '"', 'dq': '\''
+ }.
+ NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions,
+ but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5.
+ """
+ parser = HTMLAttributeParser()
+ parser.feed(html_element)
+ parser.close()
+
+ return parser.attrs
+
+def _hidden_inputs(html):
+ html = re.sub(r'<!--(?:(?!<!--).)*-->', '', html)
+ hidden_inputs = {}
+ for input in re.findall(r'(?i)(<input[^>]+>)', html):
+ attrs = extract_attributes(input)
+ if not input:
+ continue
+ if attrs.get('type') not in ('hidden', 'submit'):
+ continue
+ name = attrs.get('name') or attrs.get('id')
+ value = attrs.get('value')
+ if name and value is not None:
+ hidden_inputs[name] = value
+ return hidden_inputs
+
+def try_get(src, getter, expected_type=None):
+ if not isinstance(getter, (list, tuple)):
+ getter = [getter]
+ for get in getter:
+ try:
+ v = get(src)
+ except (AttributeError, KeyError, TypeError, IndexError):
+ pass
+ else:
+ if expected_type is None or isinstance(v, expected_type):
+ return v
+
+def remove_start(s, start):
+ return s[len(start):] if s is not None and s.startswith(start) else s
_LOGIN_URL = 'https://accounts.google.com/ServiceLogin'
_TWOFACTOR_URL = 'https://accounts.google.com/signin/challenge'
@@ -156,7 +266,7 @@ _TWOFACTOR_URL = 'https://accounts.google.com/signin/challenge'
_LOOKUP_URL = 'https://accounts.google.com/_/signin/sl/lookup'
_CHALLENGE_URL = 'https://accounts.google.com/_/signin/sl/challenge'
_TFA_URL = 'https://accounts.google.com/_/signin/challenge?hl=en&TL={0}'
-def _login(username, password):
+def _login(username, password, cookie_jar):
"""
Attempt to log in to YouTube.
True is returned if successful or skipped.
@@ -164,15 +274,12 @@ def _login(username, password):
Taken from youtube-dl
"""
+ login_page = common.fetch_url(_LOGIN_URL, report_text='Downloaded login page', cookie_jar_receive=cookie_jar).decode('utf-8')
- login_page = self._download_webpage(
- _LOGIN_URL, None,
- note='Downloading login page',
- errnote='unable to fetch login page', fatal=False)
if login_page is False:
return
- login_form = self._hidden_inputs(login_page)
+ login_form = _hidden_inputs(login_page)
def req(url, f_req, note, errnote):
data = login_form.copy()
@@ -186,14 +293,13 @@ def _login(username, password):
'flowName': 'GlifWebSignIn',
'flowEntry': 'ServiceLogin',
})
- return self._download_json(
- url, None, note=note, errnote=errnote,
- transform_source=lambda s: re.sub(r'^[^[]*', '', s),
- fatal=False,
- data=urlencode_postdata(data), headers={
- 'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
- 'Google-Accounts-XSRF': 1,
- })
+ headers={
+ 'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
+ 'Google-Accounts-XSRF': 1,
+ }
+ result = common.fetch_url(url, headers, report_text=note, data=data, cookie_jar_send=cookie_jar, cookie_jar_receive=cookie_jar)
+ result = re.sub(r'^[^\[]*', '', result)
+ return json.loads(result)
def warn(message):
print("Login: " + message)
@@ -218,7 +324,7 @@ def _login(username, password):
if lookup_results is False:
return False
- user_hash = try_get(lookup_results, lambda x: x[0][2], compat_str)
+ user_hash = try_get(lookup_results, lambda x: x[0][2], str)
if not user_hash:
warn('Unable to extract user hash')
return False
@@ -240,7 +346,7 @@ def _login(username, password):
login_res = try_get(challenge_results, lambda x: x[0][5], list)
if login_res:
- login_msg = try_get(login_res, lambda x: x[5], compat_str)
+ login_msg = try_get(login_res, lambda x: x[5], str)
warn(
'Unable to login: %s' % 'Invalid password'
if login_msg == 'INCORRECT_ANSWER_ENTERED' else login_msg)
@@ -253,16 +359,16 @@ def _login(username, password):
login_challenge = try_get(res, lambda x: x[0][0], list)
if login_challenge:
- challenge_str = try_get(login_challenge, lambda x: x[2], compat_str)
+ challenge_str = try_get(login_challenge, lambda x: x[2], str)
if challenge_str == 'TWO_STEP_VERIFICATION':
# SEND_SUCCESS - TFA code has been successfully sent to phone
# QUOTA_EXCEEDED - reached the limit of TFA codes
- status = try_get(login_challenge, lambda x: x[5], compat_str)
+ status = try_get(login_challenge, lambda x: x[5], str)
if status == 'QUOTA_EXCEEDED':
warn('Exceeded the limit of TFA codes, try later')
return False
- tl = try_get(challenge_results, lambda x: x[1][2], compat_str)
+ tl = try_get(challenge_results, lambda x: x[1][2], str)
if not tl:
warn('Unable to extract TL')
return False
@@ -293,14 +399,14 @@ def _login(username, password):
tfa_res = try_get(tfa_results, lambda x: x[0][5], list)
if tfa_res:
- tfa_msg = try_get(tfa_res, lambda x: x[5], compat_str)
+ tfa_msg = try_get(tfa_res, lambda x: x[5], str)
warn(
'Unable to finish TFA: %s' % 'Invalid TFA code'
if tfa_msg == 'INCORRECT_ANSWER_ENTERED' else tfa_msg)
return False
check_cookie_url = try_get(
- tfa_results, lambda x: x[0][-1][2], compat_str)
+ tfa_results, lambda x: x[0][-1][2], str)
else:
CHALLENGES = {
'LOGIN_CHALLENGE': "This device isn't recognized. For your security, Google wants to make sure it's really you.",
@@ -309,20 +415,19 @@ def _login(username, password):
}
challenge = CHALLENGES.get(
challenge_str,
- '%s returned error %s.' % (IE_NAME, challenge_str))
+ '%s returned error %s.' % ('youtube', challenge_str))
warn('%s\nGo to https://accounts.google.com/, login and solve a challenge.' % challenge)
return False
else:
- check_cookie_url = try_get(res, lambda x: x[2], compat_str)
+ check_cookie_url = try_get(res, lambda x: x[2], str)
if not check_cookie_url:
warn('Unable to extract CheckCookie URL')
return False
- check_cookie_results = self._download_webpage(
- check_cookie_url, None, 'Checking cookie', fatal=False)
-
- if check_cookie_results is False:
+ try:
+ check_cookie_results = common.fetch_url(check_cookie_url, report-text="Checked cookie", cookie_jar_send=cookie_jar, cookie_jar_receive=cookie_jar).decode('utf-8')
+ except (urllib.error.URLError, compat_http_client.HTTPException, socket.error) as err:
return False
if 'https://myaccount.google.com/' not in check_cookie_results:
@@ -330,45 +435,3 @@ def _login(username, password):
return False
return True
-
-
-def get_post_comment_page(query_string):
- parameters = urllib.parse.parse_qs(query_string)
- video_id = parameters['video_id'][0]
- parent_id = common.default_multi_get(parameters, 'parent_id', 0, default='')
-
- style = ''' main{
- display: grid;
- grid-template-columns: 3fr 2fr;
-}
-.left{
- display:grid;
- grid-template-columns: 1fr 640px;
-}
-textarea{
- width: 460px;
- height: 85px;
-}
-.comment-form{
- grid-column:2;
-}'''
- if parent_id: # comment reply
- comment_box = comments.comment_box_template.substitute(
- form_action = common.URL_ORIGIN + '/comments?parent_id=' + parent_id + "&video_id=" + video_id,
- video_id_input = '',
- post_text = "Post reply",
- )
- else:
- comment_box = comments.comment_box_template.substitute(
- form_action = common.URL_ORIGIN + '/comments?ctoken=' + comments.make_comment_ctoken(video_id, sort=1).replace("=", "%3D"),
- video_id_input = '''<input type="hidden" name="video_id" value="''' + video_id + '''">''',
- post_text = "Post comment",
- )
-
- page = '''<div class="left">\n''' + comment_box + '''</div>\n'''
- return common.yt_basic_template.substitute(
- page_title = "Post comment reply" if parent_id else "Post a comment",
- style = style,
- header = common.get_header(),
- page = page,
- ) \ No newline at end of file