1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
|
# TODO: clean this file up more and heavily refactor
''' Helper functions for reverse engineering protobuf.
Basic guide:
Run interactively with python3 -i proto_debug.py
The function dec will decode a base64 string
(regardless of whether it includes = or %3D at the end) to a bytestring
The function pb (parse_protobuf) will return a list of tuples.
Each tuple is (wire_type, field_number, field_data)
The function enc encodes as base64 (inverse of dec)
The function uenc is like enc but replaces = with %3D
See https://developers.google.com/protocol-buffers/docs/encoding#structure
Example usage:
>>> pb(dec('4qmFsgJcEhhVQ1lPX2phYl9lc3VGUlY0YjE3QUp0QXcaQEVnWjJhV1JsYjNNWUF5QUFNQUU0QWVvREdFTm5Ua1JSVlVWVFEzZHBYM2gwTTBaeFRuRkZiRFZqUWclM0QlM0Q%3D'))
[(2, 80226972, b'\x12\x18UCYO_jab_esuFRV4b17AJtAw\x1a@EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D')]
>>> pb(b'\x12\x18UCYO_jab_esuFRV4b17AJtAw\x1a@EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D')
[(2, 2, b'UCYO_jab_esuFRV4b17AJtAw'), (2, 3, b'EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D')]
>>> pb(dec(b'EgZ2aWRlb3MYAyAAMAE4AeoDGENnTkRRVUVTQ3dpX3h0M0ZxTnFFbDVjQg%3D%3D'))
[(2, 2, b'videos'), (0, 3, 3), (0, 4, 0), (0, 6, 1), (0, 7, 1), (2, 61, b'CgNDQUESCwi_xt3FqNqEl5cB')]
>>> pb(dec(b'CgNDQUESCwi_xt3FqNqEl5cB'))
[(2, 1, b'CAA'), (2, 2, b'\x08\xbf\xc6\xdd\xc5\xa8\xda\x84\x97\x97\x01')]
>>> pb(b'\x08\xbf\xc6\xdd\xc5\xa8\xda\x84\x97\x97\x01')
[(0, 1, 10893665244101960511)]
>>> pb(dec(b'CAA'))
[(0, 1, 0)]
The function recursive_pb will try to do dec/pb recursively automatically.
It's a dumb function (so might try to dec or pb something that isn't really
base64 or protobuf) and it's a mess right now so disclaimer.
The function pp will pretty print the recursive structure:
>>> pp(recursive_pb('4qmFsgJcEhhVQ1lPX2phYl9lc3VGUlY0YjE3QUp0QXcaQEVnWjJhV1JsYjNNWUF5QUFNQUU0QWVvREdFTm5Ua1JSVlVWVFEzZHBYM2gwTTBaeFRuRkZiRFZqUWclM0QlM0Q%3D'))
('base64',
[
[2, 80226972,
[
[2, 2, b'UCYO_jab_esuFRV4b17AJtAw'],
[2, 3,
('base64',
[
[2, 2, b'videos'],
[0, 3, 3],
[0, 4, 0],
[0, 6, 1],
[0, 7, 1],
[2, 61,
('base64',
[
[2, 1, b'CAA'],
[2, 2,
[
[0, 1, 10893665244101960511],
]
],
]
)
],
]
)
],
]
],
]
)
make_proto will take a recursive_pb structure and make a ctoken out of it:
- base64 means a base64 encode with equals sign paddings
- base64s means a base64 encode without padding
- base64u means a url base64 encode with equals signs replaced with %3D
recursive_pb cannot distinguish between base64, base64u and base64s, so
those must be manually specified if recreating the token. As a result,
make_proto(recursive_pb(x)) == x will not hold if x uses base64u or base64s.
There are some other functions I wrote while reverse engineering stuff
that may or may not be useful.
'''
import urllib.request
import urllib.parse
import re
import time
import json
import os
import pprint
# ------ from proto.py -----------------------------------------------
from math import ceil
import base64
import io
def byte(n):
    '''Return a bytes object of length one whose only byte has the value n.

    n must be an int in range(256); anything else is rejected by the
    bytes constructor.
    '''
    return bytes([n])
