diff options
-rw-r--r-- | youtube/proto_debug.py | 64 |
1 files changed, 43 insertions, 21 deletions
diff --git a/youtube/proto_debug.py b/youtube/proto_debug.py index 3a5541c..bf89ae6 100644 --- a/youtube/proto_debug.py +++ b/youtube/proto_debug.py @@ -104,6 +104,7 @@ from math import ceil import base64 import io + def byte(n): return bytes((n,)) @@ -116,12 +117,13 @@ def varint_encode(offset): This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data. See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.''' - needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case. + # (0).bit_length() returns 0, but we need 1 in that case. + needed_bytes = ceil(offset.bit_length()/7) or 1 encoded_bytes = bytearray(needed_bytes) for i in range(0, needed_bytes - 1): encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits offset = offset >> 7 - encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte + encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte return bytes(encoded_bytes) @@ -139,15 +141,18 @@ def varint_decode(encoded): def string(field_number, data): data = as_bytes(data) return _proto_field(2, field_number, varint_encode(len(data)) + data) + + nested = string + def uint(field_number, value): return _proto_field(0, field_number, varint_encode(value)) def _proto_field(wire_type, field_number, data): ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure ''' - return varint_encode( (field_number << 3) | wire_type) + data + return varint_encode((field_number << 3) | wire_type) + data def percent_b64encode(data): @@ -173,7 +178,9 @@ def read_varint(data): except IndexError: if i == 0: raise EOFError() - raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) + raise Exception( + 'Unterminated varint starting at ' + str(data.tell() - i) + ) result |= (byte & 127) << 7*i if not byte & 128: break @@ -192,14 +199,17 @@ def read_group(data, end_sequence): def parse(data): - return {field_number: value for _, field_number, value in read_protobuf(data)} + return { + field_number: value for _, + field_number, value in read_protobuf(data) + } def b64_to_bytes(data): if isinstance(data, bytes): data = data.decode('ascii') data = data.replace("%3D", "=") - return base64.urlsafe_b64decode(data + "="*((4 - len(data)%4)%4) ) + return base64.urlsafe_b64decode(data + "="*((4 - len(data) % 4) % 4)) # -------------------------------------------------------------------- @@ -209,30 +219,41 @@ dec = b64_to_bytes def enc(t): return base64.urlsafe_b64encode(t).decode('ascii') + def uenc(t): return enc(t).replace("=", "%3D") + def b64_to_ascii(t): return base64.urlsafe_b64decode(t).decode('ascii', errors='replace') + def b64_to_bin(t): decoded = base64.urlsafe_b64decode(t) - #print(len(decoded)*8) + # print(len(decoded)*8) return " ".join(["{:08b}".format(x) for x in decoded]) + def bytes_to_bin(t): return " ".join(["{:08b}".format(x) for x in t]) + + def bin_to_bytes(t): return int(t, 2).to_bytes((len(t) + 7) // 8, 'big') + def bytes_to_hex(t): return ' '.join(hex(n)[2:].zfill(2) for n in t) + + tohex = bytes_to_hex fromhex = bytes.fromhex def aligned_ascii(data): - return ' '.join(' ' + chr(n) if n in range(32,128) else ' _' for n in data) + return ' '.join(' ' + chr(n) if n in range( + 32, 128) else ' _' for n in data) + def parse_protobuf(data, mutable=False, spec=()): data_original = data @@ -245,7 +266,7 @@ def parse_protobuf(data, mutable=False, spec=()): break wire_type = tag & 7 field_number = tag >> 3 - + if wire_type == 0: value = read_varint(data) elif wire_type == 1: @@ -265,9 +286,11 @@ def parse_protobuf(data, mutable=False, spec=()): else: yield (wire_type, field_number, value) + def pb(data, mutable=False): return list(parse_protobuf(data, mutable=mutable)) + def make_proto(fields): if len(fields) == 2 and fields[0] == 'base64': return enc(make_proto(fields[1])) @@ -312,6 +335,7 @@ _b32rev = None bytes_types = (bytes, bytearray) # Types acceptable as binary data + def _bytes_from_decode_data(s): if isinstance(s, str): try: @@ -327,7 +351,6 @@ def _bytes_from_decode_data(s): "string, not %r" % s.__class__.__name__) from None - def b32decode(s, casefold=False, map01=None): """Decode the Base32 encoded bytes-like object or ASCII string s. @@ -397,10 +420,12 @@ def b32decode(s, casefold=False, map01=None): raise binascii.Error('Incorrect padding') return bytes(decoded) + def dec32(data): if isinstance(data, bytes): data = data.decode('ascii') - return b32decode(data + "="*((8 - len(data)%8)%8)) + return b32decode(data + "="*((8 - len(data) % 8) % 8)) + def recursive_pb(data, filt=True): b64 = False @@ -415,7 +440,7 @@ def recursive_pb(data, filt=True): return data try: - result = pb(data, mutable=True) + result = pb(data, mutable=True) except Exception as e: return data for tuple in result: @@ -429,10 +454,10 @@ def recursive_pb(data, filt=True): return result - def indent_lines(lines, indent): return re.sub(r'^', ' '*indent, lines, flags=re.MULTILINE) + def _pp(obj, indent): # not my best work if isinstance(obj, tuple): if len(obj) == 3: # (wire_type, field_number, data) @@ -443,15 +468,13 @@ def _pp(obj, indent): # not my best work + ')') elif isinstance(obj, list): # [wire_type, field_number, data] - if (len(obj) == 3 - and not any(isinstance(x, (list, tuple)) for x in obj) - ): + if (len(obj) == 3 and not any( + isinstance(x, (list, tuple)) for x in obj)): return obj.__repr__() # [wire_type, field_number, [...]] - elif (len(obj) == 3 - and not any(isinstance(x, (list, tuple)) for x in obj[0:2]) - ): + elif (len(obj) == 3 and not any( + isinstance(x, (list, tuple)) for x in obj[0:2])): return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n' + indent_lines(_pp(obj[2], indent), indent) + '\n' + ']') @@ -464,6 +487,7 @@ def _pp(obj, indent): # not my best work else: return obj.__repr__() + def pp(obj, indent=1): '''Pretty prints the recursive pb structure''' print(_pp(obj, indent)) @@ -484,5 +508,3 @@ mobile_headers = ( ('X-YouTube-Client-Name', '2'), ('X-YouTube-Client-Version', '2.20180830'), ) + (('User-Agent', mobile_user_agent),) - - |