from threading import Thread, Event, Lock
from logging import log, DEBUG, INFO, WARN, ERROR, CRITICAL
from binascii import hexlify
from .base import *
from ..schema import *
from ..codec.read import ReaderBase
from ..codec.write import SchemaWriter, schema_to_bytes
from ..util import view_hex
from ..error import WrongNumberOfBytesRead, UnknownCommand, MethodNotAvailable, DuplicateRegistrationForMethodReturn, DuplicateMethodReturnValue, MissingMethodReturnValueEvent
[docs]class Receiver(ReaderBase):
def __init__(self, schema, instream, get_object, return_method_result, acknowledge_disconnect, name_converter):
super().__init__(instream)
self.schema = schema
self.method_lookup = self.schema.method_lookup
self.method_to_interface = self.schema.method_to_interface
self.serialized_schema = schema_to_bytes(schema)
self.get_object = get_object
self.method_return_events = {}
self.method_return_values = {}
self.return_method_result = return_method_result
self.acknowledge_disconnect = acknowledge_disconnect
self.name_converter = name_converter
self._read_value_functions = {
int8: self.read_int8,
int16: self.read_int16,
int32: self.read_int32,
int64: self.read_int64,
uint8: self.read_uint8,
uint16: self.read_uint16,
uint32: self.read_uint32,
uint64: self.read_uint64,
float32: self.read_float32,
float64: self.read_float64,
string: self.read_string,
void: lambda *args: None
}
[docs] def read_from_stream(self, bytes_count: int):
b = self._instream.read(bytes_count)
log(DEBUG, 'Read data of length {} from stream: {}'.format(len(b), hexlify(b)))
if len(b) != bytes_count:
ex = WrongNumberOfBytesRead(bytes_count, len(b), None)
log(ERROR, str(ex))
raise ex
return b
[docs] def read_request_id(self):
return self.read_uint32()
[docs] def read_method_ref(self):
return self.read_unsigned_integer(self.schema.bytes_method_ref)
[docs] def read_object_ref(self, typ: Type):
oid = self._read_signed_integer_functions[self.schema.bytes_object_ref]()
log(DEBUG, 'Read object ID {}'.format(oid))
return oid
[docs] def read_object(self, typ: Type):
oid = self.read_object_ref(typ)
obj = self.get_object(oid, typ)
log(DEBUG, 'Found object {}'.format(obj))
return obj
[docs] def read_enum_value(self, typ: Type):
enum_value = self.read_uint8()
return self.get_enum_implementation(typ)(enum_value) # todo: better api
[docs] def read_value(self, typ: Type):
if isinstance(typ, Interface):
return self.read_object(typ)
elif isinstance(typ, Enum):
return self.read_enum_value(typ)
else:
return self._read_value_functions[typ]()
[docs] def mainloop(self):
self.exit_mainloop = False
while not self.exit_mainloop:
self.process_next()
[docs] def process_next(self):
log(DEBUG, 'Processing next command on stream {}'.format(self._instream))
cmd = self.read_from_stream(1)
if cmd == NOOP:
log(DEBUG, 'Received NOOP command, doing nothing')
elif cmd == DISCONNECT:
log(DEBUG, 'Received DISCONNECT command, exiting mainloop')
self.exit_mainloop = True
self.acknowledge_disconnect()
elif cmd == ACKNOWLEDGE_DISCONNECT:
log(DEBUG, 'Received ACKNOWLEDGE_DISCONNECT command, exiting mainloop')
self.exit_mainloop = True
elif cmd == REQUEST_SCHEMA:
self.send_schema()
elif cmd == SEND_SCHEMA:
self.receive_and_check_schema()
elif cmd == CALL_METHOD:
self.process_method_call()
elif cmd == RETURN_FROM_METHOD:
self.process_method_return()
else:
raise UnknownCommand(cmd)
[docs] def process_method_call(self):
request_id = self.read_request_id()
method_ref = self.read_method_ref()
log(INFO, 'Received method call with request ID {} and method reference {}'.format(request_id, method_ref))
assert method_ref in self.method_lookup, 'Received method call with request ID {} and unknown method reference {}'.format(request_id, method_ref)
method = self.method_lookup[method_ref]
log(DEBUG, 'Found method {}'.format(method))
this = self.read_object(self.method_to_interface[method_ref])
impl_method_name = self.name_converter.method_name(method.name)
try:
method_impl = getattr(this, impl_method_name)
except AttributeError:
raise MethodNotAvailable(method, impl_method_name, this)
args = {}
for typ, name in method.arguments:
args[name] = self.read_value(typ)
def method_call_thread():
log(DEBUG, 'Calling method implementation {} with arguments {}'.format(method_impl, args))
return_value = method_impl(**args)
log(DEBUG, 'Return value of method implementation call is {}'.format(return_value))
self.return_method_result(request_id, method.return_type, return_value)
Thread(target=method_call_thread).start()
[docs] def process_method_return(self):
request_id = self.read_request_id()
log(DEBUG, 'Received return from method call with request ID {}'.format(request_id))
if request_id in self.method_return_values:
raise DuplicateMethodReturnValue(request_id)
if request_id in self.method_return_events:
event, return_type = self.method_return_events.pop(request_id)
return_value = self.read_value(return_type)
log(DEBUG, 'Return value for method call with request ID {} is {} of type {}'.format(request_id, return_value, return_type))
self.method_return_values[request_id] = return_value
event.set()
else:
raise MissingMethodReturnValueEvent(request_id)
# ToDo: Wait and retry?
[docs] def receive_and_check_schema(self):
received_schema = SchemaReader(self._instream).read_schema()
assert SchemaWriter(received_schema).to_bytes() == self.serialized_schema
[docs] def wait_for_method_return(self, request_id, return_type):
log(DEBUG, 'Waiting for method return corresponding to request {} with return type {} on stream {}'.format(request_id, return_type, self._instream))
if request_id in self.method_return_events or request_id in self.method_return_values:
raise DuplicateRegistrationForMethodReturn(request_id)
wait_for_method_return_event = Event()
self.method_return_events[request_id] = (wait_for_method_return_event, return_type)
log(DEBUG, 'Waiting event registered for request {}'.format(request_id))
wait_for_method_return_event.wait()
return self.method_return_values.pop(request_id)