def varint_encode(offset):
    '''Encode the integer offset as a protobuf varint and return the bytes.

    In this encoding system, for each 8-bit byte, the first bit is 1 if there
    are more bytes, and 0 if this is the last one. The next 7 bits are data.
    These 7-bit sections represent the data in little-endian order. For
    example, suppose the data is aaaaaaabbbbbbbccccccc (each of these
    sections is 7 bits). It will be encoded as:
        1ccccccc 1bbbbbbb 0aaaaaaa

    This encoding is used in youtube parameters to encode offsets and to
    encode the length for length-prefixed data.
    See https://developers.google.com/protocol-buffers/docs/encoding#varints
    for more info.

    Negative values are encoded the way protobuf encodes negative int64
    values: as the 64-bit two's complement, i.e. the value is reduced
    modulo 2**64 first (so -1 becomes ten bytes). Non-negative values are
    encoded with as many bytes as they need; zero is the single byte 0x00.
    '''
    if offset < 0:
        # Without this, Python's arbitrary-precision negatives would be
        # truncated to a meaningless short sequence.
        offset &= 0xFFFFFFFFFFFFFFFF
    encoded_bytes = bytearray()
    while offset > 127:
        # 7 least significant bits, with the continuation bit set
        encoded_bytes.append((offset & 127) | 128)
        offset >>= 7
    # Last byte: what remains fits in 7 bits, continuation bit left as zero
    encoded_bytes.append(offset)
    return bytes(encoded_bytes)
def varint_decode(encoded):
    '''Decode the first varint found in encoded and return it as an int.

    encoded is an iterable of byte values (e.g. a bytes object). Decoding
    stops at the first byte whose high bit is clear; anything after it is
    ignored. An empty input yields 0, and an input that ends before a
    terminating byte yields whatever was accumulated so far.
    '''
    value = 0
    shift = 0
    for octet in encoded:
        # Low 7 bits carry data, least significant group first
        value |= (octet & 127) << shift
        if (octet & 128) == 0:
            break
        shift += 7
    return value
def string(field_number, data):
    '''Build a length-delimited (wire type 2) protobuf field.

    data may be a str (converted by as_bytes) or a bytes-like value. The
    result is the field tag, the varint length of the payload, and the
    payload itself.
    '''
    payload = as_bytes(data)
    length_prefix = varint_encode(len(payload))
    return _proto_field(2, field_number, length_prefix + payload)


# An embedded message is encoded exactly like a string field.
nested = string
def uint(field_number, value):
    '''Build a varint (wire type 0) protobuf field holding value.'''
    encoded_value = varint_encode(value)
    return _proto_field(0, field_number, encoded_value)
def _proto_field(wire_type, field_number, data):
    '''Prefix data with the varint-encoded tag for the given field.

    The tag packs the field number into the high bits and the wire type
    into the low three bits.
    See https://developers.google.com/protocol-buffers/docs/encoding#structure
    '''
    tag = (field_number << 3) | wire_type
    return varint_encode(tag) + data
def percent_b64encode(data):
    '''URL-safe base64 encode data, with each '=' written as '%3D'.

    Returns bytes.
    '''
    encoded = base64.urlsafe_b64encode(data)
    return encoded.replace(b'=', b'%3D')
def unpadded_b64encode(data):
    '''URL-safe base64 encode data and drop the '=' padding.

    Returns bytes.
    '''
    encoded = base64.urlsafe_b64encode(data)
    # '=' only ever occurs as trailing padding in base64 output
    return encoded.rstrip(b'=')
def as_bytes(value):
    '''Return value UTF-8 encoded if it is a str; otherwise return it unchanged.'''
    return value.encode('utf-8') if isinstance(value, str) else value
