about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author James Taylor <user234683@users.noreply.github.com> 2019-12-19 21:28:21 -0800
committer James Taylor <user234683@users.noreply.github.com> 2019-12-19 21:28:21 -0800
commit 6b7a1212e30b713453aa7d2b3a7122e97689dad0 (patch)
tree a1bcebdc84c36abaa9fbd1ced566815e75098bb1
parent 4a3529df9577b660a2f493ab63ef08f10320b38e (diff)
download yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.lz
yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.tar.xz
yt-local-6b7a1212e30b713453aa7d2b3a7122e97689dad0.zip
Extraction: Move non-stateful signature decryption functionality into yt_data_extract
-rw-r--r-- youtube/watch.py 97
-rw-r--r-- youtube/yt_data_extract/__init__.py 3
-rw-r--r-- youtube/yt_data_extract/watch_extraction.py 96
3 files changed, 110 insertions, 86 deletions
diff --git a/youtube/watch.py b/youtube/watch.py
index 45d658f..429f272 100644
--- a/youtube/watch.py
+++ b/youtube/watch.py
@@ -11,7 +11,6 @@ import gevent
import os
import math
import traceback
-import re
import urllib
try:
@@ -175,101 +174,29 @@ def save_decrypt_cache():
f.write(json.dumps({'version': 1, 'decrypt_cache':decrypt_cache}, indent=4, sort_keys=True))
f.close()
-# adapted from youtube-dl and invidious:
-# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
-decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}')
-op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)')
def decrypt_signatures(info):
'''return error string, or False if no errors'''
- if ('formats' not in info) or (not info['formats']) or (not info['formats'][0]['s']):
- return False # No decryption needed
+ if not yt_data_extract.requires_decryption(info):
+ return False
+ if not info['player_name']:
+ return 'Could not find player name'
if not info['base_js']:
return 'Failed to find base.js'
- player_name = yt_data_extract.get(info['base_js'].split('/'), -2)
- if not player_name:
- return 'Could not find player name'
+ player_name = info['player_name']
if player_name in decrypt_cache:
print('Using cached decryption function for: ' + player_name)
- decryption_function = decrypt_cache[player_name]
+ info['decryption_function'] = decrypt_cache[player_name]
else:
base_js = util.fetch_url(info['base_js'], debug_name='base.js', report_text='Fetched player ' + player_name)
base_js = base_js.decode('utf-8')
-
- decrypt_function_match = decrypt_function_re.search(base_js)
- if decrypt_function_match is None:
- return 'Could not find decryption function in base.js'
-
- function_body = decrypt_function_match.group(1).split(';')[1:-1]
- if not function_body:
- return 'Empty decryption function body'
-
- var_name = yt_data_extract.get(function_body[0].split('.'), 0)
- if var_name is None:
- return 'Could not find var_name'
-
- var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
- if var_body_match is None:
- return 'Could not find var_body'
-
- operations = var_body_match.group(1).replace('\n', '').split('},')
- if not operations:
- return 'Did not find any definitions in var_body'
- operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
- operation_definitions = {}
- for op in operations:
- colon_index = op.find(':')
- opening_brace_index = op.find('{')
-
- if colon_index == -1 or opening_brace_index == -1:
- return 'Could not parse operation'
- op_name = op[:colon_index]
- op_body = op[opening_brace_index+1:]
- if op_body == 'a.reverse()':
- operation_definitions[op_name] = 0
- elif op_body == 'a.splice(0,b)':
- operation_definitions[op_name] = 1
- elif op_body.startswith('var c=a[0]'):
- operation_definitions[op_name] = 2
- else:
- return 'Unknown op_body: ' + op_body
-
- decryption_function = []
- for op_with_arg in function_body:
- match = op_with_arg_re.fullmatch(op_with_arg)
- if match is None:
- return 'Could not parse operation with arg'
- op_name = match.group(1)
- if op_name not in operation_definitions:
- return 'Unknown op_name: ' + op_name
- op_argument = match.group(2)
- decryption_function.append([operation_definitions[op_name], int(op_argument)])
-
- decrypt_cache[player_name] = decryption_function
+ err = yt_data_extract.extract_decryption_function(info, base_js)
+ if err:
+ return err
+ decrypt_cache[player_name] = info['decryption_function']
save_decrypt_cache()
-
- for format in info['formats']:
- if not format['s'] or not format['sp'] or not format['url']:
- print('Warning: s, sp, or url not in format')
- continue
-
- a = list(format['s'])
- for op, argument in decryption_function:
- if op == 0:
- a.reverse()
- elif op == 1:
- a = a[argument:]
- else:
- operation_2(a, argument)
-
- signature = ''.join(a)
- format['url'] += '&' + format['sp'] + '=' + signature
- return False
-
-def operation_2(a, b):
- c = a[0]
- a[0] = a[b % len(a)]
- a[b % len(a)] = c
+ err = yt_data_extract.decrypt_signatures(info)
+ return err
headers = (
('Accept', '*/*'),
diff --git a/youtube/yt_data_extract/__init__.py b/youtube/yt_data_extract/__init__.py
index f2f07c0..898141e 100644
--- a/youtube/yt_data_extract/__init__.py
+++ b/youtube/yt_data_extract/__init__.py
@@ -7,4 +7,5 @@ from .everything_else import (extract_channel_info, extract_search_info,
extract_playlist_metadata, extract_playlist_info, extract_comments_info)
from .watch_extraction import (extract_watch_info, get_caption_url,
- update_with_age_restricted_info)
+ update_with_age_restricted_info, requires_decryption,
+ extract_decryption_function, decrypt_signatures)
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
index ff39f62..09abbe3 100644
--- a/youtube/yt_data_extract/watch_extraction.py
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -7,6 +7,7 @@ from .common import (get, multi_get, deep_get, multi_deep_get,
import json
import urllib.parse
import traceback
+import re
# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
_formats = {
@@ -377,7 +378,11 @@ def extract_watch_info(polymer_json):
info['base_js'] = deep_get(top_level, 'player', 'assets', 'js')
if info['base_js']:
info['base_js'] = normalize_url(info['base_js'])
+ info['player_name'] = get(info['base_js'].split('/'), -2)
+ else:
+ info['player_name'] = None
+ # extract stuff from visible parts of page
mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={})
if mobile:
info.update(_extract_watch_info_mobile(top_level))
@@ -447,3 +452,94 @@ def update_with_age_restricted_info(info, video_info_page):
_extract_formats(info, player_response)
_extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX)
+
+def requires_decryption(info):
+ return ('formats' in info) and info['formats'] and info['formats'][0]['s']
+
+# adapted from youtube-dl and invidious:
+# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
+decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}]+)\}')
+op_with_arg_re = re.compile(r'[^\.]+\.([^\(]+)\(a,(\d+)\)')
+def extract_decryption_function(info, base_js):
+ '''Insert decryption function into info. Return error string if not successful.
+ Decryption function is a list of list[2] of numbers.
+ It is advisable to cache the decryption function (uniquely identified by info['player_name']) so base.js (1 MB) doesn't need to be redownloaded each time'''
+ info['decryption_function'] = None
+ decrypt_function_match = decrypt_function_re.search(base_js)
+ if decrypt_function_match is None:
+ return 'Could not find decryption function in base.js'
+
+ function_body = decrypt_function_match.group(1).split(';')[1:-1]
+ if not function_body:
+ return 'Empty decryption function body'
+
+ var_name = get(function_body[0].split('.'), 0)
+ if var_name is None:
+ return 'Could not find var_name'
+
+ var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
+ if var_body_match is None:
+ return 'Could not find var_body'
+
+ operations = var_body_match.group(1).replace('\n', '').split('},')
+ if not operations:
+ return 'Did not find any definitions in var_body'
+ operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
+ operation_definitions = {}
+ for op in operations:
+ colon_index = op.find(':')
+ opening_brace_index = op.find('{')
+
+ if colon_index == -1 or opening_brace_index == -1:
+ return 'Could not parse operation'
+ op_name = op[:colon_index]
+ op_body = op[opening_brace_index+1:]
+ if op_body == 'a.reverse()':
+ operation_definitions[op_name] = 0
+ elif op_body == 'a.splice(0,b)':
+ operation_definitions[op_name] = 1
+ elif op_body.startswith('var c=a[0]'):
+ operation_definitions[op_name] = 2
+ else:
+ return 'Unknown op_body: ' + op_body
+
+ decryption_function = []
+ for op_with_arg in function_body:
+ match = op_with_arg_re.fullmatch(op_with_arg)
+ if match is None:
+ return 'Could not parse operation with arg'
+ op_name = match.group(1)
+ if op_name not in operation_definitions:
+ return 'Unknown op_name: ' + op_name
+ op_argument = match.group(2)
+ decryption_function.append([operation_definitions[op_name], int(op_argument)])
+
+ info['decryption_function'] = decryption_function
+ return False
+
+def _operation_2(a, b):
+ c = a[0]
+ a[0] = a[b % len(a)]
+ a[b % len(a)] = c
+
+def decrypt_signatures(info):
+ '''Applies info['decryption_function'] to decrypt all the signatures. Return err.'''
+ if not info.get('decryption_function'):
+ return 'decryption_function not in info'
+ for format in info['formats']:
+ if not format['s'] or not format['sp'] or not format['url']:
+ print('Warning: s, sp, or url not in format')
+ continue
+
+ a = list(format['s'])
+ for op, argument in info['decryption_function']:
+ if op == 0:
+ a.reverse()
+ elif op == 1:
+ a = a[argument:]
+ else:
+ _operation_2(a, argument)
+
+ signature = ''.join(a)
+ format['url'] += '&' + format['sp'] + '=' + signature
+ return False