diff options
author | James Taylor <user234683@users.noreply.github.com> | 2019-12-19 21:28:21 -0800 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2019-12-19 21:28:21 -0800 |
commit | 6b7a1212e30b713453aa7d2b3a7122e97689dad0 (patch) | |
tree | a1bcebdc84c36abaa9fbd1ced566815e75098bb1 /youtube/watch.py | |
parent | 4a3529df9577b660a2f493ab63ef08f10320b38e (diff) | |
download | yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.lz yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.xz yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.zip |
Extraction: Move non-stateful signature decryption functionality into yt_data_extract
Diffstat (limited to 'youtube/watch.py')
-rw-r--r-- | youtube/watch.py | 97 |
1 files changed, 12 insertions, 85 deletions
diff --git a/youtube/watch.py b/youtube/watch.py index 45d658f..429f272 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -11,7 +11,6 @@ import gevent import os import math import traceback -import re import urllib try: @@ -175,101 +174,29 @@ def save_decrypt_cache(): f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True)) f.close() -# adapted from youtube-dl and invidious: -# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr -decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}') -op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)') def decrypt_signatures(info): '''return error string, or False if no errors''' - if ('formats' not in info) or (not info['formats']) or (not info['formats'][0]['s']): - return False # No decryption needed + if not yt_data_extract.requires_decryption(info): + return False + if not info['player_name']: + return 'Could not find player name' if not info['base_js']: return 'Failed to find base.js' - player_name = yt_data_extract.get(info['base_js'].split('/'), -2) - if not player_name: - return 'Could not find player name' + player_name = info['player_name'] if player_name in decrypt_cache: print('Using cached decryption function for: ' + player_name) - decryption_function = decrypt_cache[player_name] + info['decryption_function'] = decrypt_cache[player_name] else: base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name) base_js = base_js.decode('utf-8') - - decrypt_function_match = decrypt_function_re.search(base_js) - if decrypt_function_match is None: - return 'Could not find decryption function in base.js' - - function_body = decrypt_function_match.group(1).split(';')[1:-1] - if not function_body: - return 'Empty decryption function body' - - var_name = yt_data_extract.get(function_body[0].split('.'), 0) - if var_name is None: - return 'Could not find var_name' - - var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) - if var_body_match is None: - return 'Could not find var_body' - - operations = var_body_match.group(1).replace('\n', '').split('},') - if not operations: - return 'Did not find any definitions in var_body' - operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others - operation_definitions = {} - for op in operations: - colon_index = op.find(':') - opening_brace_index = op.find('{') - - if colon_index == -1 or opening_brace_index == -1: - return 'Could not parse operation' - op_name = op[:colon_index] - op_body = op[opening_brace_index+1:] - if op_body == 'a.reverse()': - operation_definitions[op_name] = 0 - elif op_body == 'a.splice(0,b)': - operation_definitions[op_name] = 1 - elif op_body.startswith('var c=a[0]'): - operation_definitions[op_name] = 2 - else: - return 'Unknown op_body: ' + op_body - - decryption_function = [] - for op_with_arg in function_body: - match = op_with_arg_re.fullmatch(op_with_arg) - if match is None: - return 'Could not parse operation with arg' - op_name = match.group(1) - if op_name not in operation_definitions: - return 'Unknown op_name: ' + op_name - op_argument = match.group(2) - decryption_function.append([operation_definitions[op_name], int(op_argument)]) - - decrypt_cache[player_name] = decryption_function + err = yt_data_extract.extract_decryption_function(info, base_js) + if err: + return err + decrypt_cache[player_name] = info['decryption_function'] save_decrypt_cache() - - for format in info['formats']: - if not format['s'] or not format['sp'] or not format['url']: - print('Warning: s, sp, or url not in format') - continue - - a = list(format['s']) - for op, argument in decryption_function: - if op == 0: - a.reverse() - elif op == 1: - a = a[argument:] - else: - operation_2(a, argument) - - signature = ''.join(a) - format['url'] += '&' + format['sp'] + '=' + signature - return False - -def operation_2(a, b): - c = a[0] - a[0] = a[b % len(a)] - a[b % len(a)] = c + err = yt_data_extract.decrypt_signatures(info) + return err headers = ( ('Accept', '*/*'), |