def read_varint(data):
    '''Read one varint from the binary stream data and return it as an int.

    The stream is consumed one byte at a time up to and including the first
    byte whose high bit is clear.

    Raises EOFError if the stream is already exhausted before the first
    byte, and a plain Exception if it ends in the middle of a varint.
    '''
    value = 0
    position = 0  # index of the current byte within this varint
    while True:
        try:
            octet = data.read(1)[0]
        except IndexError:
            # read(1) gave back an empty result: nothing left in the stream
            if position == 0:
                raise EOFError()
            raise Exception(
                'Unterminated varint starting at ' + str(data.tell() - position)
            )
        value |= (octet & 127) << (7 * position)
        if not (octet & 128):
            return value
        position += 1
def read_group(data, end_sequence):
    '''Return the raw bytes of a group and advance the stream past its end tag.

    data is a seekable stream that also carries the whole underlying buffer
    in its ``original`` attribute. The group body runs from the current
    stream position up to the first occurrence of end_sequence in that
    buffer; the stream is left positioned just after end_sequence.

    The end is located with a plain byte search, so an end_sequence that
    happens to occur inside the group's payload ends the group early.

    Raises Exception if end_sequence does not occur after the current
    position.
    '''
    begin = data.tell()
    end = data.original.find(end_sequence, begin)
    if end < 0:
        raise Exception('Unterminated group')
    data.seek(end + len(end_sequence))
    return data.original[begin:end]
def parse(data):
    '''Parse a protobuf message into a {field_number: value} dict.

    Wire types are discarded. If a field number occurs more than once, the
    last occurrence wins. Errors raised by parse_protobuf propagate.
    '''
    # The parser in this file is parse_protobuf; the previous name
    # (read_protobuf) is not defined here and raised NameError.
    return {
        field_number: value
        for _, field_number, value in parse_protobuf(data)
    }
def b64_to_bytes(data):
    '''Decode URL-safe base64 given as str or bytes and return the raw bytes.

    Padding may be written as '=' or '%3D', or be missing altogether; the
    input is padded out to a multiple of four characters before decoding.
    bytes input is first decoded as ASCII.
    '''
    text = data.decode('ascii') if isinstance(data, bytes) else data
    text = text.replace("%3D", "=")
    missing = -len(text) % 4  # number of '=' needed to reach a multiple of 4
    return base64.urlsafe_b64decode(text + "=" * missing)


# --------------------------------------------------------------------
dec = b64_to_bytes
def enc(t):
    '''URL-safe base64 encode the bytes t and return the result as a str.

    Padding '=' characters are kept.
    '''
    encoded = base64.urlsafe_b64encode(t)
    return str(encoded, 'ascii')
def uenc(t):
    '''Like enc, but with every '=' in the result written as '%3D'.'''
    padded = enc(t)
    return padded.replace("=", "%3D")
def b64_to_ascii(t):
    '''Decode URL-safe base64 and return the result as ASCII text.

    Bytes that are not valid ASCII are replaced with U+FFFD. Unlike
    b64_to_bytes, the input is passed to the decoder as-is, so it must
    already be correctly padded.
    '''
    raw = base64.urlsafe_b64decode(t)
    return raw.decode('ascii', errors='replace')
def b64_to_bin(t):
    '''Decode URL-safe base64 and render the bytes as space-separated 8-bit binary.

    The input is passed to the decoder as-is, so it must already be
    correctly padded.
    '''
    octets = base64.urlsafe_b64decode(t)
    return " ".join(format(octet, '08b') for octet in octets)
def bytes_to_bin(t):
    '''Render each byte of t as 8 binary digits, separated by single spaces.'''
    return " ".join(format(octet, '08b') for octet in t)
