author     James Taylor <user234683@users.noreply.github.com>    2019-10-18 14:02:28 -0700
committer  James Taylor <user234683@users.noreply.github.com>    2019-10-18 14:02:28 -0700
commit     70b56d6eef4fd9d6c46c8fbf48dfec3ae7a2937e (patch)
tree       d9cb5041d8689955b6ce17e6cecfcd160ca1f46a /youtube/watch.py
parent     4c07546e7a5e5882abdda896009b744e947df1c4 (diff)
Extraction: Add signature decryption
Diffstat (limited to 'youtube/watch.py')
-rw-r--r--    youtube/watch.py    149
1 file changed, 133 insertions, 16 deletions
diff --git a/youtube/watch.py b/youtube/watch.py
index a5e0759..959dca2 100644
--- a/youtube/watch.py
+++ b/youtube/watch.py
@@ -11,8 +11,14 @@ import gevent
import os
import math
import traceback
+import re
+import urllib
-
+try:
+    with open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'r') as f:
+        decrypt_cache = json.loads(f.read())['decrypt_cache']
+except FileNotFoundError:
+    decrypt_cache = {}
def get_video_sources(info):
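Illustration (not part of the commit): the decrypt_function_cache.json file read above is written by save_decrypt_cache() further down in this diff. It is assumed to hold a version field plus a mapping from player name to the list of [operation, argument] pairs that decrypt_signatures() builds; the player name and operations below are invented.

example_cache = {
    'version': 1,
    'decrypt_cache': {
        'player_ias.vflset': [[0, 0], [2, 36], [1, 3]],  # hypothetical player -> [op, arg] pairs
    },
}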
@@ -22,9 +28,9 @@ def get_video_sources(info):
else:
max_resolution = settings.default_resolution
for format in info['formats']:
-        if not all(attr in format for attr in ('height', 'width', 'ext', 'url')):
+        if not all(format[attr] for attr in ('height', 'width', 'ext', 'url')):
continue
-        if 'acodec' in format and 'vcodec' in format and format['height'] <= max_resolution:
+        if format['acodec'] and format['vcodec'] and format['height'] <= max_resolution:
video_sources.append({
'src': format['url'],
'type': 'video/' + format['ext'],
@@ -101,6 +107,112 @@ def get_ordered_music_list_attributes(music_list):
return ordered_attributes
+def save_decrypt_cache():
+    try:
+        f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
+    except FileNotFoundError:
+        os.makedirs(settings.data_dir)
+        f = open(os.path.join(settings.data_dir, 'decrypt_function_cache.json'), 'w')
+
+    f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True))
+    f.close()
+
+# adapted from youtube-dl and invidious:
+# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
+decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}')
+op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)')
+def decrypt_signatures(info):
+    '''return error string, or False if no errors'''
+    if not info['formats'] or not info['formats'][0]['s']:
+        return False # No decryption needed
+    if not info['base_js']:
+        return 'Failed to find base.js'
+    player_name = yt_data_extract.default_get(info['base_js'].split('/'), -2)
+    if not player_name:
+        return 'Could not find player name'
+
+    if player_name in decrypt_cache:
+        print('Using cached decryption function for: ' + player_name)
+        decryption_function = decrypt_cache[player_name]
+    else:
+        base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
+        base_js = base_js.decode('utf-8')
+
+        decrypt_function_match = decrypt_function_re.search(base_js)
+        if decrypt_function_match is None:
+            return 'Could not find decryption function in base.js'
+
+        function_body = decrypt_function_match.group(1).split(';')[1:-1]
+        if not function_body:
+            return 'Empty decryption function body'
+
+        var_name = yt_data_extract.default_get(function_body[0].split('.'), 0)
+        if var_name is None:
+            return 'Could not find var_name'
+
+        var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
+        if var_body_match is None:
+            return 'Could not find var_body'
+
+        operations = var_body_match.group(1).replace('\n', '').split('},')
+        if not operations:
+            return 'Did not find any definitions in var_body'
+        operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
+        operation_definitions = {}
+        for op in operations:
+            colon_index = op.find(':')
+            opening_brace_index = op.find('{')
+
+            if colon_index == -1 or opening_brace_index == -1:
+                return 'Could not parse operation'
+            op_name = op[:colon_index]
+            op_body = op[opening_brace_index+1:]
+            if op_body == 'a.reverse()':
+                operation_definitions[op_name] = 0
+            elif op_body == 'a.splice(0,b)':
+                operation_definitions[op_name] = 1
+            elif op_body.startswith('var c=a[0]'):
+                operation_definitions[op_name] = 2
+            else:
+                return 'Unknown op_body: ' + op_body
+
+        decryption_function = []
+        for op_with_arg in function_body:
+            match = op_with_arg_re.fullmatch(op_with_arg)
+            if match is None:
+                return 'Could not parse operation with arg'
+            op_name = match.group(1)
+            if op_name not in operation_definitions:
+                return 'Unknown op_name: ' + op_name
+            op_argument = match.group(2)
+            decryption_function.append([operation_definitions[op_name], int(op_argument)])
+
+        decrypt_cache[player_name] = decryption_function
+        save_decrypt_cache()
+
+    for format in info['formats']:
+        if not format['s'] or not format['sp'] or not format['url']:
+            print('Warning: s, sp, or url not in format')
+            continue
+
+        a = list(format['s'])
+        for op, argument in decryption_function:
+            if op == 0:
+                a.reverse()
+            elif op == 1:
+                a = a[argument:]
+            else:
+                operation_2(a, argument)
+
+        signature = ''.join(a)
+        format['url'] += '&' + format['sp'] + '=' + signature
+    return False
+
+def operation_2(a, b):
+    c = a[0]
+    a[0] = a[b % len(a)]
+    a[b % len(a)] = c
+
headers = (
('Accept', '*/*'),
('Accept-Language', 'en-US,en;q=0.5'),
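To make the operation encoding above concrete, here is a minimal standalone sketch (not part of the commit) of how a parsed decryption function is applied to a scrambled signature. The base.js snippet in the comment, the operation list, and the sample signature are all invented for illustration; op 0 reverses the list, op 1 drops the first N characters, and op 2 swaps position 0 with position N modulo the length, mirroring operation_2() above.

# decrypt_function_re is written to match an obfuscated function of roughly this
# (invented) shape inside base.js:
#   function(a){a=a.split("");Xy.ab(a,2);Xy.cd(a,36);Xy.ab(a,5);return a.join("")}
def apply_decryption(decryption_function, scrambled_signature):
    a = list(scrambled_signature)
    for op, argument in decryption_function:
        if op == 0:    # a.reverse()
            a.reverse()
        elif op == 1:  # a.splice(0,b)
            a = a[argument:]
        else:          # swap a[0] with a[b % len(a)]
            a[0], a[argument % len(a)] = a[argument % len(a)], a[0]
    return ''.join(a)

# Invented example: reverse, swap index 0 with index 3, then drop the first 2 chars
print(apply_decryption([[0, 0], [2, 3], [1, 2]], 'ABCDEFGH'))  # -> 'FHDCBA'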
@@ -115,26 +227,31 @@ def extract_info(video_id):
except json.decoder.JSONDecodeError:
traceback.print_exc()
return {'error': 'Failed to parse json response'}
-    return yt_data_extract.extract_watch_info(polymer_json)
+    info = yt_data_extract.extract_watch_info(polymer_json)
+    error = decrypt_signatures(info)
+    if error:
+        print('Error decrypting url signatures: ' + error)
+        info['playability_error'] = error
+    return info
def video_quality_string(format):
-    if 'vcodec' in format:
-        result =str(format.get('width', '?')) + 'x' + str(format.get('height', '?'))
-        if 'fps' in format:
-            result += ' ' + format['fps'] + 'fps'
+    if format['vcodec']:
+        result =str(format['width'] or '?') + 'x' + str(format['height'] or '?')
+        if format['fps']:
+            result += ' ' + str(format['fps']) + 'fps'
return result
-    elif 'acodec' in format:
+    elif format['acodec']:
return 'audio only'
return '?'
def audio_quality_string(format):
-    if 'acodec' in format:
-        result = str(format.get('abr', '?')) + 'k'
-        if 'audio_sample_rate' in format:
+    if format['acodec']:
+        result = str(format['audio_bitrate'] or '?') + 'k'
+        if format['audio_sample_rate']:
result += ' ' + str(format['audio_sample_rate']) + ' Hz'
return result
-    elif 'vcodec' in format:
+    elif format['vcodec']:
return 'video only'
return '?'
@@ -193,13 +310,13 @@ def get_watch_page():
download_formats = []
for format in info['formats']:
-        if 'acodec' in format and 'vcodec' in format:
+        if format['acodec'] and format['vcodec']:
codecs_string = format['acodec'] + ', ' + format['vcodec']
else:
-            codecs_string = format.get('acodec') or format.get('vcodec') or '?'
+            codecs_string = format['acodec'] or format['vcodec'] or '?'
download_formats.append({
'url': format['url'],
-            'ext': format.get('ext', '?'),
+            'ext': format['ext'] or '?',
'audio_quality': audio_quality_string(format),
'video_quality': video_quality_string(format),
'file_size': format_bytes(format['file_size']),
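A closing note on the pattern in the later hunks (illustration only, not part of the commit): replacing format.get(...) and 'key' in format with direct indexing assumes yt_data_extract now emits format dicts in which every key is present and unavailable values are None. A hypothetical format under that assumption, with the helper output it would produce:

example_format = {
    # all keys assumed present; None marks values the extractor could not find
    'url': 'https://example.invalid/videoplayback',  # placeholder URL
    'ext': 'mp4',
    'width': 1280, 'height': 720, 'fps': 30,
    'vcodec': 'avc1.4d401f', 'acodec': 'mp4a.40.2',
    'audio_bitrate': 128, 'audio_sample_rate': 44100,
    's': None, 'sp': None,  # only set when the URL signature is scrambled
}
# video_quality_string(example_format) -> '1280x720 30fps'
# audio_quality_string(example_format) -> '128k 44100 Hz'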