diff options
Diffstat (limited to 'youtube/proto_debug.py')
-rw-r--r-- | youtube/proto_debug.py | 126 |
1 files changed, 91 insertions, 35 deletions
diff --git a/youtube/proto_debug.py b/youtube/proto_debug.py index bf89ae6..75c09a4 100644 --- a/youtube/proto_debug.py +++ b/youtube/proto_debug.py @@ -79,11 +79,11 @@ The function pp will pretty print the recursive structure: make_proto will take a recursive_pb structure and make a ctoken out of it: - base64 means a base64 encode with equals sign paddings - base64s means a base64 encode without padding -- base64u means a url base64 encode with equals signs replaced with %3D +- base64p means a url base64 encode with equals signs replaced with %3D -recursive_pb cannot detect between base64 or base64u or base64s so +recursive_pb cannot detect between base64 or base64p or base64s so those must be manually specified if recreating the token. Will not have -make_proto(recursive_pb(x)) == x if x is using base64u or base64s +make_proto(recursive_pb(x)) == x if x is using base64p or base64s There are some other functions I wrote while reverse engineering stuff that may or may not be useful. @@ -123,7 +123,8 @@ def varint_encode(offset): for i in range(0, needed_bytes - 1): encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits offset = offset >> 7 - encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte + # leave first bit as zero for last byte + encoded_bytes[-1] = offset & 127 return bytes(encoded_bytes) @@ -198,11 +199,88 @@ def read_group(data, end_sequence): return data.original[start:index] -def parse(data): - return { - field_number: value for _, - field_number, value in read_protobuf(data) - } +def parse(data, include_wire_type=False): + '''Returns a dict mapping field numbers to values + + data is the protobuf structure, which must not be b64-encoded''' + if include_wire_type: + return {field_number: [wire_type, value] + for wire_type, field_number, value in read_protobuf(data)} + return {field_number: value + for _, field_number, value in read_protobuf(data)} + + +base64_enc_funcs = { + 'base64': base64.urlsafe_b64encode, + 'base64s': unpadded_b64encode, + 'base64p': percent_b64encode, +} + + +def _make_protobuf(data): + # must be dict mapping field_number to [wire_type, value] + if isinstance(data, dict): + new_data = [] + for field_num, (wire_type, value) in sorted(data.items()): + new_data.append((wire_type, field_num, value)) + data = new_data + if isinstance(data, str): + return data.encode('utf-8') + elif len(data) == 2 and data[0] in list(base64_enc_funcs.keys()): + return base64_enc_funcs[data[0]](_make_protobuf(data[1])) + elif isinstance(data, list): + result = b'' + for field in data: + if field[0] == 0: + result += uint(field[1], field[2]) + elif field[0] == 2: + result += string(field[1], _make_protobuf(field[2])) + else: + raise NotImplementedError('Wire type ' + str(field[0]) + + ' not implemented') + return result + return data + + +def make_protobuf(data): + return _make_protobuf(data).decode('ascii') + + +make_proto = make_protobuf + + +def _set_protobuf_value(data, *path, value): + if not path: + return value + op = path[0] + if op in base64_enc_funcs: + inner_data = b64_to_bytes(data) + return base64_enc_funcs[op]( + _set_protobuf_value(inner_data, *path[1:], value=value) + ) + pb_dict = parse(data, include_wire_type=True) + pb_dict[op][1] = _set_protobuf_value( + pb_dict[op][1], *path[1:], value=value + ) + return _make_protobuf(pb_dict) + + +def set_protobuf_value(data, *path, value): + '''Set a field's value in a raw protobuf structure + + path is a list of field numbers and/or base64 encoding directives + + The directives are + base64: normal base64 encoding with equal signs padding + base64s ("stripped"): no padding + base64p: %3D instead of = for padding + + return new_protobuf, err''' + try: + new_protobuf = _set_protobuf_value(data, *path, value=value) + return new_protobuf.decode('ascii'), None + except Exception: + return None, traceback.format_exc() def b64_to_bytes(data): @@ -287,33 +365,11 @@ def parse_protobuf(data, mutable=False, spec=()): yield (wire_type, field_number, value) -def pb(data, mutable=False): - return list(parse_protobuf(data, mutable=mutable)) +read_protobuf = parse_protobuf -def make_proto(fields): - if len(fields) == 2 and fields[0] == 'base64': - return enc(make_proto(fields[1])) - result = b'' - for field in fields: - if field[0] == 0: - result += _proto_field(0, field[1], varint_encode(field[2])) - elif field[0] == 2: - data = field[2] - if isinstance(data, str): - data = data.encode('utf-8') - elif len(data) == 2 and data[0] == 'base64': - data = base64.urlsafe_b64encode(make_proto(data[1])) - elif len(data) == 2 and data[0] == 'base64s': - data = base64.urlsafe_b64encode(make_proto(data[1])).rstrip(b'=') - elif len(data) == 2 and data[0] == 'base64u': - data = base64.urlsafe_b64encode(make_proto(data[1])).replace(b'=', b'%3D') - elif isinstance(data, list): - data = make_proto(data) - result += _proto_field(2, field[1], varint_encode(len(data)) + data) - else: - raise NotImplementedError('Wire type ' + str(field[0]) + ' not implemented') - return result +def pb(data, mutable=False): + return list(parse_protobuf(data, mutable=mutable)) def bytes_to_base4(data): @@ -424,7 +480,7 @@ def b32decode(s, casefold=False, map01=None): def dec32(data): if isinstance(data, bytes): data = data.decode('ascii') - return b32decode(data + "="*((8 - len(data) % 8) % 8)) + return b32decode(data + "="*((8 - len(data)%8)%8)) def recursive_pb(data, filt=True): |