From 6b7a1212e30b713453aa7d2b3a7122e97689dad0 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Thu, 19 Dec 2019 21:28:21 -0800 Subject: Extraction: Move non-stateful signature decryption functionality into yt_data_extract --- youtube/watch.py | 97 ++++------------------------- youtube/yt_data_extract/__init__.py | 3 +- youtube/yt_data_extract/watch_extraction.py | 96 ++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 86 deletions(-) (limited to 'youtube') diff --git a/youtube/watch.py b/youtube/watch.py index 45d658f..429f272 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -11,7 +11,6 @@ import gevent import os import math import traceback -import re import urllib try: @@ -175,101 +174,29 @@ def save_decrypt_cache(): f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True)) f.close() -# adapted from youtube-dl and invidious: -# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr -decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}') -op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)') def decrypt_signatures(info): '''return error string, or False if no errors''' - if ('formats' not in info) or (not info['formats']) or (not info['formats'][0]['s']): - return False # No decryption needed + if not yt_data_extract.requires_decryption(info): + return False + if not info['player_name']: + return 'Could not find player name' if not info['base_js']: return 'Failed to find base.js' - player_name = yt_data_extract.get(info['base_js'].split('/'), -2) - if not player_name: - return 'Could not find player name' + player_name = info['player_name'] if player_name in decrypt_cache: print('Using cached decryption function for: ' + player_name) - decryption_function = decrypt_cache[player_name] + info['decryption_function'] = decrypt_cache[player_name] else: base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name) base_js = base_js.decode('utf-8') - - decrypt_function_match = decrypt_function_re.search(base_js) - if decrypt_function_match is None: - return 'Could not find decryption function in base.js' - - function_body = decrypt_function_match.group(1).split(';')[1:-1] - if not function_body: - return 'Empty decryption function body' - - var_name = yt_data_extract.get(function_body[0].split('.'), 0) - if var_name is None: - return 'Could not find var_name' - - var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) - if var_body_match is None: - return 'Could not find var_body' - - operations = var_body_match.group(1).replace('\n', '').split('},') - if not operations: - return 'Did not find any definitions in var_body' - operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others - operation_definitions = {} - for op in operations: - colon_index = op.find(':') - opening_brace_index = op.find('{') - - if colon_index == -1 or opening_brace_index == -1: - return 'Could not parse operation' - op_name = op[:colon_index] - op_body = op[opening_brace_index+1:] - if op_body == 'a.reverse()': - operation_definitions[op_name] = 0 - elif op_body == 'a.splice(0,b)': - operation_definitions[op_name] = 1 - elif op_body.startswith('var c=a[0]'): - operation_definitions[op_name] = 2 - else: - return 'Unknown op_body: ' + op_body - - decryption_function = [] - for op_with_arg in function_body: - match = op_with_arg_re.fullmatch(op_with_arg) - if match is None: - return 'Could not parse operation with arg' - op_name = match.group(1) - if op_name not in operation_definitions: - return 'Unknown op_name: ' + op_name - op_argument = match.group(2) - decryption_function.append([operation_definitions[op_name], int(op_argument)]) - - decrypt_cache[player_name] = decryption_function + err = yt_data_extract.extract_decryption_function(info, base_js) + if err: + return err + decrypt_cache[player_name] = info['decryption_function'] save_decrypt_cache() - - for format in info['formats']: - if not format['s'] or not format['sp'] or not format['url']: - print('Warning: s, sp, or url not in format') - continue - - a = list(format['s']) - for op, argument in decryption_function: - if op == 0: - a.reverse() - elif op == 1: - a = a[argument:] - else: - operation_2(a, argument) - - signature = ''.join(a) - format['url'] += '&' + format['sp'] + '=' + signature - return False - -def operation_2(a, b): - c = a[0] - a[0] = a[b % len(a)] - a[b % len(a)] = c + err = yt_data_extract.decrypt_signatures(info) + return err headers = ( ('Accept', '*/*'), diff --git a/youtube/yt_data_extract/__init__.py b/youtube/yt_data_extract/__init__.py index f2f07c0..898141e 100644 --- a/youtube/yt_data_extract/__init__.py +++ b/youtube/yt_data_extract/__init__.py @@ -7,4 +7,5 @@ from .everything_else import (extract_channel_info, extract_search_info, extract_playlist_metadata, extract_playlist_info, extract_comments_info) from .watch_extraction import (extract_watch_info, get_caption_url, - update_with_age_restricted_info) + update_with_age_restricted_info, requires_decryption, + extract_decryption_function, decrypt_signatures) diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py index ff39f62..09abbe3 100644 --- a/youtube/yt_data_extract/watch_extraction.py +++ b/youtube/yt_data_extract/watch_extraction.py @@ -7,6 +7,7 @@ from .common import (get, multi_get, deep_get, multi_deep_get, import json import urllib.parse import traceback +import re # from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py _formats = { @@ -377,7 +378,11 @@ def extract_watch_info(polymer_json): info['base_js'] = deep_get(top_level, 'player', 'assets', 'js') if info['base_js']: info['base_js'] = normalize_url(info['base_js']) + info['player_name'] = get(info['base_js'].split('/'), -2) + else: + info['player_name'] = None + # extract stuff from visible parts of page mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={}) if mobile: info.update(_extract_watch_info_mobile(top_level)) @@ -447,3 +452,94 @@ def update_with_age_restricted_info(info, video_info_page): _extract_formats(info, player_response) _extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX) + +def requires_decryption(info): + return ('formats' in info) and info['formats'] and info['formats'][0]['s'] + +# adapted from youtube-dl and invidious: +# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr +decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}') +op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)') +def extract_decryption_function(info, base_js): + '''Insert decryption function into info. Return error string if not successful. + Decryption function is a list of list[2] of numbers. + It is advisable to cache the decryption function (uniquely identified by info['player_name']) so base.js (1 MB) doesn't need to be redownloaded each time''' + info['decryption_function'] = None + decrypt_function_match = decrypt_function_re.search(base_js) + if decrypt_function_match is None: + return 'Could not find decryption function in base.js' + + function_body = decrypt_function_match.group(1).split(';')[1:-1] + if not function_body: + return 'Empty decryption function body' + + var_name = get(function_body[0].split('.'), 0) + if var_name is None: + return 'Could not find var_name' + + var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) + if var_body_match is None: + return 'Could not find var_body' + + operations = var_body_match.group(1).replace('\n', '').split('},') + if not operations: + return 'Did not find any definitions in var_body' + operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others + operation_definitions = {} + for op in operations: + colon_index = op.find(':') + opening_brace_index = op.find('{') + + if colon_index == -1 or opening_brace_index == -1: + return 'Could not parse operation' + op_name = op[:colon_index] + op_body = op[opening_brace_index+1:] + if op_body == 'a.reverse()': + operation_definitions[op_name] = 0 + elif op_body == 'a.splice(0,b)': + operation_definitions[op_name] = 1 + elif op_body.startswith('var c=a[0]'): + operation_definitions[op_name] = 2 + else: + return 'Unknown op_body: ' + op_body + + decryption_function = [] + for op_with_arg in function_body: + match = op_with_arg_re.fullmatch(op_with_arg) + if match is None: + return 'Could not parse operation with arg' + op_name = match.group(1) + if op_name not in operation_definitions: + return 'Unknown op_name: ' + op_name + op_argument = match.group(2) + decryption_function.append([operation_definitions[op_name], int(op_argument)]) + + info['decryption_function'] = decryption_function + return False + +def _operation_2(a, b): + c = a[0] + a[0] = a[b % len(a)] + a[b % len(a)] = c + +def decrypt_signatures(info): + '''Applies info['decryption_function'] to decrypt all the signatures. Return err.''' + if not info.get('decryption_function'): + return 'decryption_function not in info' + for format in info['formats']: + if not format['s'] or not format['sp'] or not format['url']: + print('Warning: s, sp, or url not in format') + continue + + a = list(format['s']) + for op, argument in info['decryption_function']: + if op == 0: + a.reverse() + elif op == 1: + a = a[argument:] + else: + _operation_2(a, argument) + + signature = ''.join(a) + format['url'] += '&' + format['sp'] + '=' + signature + return False -- cgit v1.2.3