def bin_to_bytes(t):
    '''Convert a string of binary digits to bytes (big-endian).

    Whitespace anywhere in t is ignored, so the space-separated output of
    bytes_to_bin can be passed straight back in. The result has one byte
    per 8 digits, rounded up; a digit count that is not a multiple of 8 is
    treated as if it had leading zeros. Empty (or all-whitespace) input
    gives b''.

    Raises ValueError if t contains anything int(..., 2) does not accept.
    '''
    # t[:0] is an empty separator of the same type as t, so both str and
    # bytes input keep working.
    bits = t[:0].join(t.split())
    if not bits:
        return b''
    return int(bits, 2).to_bytes((len(bits) + 7) // 8, 'big')
def bytes_to_hex(t):
    '''Render each byte of t as two lowercase hex digits, separated by single spaces.'''
    return ' '.join('{:02x}'.format(octet) for octet in t)


# Short aliases for interactive use
tohex = bytes_to_hex
fromhex = bytes.fromhex
def aligned_ascii(data):
    '''Render data as ASCII in two-character cells separated by spaces.

    Printable ASCII bytes (0x20-0x7e) are shown as a space followed by the
    character; every other byte is shown as ' _'. Each cell is two
    characters wide and cells are joined by single spaces, the same layout
    bytes_to_hex uses, so the two outputs line up column for column.
    '''
    # 0x7f (DEL) is a control character, so it is excluded from the
    # printable range; emitting it raw would break the alignment.
    return ' '.join(
        ' ' + chr(n) if 32 <= n < 127 else ' _'
        for n in data
    )
def parse_protobuf(data, mutable=False, spec=()):
    '''Lazily parse the protobuf-encoded bytes data, yielding one item per field.

    Each item is (wire_type, field_number, value), as a tuple, or as a
    list when mutable is true. The value depends on the wire type:
        0 - the decoded varint as an int
        1 - the next 8 bytes, undecoded
        2 - the length-prefixed payload as bytes
        3 - the raw bytes of the group, up to its end tag
        5 - the next 4 bytes, undecoded
    Reads near the end of the input are not length-checked, so a truncated
    fixed-width or length-delimited value comes back shorter than declared.

    spec is accepted but not used.

    Raises Exception for any other wire type, and propagates the errors of
    read_varint and read_group.
    '''
    stream = io.BytesIO(data)
    # read_group searches the raw buffer directly, so hang it on the stream
    stream.original = data
    fixed_widths = {1: 8, 5: 4}  # wire type -> number of bytes to read
    while True:
        try:
            tag = read_varint(stream)
        except EOFError:
            # Clean end of input: no more fields
            return
        field_number, wire_type = tag >> 3, tag & 7
        if wire_type in fixed_widths:
            value = stream.read(fixed_widths[wire_type])
        elif wire_type == 0:
            value = read_varint(stream)
        elif wire_type == 2:
            value = stream.read(read_varint(stream))
        elif wire_type == 3:
            # A group ends at the tag with the same field number and wire type 4
            value = read_group(stream, varint_encode((field_number << 3) | 4))
        else:
            raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(varint_encode(tag)) + ", at position " + str(stream.tell()))
        if mutable:
            yield [wire_type, field_number, value]
        else:
            yield (wire_type, field_number, value)
def pb(data, mutable=False):
    '''Parse data with parse_protobuf and return all fields as a list.

    Fields are tuples, or lists when mutable is true.
    '''
    return [*parse_protobuf(data, mutable=mutable)]
_B64_WRAPPER_KINDS = ('base64', 'base64s', 'base64u')


def _wrap_base64(kind, payload):
    '''URL-safe base64 encode payload (bytes) according to the wrapper kind.

    'base64' keeps '=' padding, 'base64s' strips it and 'base64u' writes it
    as '%3D'. Returns bytes.
    '''
    encoded = base64.urlsafe_b64encode(payload)
    if kind == 'base64s':
        return encoded.rstrip(b'=')
    if kind == 'base64u':
        return encoded.replace(b'=', b'%3D')
    return encoded


