diff options
Diffstat (limited to 'youtube/proto_debug.py')
-rw-r--r-- | youtube/proto_debug.py | 488 |
1 files changed, 488 insertions, 0 deletions
diff --git a/youtube/proto_debug.py b/youtube/proto_debug.py new file mode 100644 index 0000000..3a5541c --- /dev/null +++ b/youtube/proto_debug.py @@ -0,0 +1,488 @@ +# TODO: clean this file up more and heavily refactor + +''' Helper functions for reverse engineering protobuf. + +Basic guide: + +Run interactively with python3 -i proto_debug.py + +The function dec will decode a base64 string +(regardless of whether it includes = or %3D at the end) to a bytestring + +The function pb (parse_protobuf) will return a list of tuples. +Each tuple is (wire_type, field_number, field_data) + +The function enc encodes as base64 (inverse of dec) +The function uenc is like enc but replaces = with %3D + +See https://developers.google.com/protocol-buffers/docs/encoding#structure + +Example usage: +>>> pb(dec('4qmFsgJcEhhVQ1lPX2phYl9lc3VGUlY0YjE3QUp0QXcaQEVnWjJhV1JsYjNNWUF5QUFNQUU0QWVvREdFTm5Ua1JSVlVWVFEzZHBYM2gwTTBaeFRuRkZiRFZqUWclM0QlM0Q%3D')) +[(2, 80226972, b'\x12\x18UCYO_jab_esuFRV4b17AJtAw\x1a@EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D')] + +>>> pb(b'\x12\x18UCYO_jab_esuFRV4b17AJtAw\x1a@EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D') +[(2, 2, b'UCYO_jab_esuFRV4b17AJtAw'), (2, 3, b'EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D')] + +>>> pb(dec(b'EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D')) +[(2, 2, b'videos'), (0, 3, 3), (0, 4, 0), (0, 6, 1), (0, 7, 1), (2, 61, b'CgNDQUESCwi_xt3FqNqEl5cB')] + +>>> pb(dec(b'CgNDQUESCwi_xt3FqNqEl5cB')) +[(2, 1, b'CAA'), (2, 2, b'\x08\xbf\xc6\xdd\xc5\xa8\xda\x84\x97\x97\x01')] + +>>> pb(b'\x08\xbf\xc6\xdd\xc5\xa8\xda\x84\x97\x97\x01') +[(0, 1, 10893665244101960511)] + +>>> pb(dec(b'CAA')) +[(0, 1, 0)] + +The function recursive_pb will try to do dec/pb recursively automatically. +It's a dumb function (so might try to dec or pb something that isn't really +base64 or protobuf) and it's a mess right now so disclaimer. +The function pp will pretty print the recursive structure: + +>>> pp(recursive_pb('4qmFsgJcEhhVQ1lPX2phYl9lc3VGUlY0YjE3QUp0QXcaQEVnWjJhV1JsYjNNWUF5QUFNQUU0QWVvREdFTm5Ua1JSVlVWVFEzZHBYM2gwTTBaeFRuRkZiRFZqUWclM0QlM0Q%3D')) + +('base64', + [ + [2, 80226972, + [ + [2, 2, b'UCYO_jab_esuFRV4b17AJtAw'], + [2, 3, + ('base64', + [ + [2, 2, b'videos'], + [0, 3, 3], + [0, 4, 0], + [0, 6, 1], + [0, 7, 1], + [2, 61, + ('base64', + [ + [2, 1, b'CAA'], + [2, 2, + [ + [0, 1, 10893665244101960511], + ] + ], + ] + ) + ], + ] + ) + ], + ] + ], + ] +) + +make_proto will take a recursive_pb structure and make a ctoken out of it: +- base64 means a base64 encode with equals sign paddings +- base64s means a base64 encode without padding +- base64u means a url base64 encode with equals signs replaced with %3D + +recursive_pb cannot detect between base64 or base64u or base64s so +those must be manually specified if recreating the token. Will not have +make_proto(recursive_pb(x)) == x if x is using base64u or base64s + +There are some other functions I wrote while reverse engineering stuff +that may or may not be useful. +''' + + +import urllib.request +import urllib.parse +import re +import time +import json +import os +import pprint + + +# ------ from proto.py ----------------------------------------------- +from math import ceil +import base64 +import io + +def byte(n): + return bytes((n,)) + + +def varint_encode(offset): + '''In this encoding system, for each 8-bit byte, the first bit is 1 if there are more bytes, and 0 is this is the last one. + The next 7 bits are data. These 7-bit sections represent the data in Little endian order. For example, suppose the data is + aaaaaaabbbbbbbccccccc (each of these sections is 7 bits). It will be encoded as: + 1ccccccc 1bbbbbbb 0aaaaaaa + + This encoding is used in youtube parameters to encode offsets and to encode the length for length-prefixed data. + See https://developers.google.com/protocol-buffers/docs/encoding#varints for more info.''' + needed_bytes = ceil(offset.bit_length()/7) or 1 # (0).bit_length() returns 0, but we need 1 in that case. + encoded_bytes = bytearray(needed_bytes) + for i in range(0, needed_bytes - 1): + encoded_bytes[i] = (offset & 127) | 128 # 7 least significant bits + offset = offset >> 7 + encoded_bytes[-1] = offset & 127 # leave first bit as zero for last byte + + return bytes(encoded_bytes) + + +def varint_decode(encoded): + decoded = 0 + for i, byte in enumerate(encoded): + decoded |= (byte & 127) << 7*i + + if not (byte & 128): + break + return decoded + + +def string(field_number, data): + data = as_bytes(data) + return _proto_field(2, field_number, varint_encode(len(data)) + data) +nested = string + +def uint(field_number, value): + return _proto_field(0, field_number, varint_encode(value)) + + +def _proto_field(wire_type, field_number, data): + ''' See https://developers.google.com/protocol-buffers/docs/encoding#structure ''' + return varint_encode( (field_number << 3) | wire_type) + data + + +def percent_b64encode(data): + return base64.urlsafe_b64encode(data).replace(b'=', b'%3D') + + +def unpadded_b64encode(data): + return base64.urlsafe_b64encode(data).replace(b'=', b'') + + +def as_bytes(value): + if isinstance(value, str): + return value.encode('utf-8') + return value + + +def read_varint(data): + result = 0 + i = 0 + while True: + try: + byte = data.read(1)[0] + except IndexError: + if i == 0: + raise EOFError() + raise Exception('Unterminated varint starting at ' + str(data.tell() - i)) + result |= (byte & 127) << 7*i + if not byte & 128: + break + + i += 1 + return result + + +def read_group(data, end_sequence): + start = data.tell() + index = data.original.find(end_sequence, start) + if index == -1: + raise Exception('Unterminated group') + data.seek(index + len(end_sequence)) + return data.original[start:index] + + +def parse(data): + return {field_number: value for _, field_number, value in read_protobuf(data)} + + +def b64_to_bytes(data): + if isinstance(data, bytes): + data = data.decode('ascii') + data = data.replace("%3D", "=") + return base64.urlsafe_b64decode(data + "="*((4 - len(data)%4)%4) ) +# -------------------------------------------------------------------- + + +dec = b64_to_bytes + + +def enc(t): + return base64.urlsafe_b64encode(t).decode('ascii') + +def uenc(t): + return enc(t).replace("=", "%3D") + +def b64_to_ascii(t): + return base64.urlsafe_b64decode(t).decode('ascii', errors='replace') + +def b64_to_bin(t): + decoded = base64.urlsafe_b64decode(t) + #print(len(decoded)*8) + return " ".join(["{:08b}".format(x) for x in decoded]) + +def bytes_to_bin(t): + return " ".join(["{:08b}".format(x) for x in t]) +def bin_to_bytes(t): + return int(t, 2).to_bytes((len(t) + 7) // 8, 'big') + +def bytes_to_hex(t): + return ' '.join(hex(n)[2:].zfill(2) for n in t) +tohex = bytes_to_hex +fromhex = bytes.fromhex + + +def aligned_ascii(data): + return ' '.join(' ' + chr(n) if n in range(32,128) else ' _' for n in data) + +def parse_protobuf(data, mutable=False, spec=()): + data_original = data + data = io.BytesIO(data) + data.original = data_original + while True: + try: + tag = read_varint(data) + except EOFError: + break + wire_type = tag & 7 + field_number = tag >> 3 + + if wire_type == 0: + value = read_varint(data) + elif wire_type == 1: + value = data.read(8) + elif wire_type == 2: + length = read_varint(data) + value = data.read(length) + elif wire_type == 3: + end_bytes = varint_encode((field_number << 3) | 4) + value = read_group(data, end_bytes) + elif wire_type == 5: + value = data.read(4) + else: + raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(data.tell())) + if mutable: + yield [wire_type, field_number, value] + else: + yield (wire_type, field_number, value) + +def pb(data, mutable=False): + return list(parse_protobuf(data, mutable=mutable)) + +def make_proto(fields): + if len(fields) == 2 and fields[0] == 'base64': + return enc(make_proto(fields[1])) + result = b'' + for field in fields: + if field[0] == 0: + result += _proto_field(0, field[1], varint_encode(field[2])) + elif field[0] == 2: + data = field[2] + if isinstance(data, str): + data = data.encode('utf-8') + elif len(data) == 2 and data[0] == 'base64': + data = base64.urlsafe_b64encode(make_proto(data[1])) + elif len(data) == 2 and data[0] == 'base64s': + data = base64.urlsafe_b64encode(make_proto(data[1])).rstrip(b'=') + elif len(data) == 2 and data[0] == 'base64u': + data = base64.urlsafe_b64encode(make_proto(data[1])).replace(b'=', b'%3D') + elif isinstance(data, list): + data = make_proto(data) + result += _proto_field(2, field[1], varint_encode(len(data)) + data) + else: + raise NotImplementedError('Wire type ' + str(field[0]) + ' not implemented') + return result + + +def bytes_to_base4(data): + result = '' + for b in data: + result += str(b >> 6) + str((b >> 4) & 0b11) + str((b >> 2) & 0b11) + str(b & 0b11) + return result + + +import re +import struct +import binascii + + +# Base32 encoding/decoding must be done in Python +_b32alphabet = b'abcdefghijklmnopqrstuvwxyz012345' +_b32tab2 = None +_b32rev = None + +bytes_types = (bytes, bytearray) # Types acceptable as binary data + +def _bytes_from_decode_data(s): + if isinstance(s, str): + try: + return s.encode('ascii') + except UnicodeEncodeError: + raise ValueError('string argument should contain only ASCII characters') + if isinstance(s, bytes_types): + return s + try: + return memoryview(s).tobytes() + except TypeError: + raise TypeError("argument should be a bytes-like object or ASCII " + "string, not %r" % s.__class__.__name__) from None + + + +def b32decode(s, casefold=False, map01=None): + """Decode the Base32 encoded bytes-like object or ASCII string s. + + Optional casefold is a flag specifying whether a lowercase alphabet is + acceptable as input. For security purposes, the default is False. + + RFC 3548 allows for optional mapping of the digit 0 (zero) to the + letter O (oh), and for optional mapping of the digit 1 (one) to + either the letter I (eye) or letter L (el). The optional argument + map01 when not None, specifies which letter the digit 1 should be + mapped to (when map01 is not None, the digit 0 is always mapped to + the letter O). For security purposes the default is None, so that + 0 and 1 are not allowed in the input. + + The result is returned as a bytes object. A binascii.Error is raised if + the input is incorrectly padded or if there are non-alphabet + characters present in the input. + """ + global _b32rev + # Delay the initialization of the table to not waste memory + # if the function is never called + if _b32rev is None: + _b32rev = {v: k for k, v in enumerate(_b32alphabet)} + s = _bytes_from_decode_data(s) + if len(s) % 8: + raise binascii.Error('Incorrect padding') + # Handle section 2.4 zero and one mapping. The flag map01 will be either + # False, or the character to map the digit 1 (one) to. It should be + # either L (el) or I (eye). + if map01 is not None: + map01 = _bytes_from_decode_data(map01) + assert len(map01) == 1, repr(map01) + s = s.translate(bytes.maketrans(b'01', b'O' + map01)) + if casefold: + s = s.upper() + # Strip off pad characters from the right. We need to count the pad + # characters because this will tell us how many null bytes to remove from + # the end of the decoded string. + l = len(s) + s = s.rstrip(b'=') + padchars = l - len(s) + # Now decode the full quanta + decoded = bytearray() + b32rev = _b32rev + for i in range(0, len(s), 8): + quanta = s[i: i + 8] + acc = 0 + try: + for c in quanta: + acc = (acc << 5) + b32rev[c] + except KeyError: + raise binascii.Error('Non-base32 digit found') from None + decoded += acc.to_bytes(5, 'big') + # Process the last, partial quanta + if padchars: + acc <<= 5 * padchars + last = acc.to_bytes(5, 'big') + if padchars == 1: + decoded[-5:] = last[:-1] + elif padchars == 3: + decoded[-5:] = last[:-2] + elif padchars == 4: + decoded[-5:] = last[:-3] + elif padchars == 6: + decoded[-5:] = last[:-4] + else: + raise binascii.Error('Incorrect padding') + return bytes(decoded) + +def dec32(data): + if isinstance(data, bytes): + data = data.decode('ascii') + return b32decode(data + "="*((8 - len(data)%8)%8)) + +def recursive_pb(data, filt=True): + b64 = False + if isinstance(data, str) or all(i > 32 for i in data): + try: + if len(data) > 11 and data[0:2] != b'UC': + data = b64_to_bytes(data) + b64 = True + else: + return data + except Exception as e: + return data + + try: + result = pb(data, mutable=True) + except Exception as e: + return data + for tuple in result: + if tuple[0] == 2: + try: + tuple[2] = recursive_pb(tuple[2]) + except Exception: + pass + if b64: + return ('base64', result) + return result + + + +def indent_lines(lines, indent): + return re.sub(r'^', ' '*indent, lines, flags=re.MULTILINE) + +def _pp(obj, indent): # not my best work + if isinstance(obj, tuple): + if len(obj) == 3: # (wire_type, field_number, data) + return obj.__repr__() + else: # (base64, [...]) + return ('(' + obj[0].__repr__() + ',\n' + + indent_lines(_pp(obj[1], indent), indent) + '\n' + + ')') + elif isinstance(obj, list): + # [wire_type, field_number, data] + if (len(obj) == 3 + and not any(isinstance(x, (list, tuple)) for x in obj) + ): + return obj.__repr__() + + # [wire_type, field_number, [...]] + elif (len(obj) == 3 + and not any(isinstance(x, (list, tuple)) for x in obj[0:2]) + ): + return ('[' + obj[0].__repr__() + ', ' + obj[1].__repr__() + ',\n' + + indent_lines(_pp(obj[2], indent), indent) + '\n' + + ']') + else: + s = '[\n' + for x in obj: + s += indent_lines(_pp(x, indent), indent) + ',\n' + s += ']' + return s + else: + return obj.__repr__() + +def pp(obj, indent=1): + '''Pretty prints the recursive pb structure''' + print(_pp(obj, indent)) + + +desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0' +desktop_headers = ( + ('Accept', '*/*'), + ('Accept-Language', 'en-US,en;q=0.5'), + ('X-YouTube-Client-Name', '1'), + ('X-YouTube-Client-Version', '2.20180830'), +) + (('User-Agent', desktop_user_agent),) + +mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36' +mobile_headers = ( + ('Accept', '*/*'), + ('Accept-Language', 'en-US,en;q=0.5'), + ('X-YouTube-Client-Name', '2'), + ('X-YouTube-Client-Version', '2.20180830'), +) + (('User-Agent', mobile_user_agent),) + + |