aboutsummaryrefslogtreecommitdiffstats
path: root/hypervideo_dl/jsinterp.py
diff options
context:
space:
mode:
Diffstat (limited to 'hypervideo_dl/jsinterp.py')
-rw-r--r--hypervideo_dl/jsinterp.py492
1 files changed, 385 insertions, 107 deletions
diff --git a/hypervideo_dl/jsinterp.py b/hypervideo_dl/jsinterp.py
index 7bda596..46834f8 100644
--- a/hypervideo_dl/jsinterp.py
+++ b/hypervideo_dl/jsinterp.py
@@ -1,5 +1,4 @@
-from __future__ import unicode_literals
-
+from collections.abc import MutableMapping
import json
import operator
import re
@@ -22,10 +21,55 @@ _OPERATORS = [
('*', operator.mul),
]
_ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS]
-_ASSIGN_OPERATORS.append(('=', lambda cur, right: right))
+_ASSIGN_OPERATORS.append(('=', (lambda cur, right: right)))
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'
+_MATCHING_PARENS = dict(zip('({[', ')}]'))
+
+
+class JS_Break(ExtractorError):
+ def __init__(self):
+ ExtractorError.__init__(self, 'Invalid break')
+
+
+class JS_Continue(ExtractorError):
+ def __init__(self):
+ ExtractorError.__init__(self, 'Invalid continue')
+
+
+class LocalNameSpace(MutableMapping):
+ def __init__(self, *stack):
+ self.stack = tuple(stack)
+
+ def __getitem__(self, key):
+ for scope in self.stack:
+ if key in scope:
+ return scope[key]
+ raise KeyError(key)
+
+ def __setitem__(self, key, value):
+ for scope in self.stack:
+ if key in scope:
+ scope[key] = value
+ break
+ else:
+ self.stack[0][key] = value
+ return value
+
+ def __delitem__(self, key):
+ raise NotImplementedError('Deleting is not supported')
+
+ def __iter__(self):
+ for scope in self.stack:
+ yield from scope
+
+ def __len__(self, key):
+ return len(iter(self))
+
+ def __repr__(self):
+ return f'LocalNameSpace{self.stack}'
+
class JSInterpreter(object):
def __init__(self, code, objects=None):
@@ -34,11 +78,56 @@ class JSInterpreter(object):
self.code = code
self._functions = {}
self._objects = objects
+ self.__named_object_counter = 0
+
+ def _named_object(self, namespace, obj):
+ self.__named_object_counter += 1
+ name = f'__hypervideo_dl_jsinterp_obj{self.__named_object_counter}'
+ namespace[name] = obj
+ return name
+
+ @staticmethod
+ def _separate(expr, delim=',', max_split=None):
+ if not expr:
+ return
+ counters = {k: 0 for k in _MATCHING_PARENS.values()}
+ start, splits, pos, delim_len = 0, 0, 0, len(delim) - 1
+ for idx, char in enumerate(expr):
+ if char in _MATCHING_PARENS:
+ counters[_MATCHING_PARENS[char]] += 1
+ elif char in counters:
+ counters[char] -= 1
+ if char != delim[pos] or any(counters.values()):
+ pos = 0
+ continue
+ elif pos != delim_len:
+ pos += 1
+ continue
+ yield expr[start: idx - delim_len]
+ start, pos = idx + 1, 0
+ splits += 1
+ if max_split and splits >= max_split:
+ break
+ yield expr[start:]
+
+ @staticmethod
+ def _separate_at_paren(expr, delim):
+ separated = list(JSInterpreter._separate(expr, delim, 1))
+ if len(separated) < 2:
+ raise ExtractorError(f'No terminating paren {delim} in {expr}')
+ return separated[0][1:].strip(), separated[1].strip()
def interpret_statement(self, stmt, local_vars, allow_recursion=100):
if allow_recursion < 0:
raise ExtractorError('Recursion limit reached')
+ sub_statements = list(self._separate(stmt, ';'))
+ stmt = (sub_statements or ['']).pop()
+ for sub_stmt in sub_statements:
+ ret, should_abort = self.interpret_statement(sub_stmt, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+
should_abort = False
stmt = stmt.lstrip()
stmt_m = re.match(r'var\s', stmt)
@@ -61,25 +150,122 @@ class JSInterpreter(object):
if expr == '': # Empty expression
return None
+ if expr.startswith('{'):
+ inner, outer = self._separate_at_paren(expr, '}')
+ inner, should_abort = self.interpret_statement(inner, local_vars, allow_recursion - 1)
+ if not outer or should_abort:
+ return inner
+ else:
+ expr = json.dumps(inner) + outer
+
if expr.startswith('('):
- parens_count = 0
- for m in re.finditer(r'[()]', expr):
- if m.group(0) == '(':
- parens_count += 1
+ inner, outer = self._separate_at_paren(expr, ')')
+ inner = self.interpret_expression(inner, local_vars, allow_recursion)
+ if not outer:
+ return inner
+ else:
+ expr = json.dumps(inner) + outer
+
+ if expr.startswith('['):
+ inner, outer = self._separate_at_paren(expr, ']')
+ name = self._named_object(local_vars, [
+ self.interpret_expression(item, local_vars, allow_recursion)
+ for item in self._separate(inner)])
+ expr = name + outer
+
+ m = re.match(r'try\s*', expr)
+ if m:
+ if expr[m.end()] == '{':
+ try_expr, expr = self._separate_at_paren(expr[m.end():], '}')
+ else:
+ try_expr, expr = expr[m.end() - 1:], ''
+ ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'catch\s*\(', expr)
+ if m:
+ # We ignore the catch block
+ _, expr = self._separate_at_paren(expr, '}')
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'for\s*\(', expr)
+ if m:
+ constructor, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
+ if remaining.startswith('{'):
+ body, expr = self._separate_at_paren(remaining, '}')
+ else:
+ m = re.match(r'switch\s*\(', remaining) # FIXME
+ if m:
+ switch_val, remaining = self._separate_at_paren(remaining[m.end() - 1:], ')')
+ body, expr = self._separate_at_paren(remaining, '}')
+ body = 'switch(%s){%s}' % (switch_val, body)
else:
- parens_count -= 1
- if parens_count == 0:
- sub_expr = expr[1:m.start()]
- sub_result = self.interpret_expression(
- sub_expr, local_vars, allow_recursion)
- remaining_expr = expr[m.end():].strip()
- if not remaining_expr:
- return sub_result
- else:
- expr = json.dumps(sub_result) + remaining_expr
+ body, expr = remaining, ''
+ start, cndn, increment = self._separate(constructor, ';')
+ if self.interpret_statement(start, local_vars, allow_recursion - 1)[1]:
+ raise ExtractorError(
+ f'Premature return in the initialization of a for loop in {constructor!r}')
+ while True:
+ if not self.interpret_expression(cndn, local_vars, allow_recursion):
+ break
+ try:
+ ret, should_abort = self.interpret_statement(body, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ except JS_Break:
+ break
+ except JS_Continue:
+ pass
+ if self.interpret_statement(increment, local_vars, allow_recursion - 1)[1]:
+ raise ExtractorError(
+ f'Premature return in the initialization of a for loop in {constructor!r}')
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'switch\s*\(', expr)
+ if m:
+ switch_val, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
+ switch_val = self.interpret_expression(switch_val, local_vars, allow_recursion)
+ body, expr = self._separate_at_paren(remaining, '}')
+ items = body.replace('default:', 'case default:').split('case ')[1:]
+ for default in (False, True):
+ matched = False
+ for item in items:
+ case, stmt = [i.strip() for i in self._separate(item, ':', 1)]
+ if default:
+ matched = matched or case == 'default'
+ elif not matched:
+ matched = case != 'default' and switch_val == self.interpret_expression(case, local_vars, allow_recursion)
+ if not matched:
+ continue
+ try:
+ ret, should_abort = self.interpret_statement(stmt, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ except JS_Break:
break
- else:
- raise ExtractorError('Premature end of parens in %r' % expr)
+ if matched:
+ break
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ # Comma separated statements
+ sub_expressions = list(self._separate(expr))
+ expr = sub_expressions.pop().strip() if sub_expressions else ''
+ for sub_expr in sub_expressions:
+ self.interpret_expression(sub_expr, local_vars, allow_recursion)
+
+ for m in re.finditer(rf'''(?x)
+ (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
+ (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr):
+ var = m.group('var1') or m.group('var2')
+ start, end = m.span()
+ sign = m.group('pre_sign') or m.group('post_sign')
+ ret = local_vars[var]
+ local_vars[var] += 1 if sign[0] == '+' else -1
+ if m.group('pre_sign'):
+ ret = local_vars[var]
+ expr = expr[:start] + json.dumps(ret) + expr[end:]
for op, opfunc in _ASSIGN_OPERATORS:
m = re.match(r'''(?x)
@@ -88,14 +274,13 @@ class JSInterpreter(object):
(?P<expr>.*)$''' % (_NAME_RE, re.escape(op)), expr)
if not m:
continue
- right_val = self.interpret_expression(
- m.group('expr'), local_vars, allow_recursion - 1)
+ right_val = self.interpret_expression(m.group('expr'), local_vars, allow_recursion)
if m.groupdict().get('index'):
lvar = local_vars[m.group('out')]
- idx = self.interpret_expression(
- m.group('index'), local_vars, allow_recursion)
- assert isinstance(idx, int)
+ idx = self.interpret_expression(m.group('index'), local_vars, allow_recursion)
+ if not isinstance(idx, int):
+ raise ExtractorError(f'List indices must be integers: {idx}')
cur = lvar[idx]
val = opfunc(cur, right_val)
lvar[idx] = val
@@ -109,8 +294,13 @@ class JSInterpreter(object):
if expr.isdigit():
return int(expr)
+ if expr == 'break':
+ raise JS_Break()
+ elif expr == 'continue':
+ raise JS_Continue()
+
var_m = re.match(
- r'(?!if|return|true|false)(?P<name>%s)$' % _NAME_RE,
+ r'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE,
expr)
if var_m:
return local_vars[var_m.group('name')]
@@ -124,91 +314,154 @@ class JSInterpreter(object):
r'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE, expr)
if m:
val = local_vars[m.group('in')]
- idx = self.interpret_expression(
- m.group('idx'), local_vars, allow_recursion - 1)
+ idx = self.interpret_expression(m.group('idx'), local_vars, allow_recursion)
return val[idx]
+ for op, opfunc in _OPERATORS:
+ separated = list(self._separate(expr, op))
+ if len(separated) < 2:
+ continue
+ right_val = separated.pop()
+ left_val = op.join(separated)
+ left_val, should_abort = self.interpret_statement(
+ left_val, local_vars, allow_recursion - 1)
+ if should_abort:
+ raise ExtractorError(f'Premature left-side return of {op} in {expr!r}')
+ right_val, should_abort = self.interpret_statement(
+ right_val, local_vars, allow_recursion - 1)
+ if should_abort:
+ raise ExtractorError(f'Premature right-side return of {op} in {expr!r}')
+ return opfunc(left_val or 0, right_val)
+
m = re.match(
- r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*(?:\(+(?P<args>[^()]*)\))?$' % _NAME_RE,
+ r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE,
expr)
if m:
variable = m.group('var')
member = remove_quotes(m.group('member') or m.group('member2'))
- arg_str = m.group('args')
-
- if variable in local_vars:
- obj = local_vars[variable]
- else:
- if variable not in self._objects:
- self._objects[variable] = self.extract_object(variable)
- obj = self._objects[variable]
-
- if arg_str is None:
- # Member access
- if member == 'length':
- return len(obj)
- return obj[member]
-
- assert expr.endswith(')')
- # Function call
- if arg_str == '':
- argvals = tuple()
+ arg_str = expr[m.end():]
+ if arg_str.startswith('('):
+ arg_str, remaining = self._separate_at_paren(arg_str, ')')
else:
- argvals = tuple([
+ arg_str, remaining = None, arg_str
+
+ def assertion(cndn, msg):
+ """ assert, but without risk of getting optimized out """
+ if not cndn:
+ raise ExtractorError(f'{member} {msg}: {expr}')
+
+ def eval_method():
+ nonlocal member
+ if variable == 'String':
+ obj = str
+ elif variable in local_vars:
+ obj = local_vars[variable]
+ else:
+ if variable not in self._objects:
+ self._objects[variable] = self.extract_object(variable)
+ obj = self._objects[variable]
+
+ if arg_str is None:
+ # Member access
+ if member == 'length':
+ return len(obj)
+ return obj[member]
+
+ # Function call
+ argvals = [
self.interpret_expression(v, local_vars, allow_recursion)
- for v in arg_str.split(',')])
-
- if member == 'split':
- assert argvals == ('',)
- return list(obj)
- if member == 'join':
- assert len(argvals) == 1
- return argvals[0].join(obj)
- if member == 'reverse':
- assert len(argvals) == 0
- obj.reverse()
- return obj
- if member == 'slice':
- assert len(argvals) == 1
- return obj[argvals[0]:]
- if member == 'splice':
- assert isinstance(obj, list)
- index, howMany = argvals
- res = []
- for i in range(index, min(index + howMany, len(obj))):
- res.append(obj.pop(index))
- return res
-
- return obj[member](argvals)
-
- for op, opfunc in _OPERATORS:
- m = re.match(r'(?P<x>.+?)%s(?P<y>.+)' % re.escape(op), expr)
- if not m:
- continue
- x, abort = self.interpret_statement(
- m.group('x'), local_vars, allow_recursion - 1)
- if abort:
- raise ExtractorError(
- 'Premature left-side return of %s in %r' % (op, expr))
- y, abort = self.interpret_statement(
- m.group('y'), local_vars, allow_recursion - 1)
- if abort:
- raise ExtractorError(
- 'Premature right-side return of %s in %r' % (op, expr))
- return opfunc(x, y)
+ for v in self._separate(arg_str)]
+
+ if obj == str:
+ if member == 'fromCharCode':
+ assertion(argvals, 'takes one or more arguments')
+ return ''.join(map(chr, argvals))
+ raise ExtractorError(f'Unsupported string method {member}')
+
+ if member == 'split':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(argvals == [''], 'with arguments is not implemented')
+ return list(obj)
+ elif member == 'join':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(len(argvals) == 1, 'takes exactly one argument')
+ return argvals[0].join(obj)
+ elif member == 'reverse':
+ assertion(not argvals, 'does not take any arguments')
+ obj.reverse()
+ return obj
+ elif member == 'slice':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(len(argvals) == 1, 'takes exactly one argument')
+ return obj[argvals[0]:]
+ elif member == 'splice':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(argvals, 'takes one or more arguments')
+ index, howMany = map(int, (argvals + [len(obj)])[:2])
+ if index < 0:
+ index += len(obj)
+ add_items = argvals[2:]
+ res = []
+ for i in range(index, min(index + howMany, len(obj))):
+ res.append(obj.pop(index))
+ for i, item in enumerate(add_items):
+ obj.insert(index + i, item)
+ return res
+ elif member == 'unshift':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(argvals, 'takes one or more arguments')
+ for item in reversed(argvals):
+ obj.insert(0, item)
+ return obj
+ elif member == 'pop':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(not argvals, 'does not take any arguments')
+ if not obj:
+ return
+ return obj.pop()
+ elif member == 'push':
+ assertion(argvals, 'takes one or more arguments')
+ obj.extend(argvals)
+ return obj
+ elif member == 'forEach':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
+ f, this = (argvals + [''])[:2]
+ return [f((item, idx, obj), this=this) for idx, item in enumerate(obj)]
+ elif member == 'indexOf':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
+ idx, start = (argvals + [0])[:2]
+ try:
+ return obj.index(idx, start)
+ except ValueError:
+ return -1
+
+ if isinstance(obj, list):
+ member = int(member)
+ return obj[member](argvals)
+
+ if remaining:
+ return self.interpret_expression(
+ self._named_object(local_vars, eval_method()) + remaining,
+ local_vars, allow_recursion)
+ else:
+ return eval_method()
- m = re.match(
- r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
+ m = re.match(r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
if m:
fname = m.group('func')
argvals = tuple([
int(v) if v.isdigit() else local_vars[v]
- for v in m.group('args').split(',')]) if len(m.group('args')) > 0 else tuple()
- if fname not in self._functions:
+ for v in self._separate(m.group('args'))])
+ if fname in local_vars:
+ return local_vars[fname](argvals)
+ elif fname not in self._functions:
self._functions[fname] = self.extract_function(fname)
return self._functions[fname](argvals)
- raise ExtractorError('Unsupported JS expression %r' % expr)
+ if expr:
+ raise ExtractorError('Unsupported JS expression %r' % expr)
def extract_object(self, objname):
_FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
@@ -233,30 +486,55 @@ class JSInterpreter(object):
return obj
- def extract_function(self, funcname):
+ def extract_function_code(self, funcname):
+ """ @returns argnames, code """
func_m = re.search(
r'''(?x)
(?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s*
\((?P<args>[^)]*)\)\s*
- \{(?P<code>[^}]+)\}''' % (
+ (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % (
re.escape(funcname), re.escape(funcname), re.escape(funcname)),
self.code)
+ code, _ = self._separate_at_paren(func_m.group('code'), '}') # refine the match
if func_m is None:
raise ExtractorError('Could not find JS function %r' % funcname)
- argnames = func_m.group('args').split(',')
+ return func_m.group('args').split(','), code
- return self.build_function(argnames, func_m.group('code'))
+ def extract_function(self, funcname):
+ return self.extract_function_from_code(*self.extract_function_code(funcname))
+
+ def extract_function_from_code(self, argnames, code, *global_stack):
+ local_vars = {}
+ while True:
+ mobj = re.search(r'function\((?P<args>[^)]*)\)\s*{', code)
+ if mobj is None:
+ break
+ start, body_start = mobj.span()
+ body, remaining = self._separate_at_paren(code[body_start - 1:], '}')
+ name = self._named_object(
+ local_vars,
+ self.extract_function_from_code(
+ [str.strip(x) for x in mobj.group('args').split(',')],
+ body, local_vars, *global_stack))
+ code = code[:start] + name + remaining
+ return self.build_function(argnames, code, local_vars, *global_stack)
def call_function(self, funcname, *args):
- f = self.extract_function(funcname)
- return f(args)
-
- def build_function(self, argnames, code):
- def resf(args):
- local_vars = dict(zip(argnames, args))
- for stmt in code.split(';'):
- res, abort = self.interpret_statement(stmt, local_vars)
- if abort:
+ return self.extract_function(funcname)(args)
+
+ def build_function(self, argnames, code, *global_stack):
+ global_stack = list(global_stack) or [{}]
+ local_vars = global_stack.pop(0)
+
+ def resf(args, **kwargs):
+ local_vars.update({
+ **dict(zip(argnames, args)),
+ **kwargs
+ })
+ var_stack = LocalNameSpace(local_vars, *global_stack)
+ for stmt in self._separate(code.replace('\n', ''), ';'):
+ ret, should_abort = self.interpret_statement(stmt, var_stack)
+ if should_abort:
break
- return res
+ return ret
return resf