from threading import Thread, Event, Lock
from logging import log, DEBUG, INFO, WARN, ERROR, CRITICAL
from binascii import hexlify
from .base import *
from ..schema import *
from ..codec.write import WriterBase, SchemaWriter, schema_to_bytes
from ..util import view_hex
class Sender(WriterBase):
    """Write protocol messages for a schema to an output stream.

    Covers schema exchange, method calls and returns, no-ops and the
    disconnect handshake.  Primitive writers (``write_int8`` ...
    ``write_string``), ``write_method_ref`` and
    ``_write_signed_integer_functions`` are not defined in this class;
    presumably they are inherited from ``WriterBase`` -- TODO confirm.
    """

    def __init__(self, schema, outstream, get_id_for_object):
        """Set up the sender.

        :param schema: schema object; its ``method_table``, ``interfaces``,
            ``enums`` and ``bytes_object_ref`` attributes are used here.
        :param outstream: stream handed to ``WriterBase``; this class later
            calls ``write`` and ``flush`` on ``self._outstream``
            (presumably the same object -- verify against ``WriterBase``).
        :param get_id_for_object: callable mapping an object to the ID that
            is written as its object reference.
        """
        super().__init__(schema, outstream)
        self.method_table = self.schema.method_table
        # Serialized once up front so send_schema() can reuse the bytes.
        self.serialized_schema = schema_to_bytes(schema)
        self.get_id_for_object = get_id_for_object
        # Last request ID generated by write_request_id(); 0 means none yet.
        self.request_id = 0

        # Dispatch table: schema type -> bound writer for a value of that type.
        self._write_value_functions = {
            int8: self.write_int8,
            int16: self.write_int16,
            int32: self.write_int32,
            int64: self.write_int64,
            uint8: self.write_uint8,
            uint16: self.write_uint16,
            uint32: self.write_uint32,
            uint64: self.write_uint64,
            float32: self.write_float32,
            float64: self.write_float64,
            string: self.write_string,
            # A void value produces no bytes on the wire.
            void: lambda *args: None,
        }
        # Interface-typed values are written as object references,
        # enum-typed values via write_enum_value.
        for interface in self.schema.interfaces:
            self._write_value_functions[interface] = self.write_object_ref
        for enum in self.schema.enums:
            self._write_value_functions[enum] = self.write_enum_value

    def write_to_stream(self, data: bytes):
        """Write ``data`` to the output stream and flush it immediately."""
        log(DEBUG, 'Writing data of length {} to stream: {}'.format(len(data), hexlify(data)))
        self._outstream.write(data)
        self._outstream.flush()

    def write_request_id(self, request_id=None):
        """Write a request ID as an unsigned 32-bit integer.

        :param request_id: ID to write.  When ``None``, the internal counter
            is advanced by one (wrapping modulo 2**32) and the new counter
            value is written; an explicit ID leaves the counter untouched.
        """
        if request_id is None:
            self.request_id = (self.request_id + 1) % (1 << 32)
            request_id = self.request_id
        self.write_uint32(request_id)

    def request_schema(self):
        """Write the ``REQUEST_SCHEMA`` message."""
        self.write_to_stream(REQUEST_SCHEMA)

    def send_schema(self):
        """Write ``SEND_SCHEMA`` followed by the serialized schema bytes."""
        self.write_to_stream(SEND_SCHEMA)
        self.write_to_stream(self.serialized_schema)

    def write_object_ref(self, obj):
        """Write a reference to ``obj``.

        The ID comes from ``get_id_for_object`` and is written with the
        signed-integer writer selected by ``schema.bytes_object_ref``
        (presumably the byte width of an object reference -- TODO confirm).
        """
        oid = self.get_id_for_object(obj)
        self._write_signed_integer_functions[self.schema.bytes_object_ref](oid)

    def write_enum_value(self, enum_value):
        """Write ``enum_value.value`` as an unsigned 8-bit integer."""
        self.write_uint8(enum_value.value)

    def write_record_value(self, val):
        """Not supported yet.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError('Writing records')

    def write_value(self, typ, value):
        """Write ``value`` using the writer registered for ``typ``.

        :raises KeyError: if no writer is registered for ``typ``.
        """
        self._write_value_functions[typ](value)

    def call_method(self, method, this, args_dict):
        """Write a method-call request and return its request ID.

        The method index and every argument value are resolved before the
        first byte is written, so an unknown method or a missing argument
        raises without leaving a partial message on the stream and without
        consuming a request ID.

        :param method: method to call; must be a key of ``method_table`` and
            provide ``name`` and ``arguments`` (an iterable of
            ``(type, name)`` pairs).
        :param this: object the method is invoked on; written as an object
            reference.
        :param args_dict: mapping from argument name to argument value.
        :returns: the request ID generated for this call.
        :raises KeyError: if ``method`` is not in the method table or an
            argument named in ``method.arguments`` is missing from
            ``args_dict``.
        """
        log(INFO, 'Preparing to request method call for method {} on object {} with arguments {}'.format(method.name, this, args_dict))
        method_idx = self.method_table[method]
        # Look arguments up with the same args_dict[name] access as before,
        # but ahead of any write, so a KeyError cannot corrupt the stream.
        typed_values = [(typ, args_dict[name]) for typ, name in method.arguments]

        self.write_to_stream(CALL_METHOD)
        self.write_request_id()
        self.write_method_ref(method_idx)
        self.write_object_ref(this)
        for typ, value in typed_values:
            self.write_value(typ, value)
        log(DEBUG, 'Requested method call with request ID {} on stream {}'.format(self.request_id, self._outstream))
        return self.request_id

    def return_method(self, request_id, return_type, return_value):
        """Write the result of a method call.

        :param request_id: ID of the request being answered; written as-is.
        :param return_type: schema type used to pick the value writer.
        :param return_value: value to write.
        """
        log(DEBUG, 'Returning method call result for request {} with value {} of type {}'.format(request_id, return_value, return_type))
        self.write_to_stream(RETURN_FROM_METHOD)
        self.write_request_id(request_id)
        self.write_value(return_type, return_value)

    def noop(self):
        """Write the ``NOOP`` message."""
        self.write_to_stream(NOOP)

    def disconnect(self):
        """Write the ``DISCONNECT`` message."""
        log(INFO, 'Disconnecting')
        self.write_to_stream(DISCONNECT)

    def acknowledge_disconnect(self):
        """Write the ``ACKNOWLEDGE_DISCONNECT`` message."""
        log(INFO, 'Acknowledging disconnect')
        self.write_to_stream(ACKNOWLEDGE_DISCONNECT)