from hashlib import sha256
import struct
import io
from binascii import hexlify
from logging import log, DEBUG, INFO, WARN, ERROR, CRITICAL
from .base import *
from ..util import view_hex
from ..schema import *
[docs]class WriterBase:
def __init__(self, schema, outstream):
super().__init__()
self.schema = schema
self._outstream = outstream
self.type_table = schema.type_table
self._write_signed_integer_functions = {
1: self.write_int8,
2: self.write_int16,
4: self.write_int32,
8: self.write_int64
}
self._write_unsigned_integer_functions = {
1: self.write_uint8,
2: self.write_uint16,
4: self.write_uint32,
8: self.write_uint64
}
[docs] def write_to_stream(self, data: bytes):
raise NotImplementedError()
[docs] def write_int8(self, i: int):
self.write_to_stream(struct.pack('!b', i))
[docs] def write_uint8(self, i: int):
self.write_to_stream(struct.pack('!B', i))
[docs] def write_int16(self, i: int):
self.write_to_stream(struct.pack('!h', i))
[docs] def write_uint16(self, i: int):
self.write_to_stream(struct.pack('!H', i))
[docs] def write_int32(self, i: int):
self.write_to_stream(struct.pack('!i', i))
[docs] def write_uint32(self, i: int):
self.write_to_stream(struct.pack('!I', i))
[docs] def write_int64(self, i: int):
self.write_to_stream(struct.pack('!q', i))
[docs] def write_uint64(self, i: int):
self.write_to_stream(struct.pack('!Q', i))
[docs] def write_signed_integer(self, i: int, nbytes: int):
assert nbytes in (1, 2, 4, 8), 'Integers have to be 1, 2, 4 or 8 bytes long, got {}'.format(nbytes)
fn = self._write_signed_integer_functions[nbytes]
return fn(i)
[docs] def write_unsigned_integer(self, i: int, nbytes: int):
assert nbytes in (1, 2, 4, 8), 'Integers have to be 1, 2, 4 or 8 bytes long, got {}'.format(nbytes)
fn = self._write_unsigned_integer_functions[nbytes]
return fn(i)
[docs] def write_float32(self, f: float):
self.write_to_stream(struct.pack('!f', f))
[docs] def write_float64(self, f: float):
self.write_to_stream(struct.pack('!d', f))
[docs] def write_bytes(self, b):
self.write_uint32(len(b))
self.write_to_stream(b)
[docs] def write_string(self, s):
b = s.encode('utf8')
self.write_bytes(b)
[docs] def write_type_ref(self, typ: Type):
assert isinstance(typ, Type), '{} is not an instance of Type'.format(typ)
type_ref = self.type_table.get(typ)
if type_ref is None:
raise Exception('Trying to write type reference to unknown type {}'.format(typ))
self.write_int32(type_ref)
[docs] def write_method_ref(self, method_idx):
assert self.schema.bytes_method_ref in (1, 2, 4, 8), 'Method references have to be 1, 2, 4 or 8 bytes long, got {}'.format(bytes_method_ref)
assert method_idx < 2 << (self.schema.bytes_method_ref * 8 - 1), 'Trying to store method reference {} using only {} bytes'.format(method_idx, bytes_method_ref)
self.write_unsigned_integer(method_idx, self.schema.bytes_method_ref)
[docs]class SchemaWriter(WriterBase):
def __init__(self, schema, stream):
super().__init__(schema, stream)
self._idx = 0
self._hsh = sha256()
self._method_idx = 0
[docs] def write_to_stream(self, data: bytes):
self._outstream.write(data)
self._idx += len(data)
self._hsh.update(data)
[docs] def write_name(self, s):
assert_name(s)
b = s.encode('ascii')
self.write_bytes(b)
[docs] def write_enum(self, enum):
self.write_to_stream(DECLARE_ENUM)
self.write_type_ref(enum)
self.write_name(enum.name)
self.write_uint32(len(enum.values))
for value in enum.values:
self.write_name(value)
[docs] def write_record(self, record):
self.write_to_stream(DECLARE_RECORD)
self.write_type_ref(record)
self.write_name(record.name)
self.write_uint32(len(record.fields))
for field_type, field_name in record.fields:
self.write_type_ref(field_type)
self.write_name(field_name)
[docs] def write_method(self, method):
self.write_method_ref(self._method_idx)
self.write_name(method.name)
self.write_uint32(len(method.arguments))
for argtype, argname in method.arguments:
self.write_type_ref(argtype)
self.write_name(argname)
self.write_type_ref(method.return_type)
self._method_idx += 1
[docs] def write_interface(self, interface):
self.write_to_stream(DECLARE_INTERFACE)
self.write_type_ref(interface)
self.write_name(interface.name)
self.write_uint32(len(interface.methods))
for method in interface.methods:
self.write_method(method)
[docs] def write_schema(self):
self.write_to_stream(MAGIC)
self.write_to_stream(SCHEMA)
self.write_string(self.schema.label)
self.write_uint32(self.schema.bytes_method_ref)
self.write_uint32(self.schema.bytes_object_ref)
self.write_uint32(len(self.schema.enums))
self.write_uint32(len(self.schema.records))
self.write_uint32(len(self.schema.interfaces))
for typ in self.schema.declared_types:
if isinstance(typ, Enum):
self.write_enum(typ)
elif isinstance(typ, Record):
self.write_record(typ)
elif isinstance(typ, Interface):
self.write_interface(typ)
else:
raise Exception('Unknown type {}'.format(typ))
hsh_digest = self._hsh.digest()
if self.schema.sha256_digest:
assert hsh_digest == self.schema.sha256_digest, 'SHA256 sum of schema does not match: computed {} while writing, but got {}'.format(view_hex(hsh_digest), view_hex(self.schema.sha256_digest))
self.write_to_stream(hsh_digest)
idx = self._idx
return hsh_digest, idx
[docs]def write_schema(schema, stream):
SchemaWriter(schema, stream).write_schema()
[docs]def schema_to_bytes(schema):
with io.BytesIO() as stream:
write_schema(schema, stream)
stream.seek(0)
return stream.read()