From 70b56d6eef4fd9d6c46c8fbf48dfec3ae7a2937e Mon Sep 17 00:00:00 2001 From: James Taylor Date: Fri, 18 Oct 2019 14:02:28 -0700 Subject: Extraction: Add signature decryption --- youtube/watch.py | 149 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 133 insertions(+), 16 deletions(-) (limited to 'youtube/watch.py') diff --git a/youtube/watch.py b/youtube/watch.py index a5e0759..959dca2 100644 --- a/youtube/watch.py +++ b/youtube/watch.py @@ -11,8 +11,14 @@ import gevent import os import math import traceback +import re +import urllib - +try: + with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'r') as f: + decrypt_cache = json.loads(f.read())['decrypt_cache'] +except FileNotFoundError: + decrypt_cache = {} def get_video_sources(info): @@ -22,9 +28,9 @@ def get_video_sources(info): else: max_resolution = settings.default_resolution for format in info['formats']: - if not all(attr in format for attr in ('height', 'width', 'ext', 'url')): + if not all(format[attr] for attr in ('height', 'width', 'ext', 'url')): continue - if 'acodec' in format and 'vcodec' in format and format['height'] <= max_resolution: + if format['acodec'] and format['vcodec'] and format['height'] <= max_resolution: video_sources.append({ 'src': format['url'], 'type': 'video/' + format['ext'], @@ -101,6 +107,112 @@ def get_ordered_music_list_attributes(music_list): return ordered_attributes +def save_decrypt_cache(): + try: + f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w') + except FileNotFoundError: + os.makedirs(settings.data_dir) + f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w') + + f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True)) + f.close() + +# adapted from youtube-dl and invidious: +# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr +decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}') +op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)') +def decrypt_signatures(info): + '''return error string, or False if no errors''' + if not info['formats'] or not info['formats'][0]['s']: + return False # No decryption needed + if not info['base_js']: + return 'Failed to find base.js' + player_name = yt_data_extract.default_get(info['base_js'].split('/'), -2) + if not player_name: + return 'Could not find player name' + + if player_name in decrypt_cache: + print('Using cached decryption function for: ' + player_name) + decryption_function = decrypt_cache[player_name] + else: + base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name) + base_js = base_js.decode('utf-8') + + decrypt_function_match = decrypt_function_re.search(base_js) + if decrypt_function_match is None: + return 'Could not find decryption function in base.js' + + function_body = decrypt_function_match.group(1).split(';')[1:-1] + if not function_body: + return 'Empty decryption function body' + + var_name = yt_data_extract.default_get(function_body[0].split('.'), 0) + if var_name is None: + return 'Could not find var_name' + + var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL) + if var_body_match is None: + return 'Could not find var_body' + + operations = var_body_match.group(1).replace('\n', '').split('},') + if not operations: + return 'Did not find any definitions in var_body' + operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others + operation_definitions = {} + for op in operations: + colon_index = op.find(':') + opening_brace_index = op.find('{') + + if colon_index == -1 or opening_brace_index == -1: + return 'Could not parse operation' + op_name = op[:colon_index] + op_body = op[opening_brace_index+1:] + if op_body == 'a.reverse()': + operation_definitions[op_name] = 0 + elif op_body == 'a.splice(0,b)': + operation_definitions[op_name] = 1 + elif op_body.startswith('var c=a[0]'): + operation_definitions[op_name] = 2 + else: + return 'Unknown op_body: ' + op_body + + decryption_function = [] + for op_with_arg in function_body: + match = op_with_arg_re.fullmatch(op_with_arg) + if match is None: + return 'Could not parse operation with arg' + op_name = match.group(1) + if op_name not in operation_definitions: + return 'Unknown op_name: ' + op_name + op_argument = match.group(2) + decryption_function.append([operation_definitions[op_name], int(op_argument)]) + + decrypt_cache[player_name] = decryption_function + save_decrypt_cache() + + for format in info['formats']: + if not format['s'] or not format['sp'] or not format['url']: + print('Warning: s, sp, or url not in format') + continue + + a = list(format['s']) + for op, argument in decryption_function: + if op == 0: + a.reverse() + elif op == 1: + a = a[argument:] + else: + operation_2(a, argument) + + signature = ''.join(a) + format['url'] += '&' + format['sp'] + '=' + signature + return False + +def operation_2(a, b): + c = a[0] + a[0] = a[b % len(a)] + a[b % len(a)] = c + headers = ( ('Accept', '*/*'), ('Accept-Language', 'en-US,en;q=0.5'), @@ -115,26 +227,31 @@ def extract_info(video_id): except json.decoder.JSONDecodeError: traceback.print_exc() return {'error': 'Failed to parse json response'} - return yt_data_extract.extract_watch_info(polymer_json) + info = yt_data_extract.extract_watch_info(polymer_json) + error = decrypt_signatures(info) + if error: + print('Error decrypting url signatures: ' + error) + info['playability_error'] = error + return info def video_quality_string(format): - if 'vcodec' in format: - result =str(format.get('width', '?')) + 'x' + str(format.get('height', '?')) - if 'fps' in format: - result += ' ' + format['fps'] + 'fps' + if format['vcodec']: + result =str(format['width'] or '?') + 'x' + str(format['height'] or '?') + if format['fps']: + result += ' ' + str(format['fps']) + 'fps' return result - elif 'acodec' in format: + elif format['acodec']: return 'audio only' return '?' def audio_quality_string(format): - if 'acodec' in format: - result = str(format.get('abr', '?')) + 'k' - if 'audio_sample_rate' in format: + if format['acodec']: + result = str(format['audio_bitrate'] or '?') + 'k' + if format['audio_sample_rate']: result += ' ' + str(format['audio_sample_rate']) + ' Hz' return result - elif 'vcodec' in format: + elif format['vcodec']: return 'video only' return '?' @@ -193,13 +310,13 @@ def get_watch_page(): download_formats = [] for format in info['formats']: - if 'acodec' in format and 'vcodec' in format: + if format['acodec'] and format['vcodec']: codecs_string = format['acodec'] + ', ' + format['vcodec'] else: - codecs_string = format.get('acodec') or format.get('vcodec') or '?' + codecs_string = format['acodec'] or format['vcodec'] or '?' download_formats.append({ 'url': format['url'], - 'ext': format.get('ext', '?'), + 'ext': format['ext'] or '?', 'audio_quality': audio_quality_string(format), 'video_quality': video_quality_string(format), 'file_size': format_bytes(format['file_size']), -- cgit v1.2.3