diff options
author | James Taylor <user234683@users.noreply.github.com> | 2019-12-19 21:28:21 -0800 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2019-12-19 21:28:21 -0800 |
commit | 6b7a1212e30b713453aa7d2b3a7122e97689dad0 (patch) | |
tree | a1bcebdc84c36abaa9fbd1ced566815e75098bb1 /youtube/yt_data_extract/watch_extraction.py | |
parent | 4a3529df9577b660a2f493ab63ef08f10320b38e (diff) | |
download | yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.lz yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.xz yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.zip |
Extraction: Move non-stateful signature decryption functionality into yt_data_extract
Diffstat (limited to 'youtube/yt_data_extract/watch_extraction.py')
-rw-r--r-- | youtube/yt_data_extract/watch_extraction.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py index ff39f62..09abbe3 100644 --- a/youtube/yt_data_extract/watch_extraction.py +++ b/youtube/yt_data_extract/watch_extraction.py @@ -7,6 +7,7 @@ from .common import (get, multi_get, deep_get, multi_deep_get, import json import urllib.parse import traceback +import re # from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py _formats = { @@ -377,7 +378,11 @@ def extract_watch_info(polymer_json): info['base_js'] = deep_get(top_level, 'player', 'assets', 'js') if info['base_js']: info['base_js'] = normalize_url(info['base_js']) + info['player_name'] = get(info['base_js'].split('/'), -2) + else: + info['player_name'] = None + # extract stuff from visible parts of page mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={}) if mobile: info.update(_extract_watch_info_mobile(top_level)) @@ -447,3 +452,94 @@ def update_with_age_restricted_info(info, video_info_page): _extract_formats(info, player_response) _extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX) + +def requires_decryption(info): + return ('formats' in info) and info['formats'] and info['formats'][0]['s'] + +# adapted from youtube-dl and invidious: +# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr +decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}') +op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)') +def extract_decryption_function(info, base_js): + '''Insert decryption function into info. Return error string if not successful. + Decryption function is a list of list[2] of numbers. + It is advisable to cache the decryption function (uniquely identified by info['player_name']) so base.js (1 MB) doesn't need to be redownloaded each time''' + info['decryption_function'] = None + decrypt_function_match = decrypt_function_re.search(base_js) + if decrypt_function_match is None: + return 'Could not find decryption function in base.js' + + function_body = decrypt_function_match.group(1).split(';')[1:-1] + if not function_body: + return 'Empty decryption function body' + + var_name = get(function_body[0].split('.'), 0) + if var_name is None: + return 'Could not find var_name' + + var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) + if var_body_match is None: + return 'Could not find var_body' + + operations = var_body_match.group(1).replace('\n', '').split('},') + if not operations: + return 'Did not find any definitions in var_body' + operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others + operation_definitions = {} + for op in operations: + colon_index = op.find(':') + opening_brace_index = op.find('{') + + if colon_index == -1 or opening_brace_index == -1: + return 'Could not parse operation' + op_name = op[:colon_index] + op_body = op[opening_brace_index+1:] + if op_body == 'a.reverse()': + operation_definitions[op_name] = 0 + elif op_body == 'a.splice(0,b)': + operation_definitions[op_name] = 1 + elif op_body.startswith('var c=a[0]'): + operation_definitions[op_name] = 2 + else: + return 'Unknown op_body: ' + op_body + + decryption_function = [] + for op_with_arg in function_body: + match = op_with_arg_re.fullmatch(op_with_arg) + if match is None: + return 'Could not parse operation with arg' + op_name = match.group(1) + if op_name not in operation_definitions: + return 'Unknown op_name: ' + op_name + op_argument = match.group(2) + decryption_function.append([operation_definitions[op_name], int(op_argument)]) + + info['decryption_function'] = decryption_function + return False + +def _operation_2(a, b): + c = a[0] + a[0] = a[b % len(a)] + a[b % len(a)] = c + +def decrypt_signatures(info): + '''Applies info['decryption_function'] to decrypt all the signatures. Return err.''' + if not info.get('decryption_function'): + return 'decryption_function not in info' + for format in info['formats']: + if not format['s'] or not format['sp'] or not format['url']: + print('Warning: s, sp, or url not in format') + continue + + a = list(format['s']) + for op, argument in info['decryption_function']: + if op == 0: + a.reverse() + elif op == 1: + a = a[argument:] + else: + _operation_2(a, argument) + + signature = ''.join(a) + format['url'] += '&' + format['sp'] + '=' + signature + return False |