aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/watch.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2019-12-19 21:28:21 -0800
committerJames Taylor <user234683@users.noreply.github.com>2019-12-19 21:28:21 -0800
commit6b7a1212e30b713453aa7d2b3a7122e97689dad0 (patch)
treea1bcebdc84c36abaa9fbd1ced566815e75098bb1 /youtube/watch.py
parent4a3529df9577b660a2f493ab63ef08f10320b38e (diff)
downloadyt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.lz
yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.xz
yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.zip
Extraction: Move non-stateful signature decryption functionality into yt_data_extract
Diffstat (limited to 'youtube/watch.py')
-rw-r--r--youtube/watch.py97
1 files changed, 12 insertions, 85 deletions
diff --git a/youtube/watch.py b/youtube/watch.py
index 45d658f..429f272 100644
--- a/youtube/watch.py
+++ b/youtube/watch.py
@@ -11,7 +11,6 @@ import gevent
import os
import math
import traceback
-import re
import urllib
try:
@@ -175,101 +174,29 @@ def save_decrypt_cache():
f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True))
f.close()
-# adapted from youtube-dl and invidious:
-# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
-decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}')
-op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)')
def decrypt_signatures(info):
'''return error string, or False if no errors'''
- if ('formats' not in info) or (not info['formats']) or (not info['formats'][0]['s']):
- return False # No decryption needed
+ if not yt_data_extract.requires_decryption(info):
+ return False
+ if not info['player_name']:
+ return 'Could not find player name'
if not info['base_js']:
return 'Failed to find base.js'
- player_name = yt_data_extract.get(info['base_js'].split('/'), -2)
- if not player_name:
- return 'Could not find player name'
+ player_name = info['player_name']
if player_name in decrypt_cache:
print('Using cached decryption function for: ' + player_name)
- decryption_function = decrypt_cache[player_name]
+ info['decryption_function'] = decrypt_cache[player_name]
else:
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = base_js.decode('utf-8')
-
- decrypt_function_match = decrypt_function_re.search(base_js)
- if decrypt_function_match is None:
- return 'Could not find decryption function in base.js'
-
- function_body = decrypt_function_match.group(1).split(';')[1:-1]
- if not function_body:
- return 'Empty decryption function body'
-
- var_name = yt_data_extract.get(function_body[0].split('.'), 0)
- if var_name is None:
- return 'Could not find var_name'
-
- var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
- if var_body_match is None:
- return 'Could not find var_body'
-
- operations = var_body_match.group(1).replace('\n', '').split('},')
- if not operations:
- return 'Did not find any definitions in var_body'
- operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
- operation_definitions = {}
- for op in operations:
- colon_index = op.find(':')
- opening_brace_index = op.find('{')
-
- if colon_index == -1 or opening_brace_index == -1:
- return 'Could not parse operation'
- op_name = op[:colon_index]
- op_body = op[opening_brace_index+1:]
- if op_body == 'a.reverse()':
- operation_definitions[op_name] = 0
- elif op_body == 'a.splice(0,b)':
- operation_definitions[op_name] = 1
- elif op_body.startswith('var c=a[0]'):
- operation_definitions[op_name] = 2
- else:
- return 'Unknown op_body: ' + op_body
-
- decryption_function = []
- for op_with_arg in function_body:
- match = op_with_arg_re.fullmatch(op_with_arg)
- if match is None:
- return 'Could not parse operation with arg'
- op_name = match.group(1)
- if op_name not in operation_definitions:
- return 'Unknown op_name: ' + op_name
- op_argument = match.group(2)
- decryption_function.append([operation_definitions[op_name], int(op_argument)])
-
- decrypt_cache[player_name] = decryption_function
+ err = yt_data_extract.extract_decryption_function(info, base_js)
+ if err:
+ return err
+ decrypt_cache[player_name] = info['decryption_function']
save_decrypt_cache()
-
- for format in info['formats']:
- if not format['s'] or not format['sp'] or not format['url']:
- print('Warning: s, sp, or url not in format')
- continue
-
- a = list(format['s'])
- for op, argument in decryption_function:
- if op == 0:
- a.reverse()
- elif op == 1:
- a = a[argument:]
- else:
- operation_2(a, argument)
-
- signature = ''.join(a)
- format['url'] += '&' + format['sp'] + '=' + signature
- return False
-
-def operation_2(a, b):
- c = a[0]
- a[0] = a[b % len(a)]
- a[b % len(a)] = c
+ err = yt_data_extract.decrypt_signatures(info)
+ return err
headers = (
('Accept', '*/*'),