def make_proto(fields):
    '''Serialize a structure of the shape produced by recursive_pb.

    fields is either
      - a sequence of [wire_type, field_number, data] items, which is
        serialized to protobuf bytes, or
      - a two-item wrapper (kind, fields) with kind one of 'base64',
        'base64s' (no padding) or 'base64u' ('=' written as '%3D'), in
        which case the inner fields are serialized and the base64 text is
        returned as a str.

    Per wire type, data is:
      0 - an int, varint encoded
      1 - raw bytes, written as-is (expected to be 8 bytes long)
      2 - a str (UTF-8 encoded), bytes, a nested field list, or a
          base64 wrapper as described above
      3 - the raw bytes of a group; the matching end tag is appended
      5 - raw bytes, written as-is (expected to be 4 bytes long)

    Raises NotImplementedError for any other wire type.
    '''
    if len(fields) == 2 and fields[0] in _B64_WRAPPER_KINDS:
        return _wrap_base64(fields[0], make_proto(fields[1])).decode('ascii')

    parts = []
    for field in fields:
        wire_type = field[0]
        if wire_type == 0:
            parts.append(_proto_field(0, field[1], varint_encode(field[2])))
        elif wire_type == 2:
            data = field[2]
            if isinstance(data, str):
                data = data.encode('utf-8')
            elif len(data) == 2 and data[0] in _B64_WRAPPER_KINDS:
                data = _wrap_base64(data[0], make_proto(data[1]))
            elif isinstance(data, list):
                data = make_proto(data)
            parts.append(
                _proto_field(2, field[1], varint_encode(len(data)) + data)
            )
        elif wire_type in (1, 5):
            # Fixed-width values are kept as undecoded bytes by the parser
            parts.append(_proto_field(wire_type, field[1], field[2]))
        elif wire_type == 3:
            # The parser strips the end-group tag (wire type 4); put it back
            end_tag = varint_encode((field[1] << 3) | 4)
            parts.append(_proto_field(3, field[1], field[2] + end_tag))
        else:
            raise NotImplementedError('Wire type ' + str(field[0]) + ' not implemented')
    return b''.join(parts)
def bytes_to_base4(data):
    '''Render each byte of data as four base-4 digits, most significant first.

    The digits of all bytes are concatenated without separators.
    '''
    return ''.join(
        str(octet >> 6)
        + str((octet >> 4) & 0b11)
        + str((octet >> 2) & 0b11)
        + str(octet & 0b11)
        for octet in data
    )
import re
import struct
import binascii
# Base32 encoding/decoding must be done in Python
# Custom 32-symbol alphabet read by b32decode: lowercase a-z followed by the
# digits 0-5 (not the RFC 3548 A-Z / 2-7 alphabet).
_b32alphabet = b'abcdefghijklmnopqrstuvwxyz012345'
# Not referenced anywhere else in the visible part of this file.
_b32tab2 = None
# Reverse lookup table (byte value -> 5-bit digit value); b32decode fills it
# in on its first call.
_b32rev = None
bytes_types = (bytes, bytearray) # Types acceptable as binary data
def _bytes_from_decode_data(s):
    '''Normalize decoder input to a bytes-like value.

    bytes and bytearray values are returned unchanged, a str is encoded as
    ASCII, and any other object supporting the buffer protocol is copied
    into a new bytes object.

    Raises ValueError for a str containing non-ASCII characters and
    TypeError for anything that is neither a str nor bytes-like.
    '''
    if isinstance(s, bytes_types):
        return s
    if isinstance(s, str):
        try:
            return s.encode('ascii')
        except UnicodeEncodeError:
            raise ValueError('string argument should contain only ASCII characters')
    try:
        return memoryview(s).tobytes()
    except TypeError:
        raise TypeError("argument should be a bytes-like object or ASCII "
                        "string, not %r" % s.__class__.__name__) from None
