from hashlib import sha256
import struct
from binascii import hexlify
from threading import Thread, Event, Lock
from logging import log, DEBUG, INFO, WARN, ERROR, CRITICAL
from .base import *
from ..error import WrongNumberOfBytesRead
from ..schema import *
from ..schema.typeref import TypeRef
from .write import SchemaWriter
[docs]class ReaderBase:
def __init__(self, stream):
super().__init__()
self._instream = stream
self._read_signed_integer_functions = {
1: self.read_int8,
2: self.read_int16,
4: self.read_int32,
8: self.read_int64
}
self._read_unsigned_integer_functions = {
1: self.read_uint8,
2: self.read_uint16,
4: self.read_uint32,
8: self.read_uint64
}
[docs] def read_from_stream(self, bytes_count: int):
b = self._instream.read(bytes_count)
if len(b) != bytes_count:
raise WrongNumberOfBytesRead(bytes_count, len(b), self._idx)
self._idx += len(b)
self._hsh.update(b)
return b
[docs] def read_constant(self, bytes_const: bytes):
bts = self.read_from_stream(len(bytes_const))
assert bts == bytes_const, 'Expecting {} at offset 0x{:x}, got {}'.format(bytes_const, self._idx - len(bytes_const), bts)
[docs] def read_int8(self):
return self.read_struct_format('!b')
[docs] def read_uint8(self):
return self.read_struct_format('!B')
[docs] def read_int16(self):
return self.read_struct_format('!h')
[docs] def read_uint16(self):
return self.read_struct_format('!H')
[docs] def read_int32(self):
return self.read_struct_format('!i')
[docs] def read_uint32(self):
return self.read_struct_format('!I')
[docs] def read_int64(self):
return self.read_struct_format('!q')
[docs] def read_uint64(self):
return self.read_struct_format('!Q')
[docs] def read_signed_integer(self, nbytes: int):
assert nbytes in (1, 2, 4, 8), 'Integers have to be 1, 2, 4 or 8 bytes long, got {}'.format(nbytes)
fn = self._read_signed_integer_functions[nbytes]
return fn()
[docs] def read_unsigned_integer(self, nbytes: int):
assert nbytes in (1, 2, 4, 8), 'Integers have to be 1, 2, 4 or 8 bytes long, got {}'.format(nbytes)
fn = self._read_unsigned_integer_functions[nbytes]
return fn()
[docs] def read_float32(self):
return self.read_struct_format('!f')
[docs] def read_float64(self):
return self.read_struct_format('!d')
[docs] def read_string(self):
size = self.read_uint32()
bts = self.read_from_stream(size)
return bts.decode('utf8')
[docs] def read_name(self):
name = self.read_string()
assert_name(name)
return name
[docs] def read_type_ref(self):
return TypeRef(self.read_int32())
[docs]class SchemaReader(ReaderBase):
def __init__(self, stream):
super().__init__(stream)
self._idx = 0
self._hsh = sha256()
[docs] def read_enum(self):
self.read_constant(DECLARE_ENUM)
enum_type_ref = self.read_type_ref()
enum_name = self.read_name()
enum_count = self.read_uint32()
values = []
for value_idx in range(enum_count):
values.append(self.read_name())
return enum_type_ref, Enum(enum_name, values)
[docs] def read_record(self):
self.read_constant(DECLARE_RECORD)
record_type_ref = self.read_type_ref()
record_name = self.read_name()
field_count = self.read_uint32()
fields = []
for field_idx in range(field_count):
fields.append((self.read_type_ref(), self.read_name()))
return record_type_ref, Record(record_name, fields)
[docs] def read_method(self):
method_ref = self.read_unsigned_integer(self.bytes_method_ref)
method_name = self.read_name()
arg_count = self.read_uint32()
args = []
for arg_idx in range(arg_count):
args.append((self.read_type_ref(), self.read_name()))
return_type_ref = self.read_type_ref()
return method_ref, Method(method_name, args, return_type_ref)
[docs] def read_interface(self):
self.read_constant(DECLARE_INTERFACE)
interface_type_ref = self.read_type_ref()
interface_name = self.read_name()
method_count = self.read_uint32()
methods = []
for local_method_idx in range(method_count):
method_ref, method = self.read_method()
methods.append(method)
return interface_type_ref, Interface(interface_name, methods)
[docs] def read_schema(self):
self.read_constant(MAGIC)
self.read_constant(SCHEMA)
schema_label = self.read_string()
self.bytes_method_ref = self.read_uint32()
self.bytes_object_ref = self.read_uint32()
enums_count = self.read_uint32()
records_count = self.read_uint32()
interfaces_count = self.read_uint32()
#print(schema_label, bytes_method_ref, bytes_object_ref, enums_count, records_count, interfaces_count)
types = {TypeRef(idx): typ for idx, typ in enumerate(primitive_types)}
for enum_idx in range(enums_count):
type_ref, enum = self.read_enum()
assert type_ref not in types, 'Trying to specify enum {} twice before offset 0x{:x}'.format(type_ref, self.idx)
types[type_ref] = enum
for record_idx in range(records_count):
type_ref, record = self.read_record()
assert type_ref not in types, 'Trying to specify record {} twice before offset 0x{:x}'.format(type_ref, self.idx)
types[type_ref] = record
for interface_idx in range(interfaces_count):
type_ref, interface = self.read_interface()
assert type_ref not in types, 'Trying to specify interface {} twice before offset 0x{:x}'.format(type_ref, self.idx)
types[type_ref] = interface
for type_ref, typ in list(types.items()):
if type_ref.type_ref > 0: # avoid void[]
types[TypeRef(-type_ref.type_ref)] = Array(typ)
for typ in types.values():
typ.resolve_type_references(types)
for type_ref, typ in list(types.items()):
if isinstance(typ, Primitive) or isinstance(typ, Array):
del types[type_ref]
hsh_digest = self._hsh.digest()
hsh_digest_read = self.read_from_stream(len(hsh_digest))
assert hsh_digest == hsh_digest_read, 'SHA256 sum of schema does not match: computed {}, but read {}'.format(view_hex(hsh_digest), view_hex(hsh_digest_read))
return Schema(schema_label, types.values(), self.bytes_method_ref, self.bytes_object_ref, sha256_digest=hsh_digest)
[docs]def read_schema(stream):
return SchemaReader(stream).read_schema()
[docs]def schema_from_bytes(byt: bytes):
from io import BytesIO
with BytesIO(byt) as stream:
return read_schema(stream)