aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/proto.py
diff options
context:
space:
mode:
Diffstat (limited to 'youtube/proto.py')
-rw-r--r--youtube/proto.py68
1 files changed, 66 insertions, 2 deletions
diff --git a/youtube/proto.py b/youtube/proto.py
index 6230e51..004375a 100644
--- a/youtube/proto.py
+++ b/youtube/proto.py
@@ -1,5 +1,6 @@
from math import ceil
import base64
+import io
def byte(n):
return bytes((n,))
@@ -61,5 +62,68 @@ def as_bytes(value):
if isinstance(value, str):
return value.encode('ascii')
return value
-
- \ No newline at end of file
+
+
+def read_varint(data):
+ result = 0
+ i = 0
+ while True:
+ try:
+ byte = data.read(1)[0]
+ except IndexError:
+ if i == 0:
+ raise EOFError()
+ raise Exception('Unterminated varint starting at ' + str(data.tell() - i))
+ result |= (byte & 127) << 7*i
+ if not byte & 128:
+ break
+
+ i += 1
+ return result
+
+
+def read_group(data, end_sequence):
+ start = data.tell()
+ index = data.original.find(end_sequence, start)
+ if index == -1:
+ raise Exception('Unterminated group')
+ data.seek(index + len(end_sequence))
+ return data.original[start:index]
+
+def read_protobuf(data):
+ data_original = data
+ data = io.BytesIO(data)
+ data.original = data_original
+ while True:
+ try:
+ tag = read_varint(data)
+ except EOFError:
+ break
+ wire_type = tag & 7
+ field_number = tag >> 3
+
+ if wire_type == 0:
+ value = read_varint(data)
+ elif wire_type == 1:
+ value = data.read(8)
+ elif wire_type == 2:
+ length = read_varint(data)
+ value = data.read(length)
+ elif wire_type == 3:
+ end_bytes = encode_varint((field_number << 3) | 4)
+ value = read_group(data, end_bytes)
+ elif wire_type == 5:
+ value = data.read(4)
+ else:
+ raise Exception("Unknown wire type: " + str(wire_type) + ", Tag: " + bytes_to_hex(succinct_encode(tag)) + ", at position " + str(data.tell()))
+ yield (wire_type, field_number, value)
+
+def parse(data):
+ return {field_number: value for _, field_number, value in read_protobuf(data)}
+
+def b64_to_bytes(data):
+ if isinstance(data, bytes):
+ data = data.decode('ascii')
+ data = data.replace("%3D", "=")
+ return base64.urlsafe_b64decode(data + "="*((4 - len(data)%4)%4) )
+