def b32decode(s, casefold=False, map01=None):
    """Decode Base32 data written in this module's custom alphabet.

    The alphabet is ``_b32alphabet`` (lowercase ``a``-``z`` followed by the
    digits ``0``-``5``), not the RFC 3548 one. The interface mirrors the
    standard library's ``base64.b32decode``, from which this function
    appears to have been adapted.

    s is a bytes-like object or ASCII string whose length, including '='
    padding, must be a multiple of 8.

    casefold, when true, lower-cases the input before decoding so that
    upper-case letters are accepted as well. By default only the lowercase
    alphabet is accepted.

    map01, when not None, must be a single character; before decoding the
    digit 0 is replaced by the letter O and the digit 1 by map01.
    NOTE(review): in this alphabet 0 and 1 are themselves valid digits and
    an upper-case O is not, so this option looks like a leftover from the
    RFC alphabet -- confirm before relying on it.

    The result is returned as a bytes object. A binascii.Error is raised if
    the input is incorrectly padded or if there are non-alphabet characters
    present in the input.
    """
    global _b32rev
    # Delay the initialization of the table to not waste memory
    # if the function is never called
    if _b32rev is None:
        _b32rev = {v: k for k, v in enumerate(_b32alphabet)}
    s = _bytes_from_decode_data(s)
    if len(s) % 8:
        raise binascii.Error('Incorrect padding')
    if map01 is not None:
        map01 = _bytes_from_decode_data(map01)
        assert len(map01) == 1, repr(map01)
        s = s.translate(bytes.maketrans(b'01', b'O' + map01))
    if casefold:
        # The alphabet is lowercase, so folding must go to lowercase;
        # upper-casing would turn every letter into a non-alphabet byte.
        s = s.lower()
    # Strip off pad characters from the right. We need to count the pad
    # characters because this will tell us how many null bytes to remove from
    # the end of the decoded string.
    padded_length = len(s)
    s = s.rstrip(b'=')
    padchars = padded_length - len(s)
    # Now decode the full quanta
    decoded = bytearray()
    b32rev = _b32rev
    # Defined up front so that input consisting only of padding reaches the
    # 'Incorrect padding' error below instead of failing on an unbound name.
    acc = 0
    for i in range(0, len(s), 8):
        quanta = s[i: i + 8]
        acc = 0
        try:
            for c in quanta:
                acc = (acc << 5) + b32rev[c]
        except KeyError:
            raise binascii.Error('Non-base32 digit found') from None
        decoded += acc.to_bytes(5, 'big')
    # Process the last, partial quanta
    if padchars:
        # Number of real bytes carried by the final quantum for each legal
        # amount of padding.
        leftover_by_padding = {1: 4, 3: 3, 4: 2, 6: 1}
        if padchars not in leftover_by_padding:
            raise binascii.Error('Incorrect padding')
        # The last quantum was accumulated without its padding digits;
        # shift it into place and re-emit only the bytes that are real.
        acc <<= 5 * padchars
        last = acc.to_bytes(5, 'big')
        decoded[-5:] = last[:leftover_by_padding[padchars]]
    return bytes(decoded)
def dec32(data):
    '''Decode base32 text with b32decode, adding any missing '=' padding.

    data may be str or bytes (bytes are first decoded as ASCII). It is
    padded out to a multiple of 8 characters before being decoded.
    '''
    text = data.decode('ascii') if isinstance(data, bytes) else data
    padding = "=" * (-len(text) % 8)
    return b32decode(text + padding)
def recursive_pb(data, filt=True):
    '''Best-effort recursive decoding of nested base64 / protobuf data.

    data is a str or bytes value. If it is a str, or consists only of bytes
    greater than 32, it is treated as possible base64 text: values of 11
    characters or fewer, or starting with 'UC', are returned untouched;
    anything else is base64-decoded. The (possibly decoded) bytes are then
    parsed as protobuf, and the payload of every length-delimited field is
    processed recursively in the same way.

    Returns
      - ('base64', fields) if data was base64 wrapping a protobuf message,
      - fields (a list of [wire_type, field_number, value] lists) if data
        was a protobuf message directly,
      - data itself, unchanged, if it could not be decoded or parsed.

    This is a heuristic: it may decode something that only happens to look
    like base64 or protobuf. filt is accepted but not used.
    '''
    original = data
    b64 = False
    if isinstance(data, str) or all(i > 32 for i in data):
        # Compare the prefix against the matching type; a str never equals
        # a bytes literal, so a single b'UC' check would miss str input.
        uc_prefix = 'UC' if isinstance(data, str) else b'UC'
        if len(data) <= 11 or data[0:2] == uc_prefix:
            return data
        try:
            data = b64_to_bytes(data)
        except Exception:
            # Not valid base64 after all; leave the value alone
            return original
        b64 = True
    try:
        result = pb(data, mutable=True)
    except Exception:
        # Not protobuf. Hand back what the caller passed in, not the
        # intermediate base64-decoded bytes.
        return original
    for field in result:
        if field[0] == 2:
            try:
                field[2] = recursive_pb(field[2])
            except Exception:
                # Deliberately best-effort: keep the raw payload if the
                # nested attempt fails for any reason.
                pass
    if b64:
        return ('base64', result)
    return result
def indent_lines(lines, indent):
    '''Return the text lines with indent spaces inserted at the start of every line.'''
    padding = ' ' * indent
    line_start = re.compile(r'^', flags=re.MULTILINE)
    return line_start.sub(padding, lines)
def _pp(obj, indent):
    '''Return the pretty-printed text for a recursive_pb structure.

    Three-item tuples and flat three-item lists are printed on one line
    with repr. Two-item wrappers such as ('base64', [...]), fields whose
    data is itself a structure, and lists of fields are spread over several
    lines, with nested content indented by indent spaces per level.
    '''
    if isinstance(obj, tuple):
        if len(obj) == 3:  # (wire_type, field_number, data)
            return repr(obj)
        # (base64, [...])
        inner = indent_lines(_pp(obj[1], indent), indent)
        return '(' + repr(obj[0]) + ',\n' + inner + '\n' + ')'

    if not isinstance(obj, list):
        return repr(obj)

    if len(obj) == 3:
        # [wire_type, field_number, data]
        if not any(isinstance(x, (list, tuple)) for x in obj):
            return repr(obj)
        # [wire_type, field_number, [...]]
        if not any(isinstance(x, (list, tuple)) for x in obj[0:2]):
            inner = indent_lines(_pp(obj[2], indent), indent)
            return ('[' + repr(obj[0]) + ', ' + repr(obj[1]) + ',\n'
                    + inner + '\n'
                    + ']')

    # Anything else is a list of fields: one entry per line
    pieces = ['[\n']
    for item in obj:
        pieces.append(indent_lines(_pp(item, indent), indent) + ',\n')
    pieces.append(']')
    return ''.join(pieces)
def pp(obj, indent=1):
    '''Pretty prints the recursive pb structure to standard output.

    indent is the number of spaces added per nesting level.
    '''
    rendered = _pp(obj, indent)
    print(rendered)
# User-Agent strings sent with the desktop and mobile header sets below
desktop_user_agent = 'Mozilla/5.0 (Windows NT 6.1; rv:52.0) Gecko/20100101 Firefox/52.0'
mobile_user_agent = 'Mozilla/5.0 (Linux; Android 7.0; Redmi Note 4 Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Mobile Safari/537.36'

# Request headers as tuples of (name, value) pairs. The two sets differ only
# in X-YouTube-Client-Name and User-Agent.
desktop_headers = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '1'),
    ('X-YouTube-Client-Version', '2.20180830'),
    ('User-Agent', desktop_user_agent),
)
mobile_headers = (
    ('Accept', '*/*'),
    ('Accept-Language', 'en-US,en;q=0.5'),
    ('X-YouTube-Client-Name', '2'),
    ('X-YouTube-Client-Version', '2.20180830'),
    ('User-Agent', mobile_user_agent),
)
|