[docs]classLogServer:"""A simple, internal logging server that reinjects remote log messages Each submodule of palaestrAI that gets spawned lives in a separate process. As the ::`~spawn_wrapper` takes care of reinitializing the logger, it replaces all defined log handlers with a :py:class:`logging.SocketHandler`. This log server is ran by the ::`Executor` and receives all those messages. They are re-injected in the main process' logging system and treated according to the original logging configuration. """def__init__(self,listen_host:str,listen_port:int):"""Constructs a new log server for a given address and port Parameters ---------- listen_host : str The address the log server should bind to listen_port : int The port the log server should bind to """self._listen_host=listen_hostself._listen_port=listen_portself._server=Noneself._clients:List[Tuple]=[]self._obj_queue:Queue=Queue()self._running=Trueself._spiller_thread:Optional[Thread]=Nonedef_add_client(self,reader:asyncio.StreamReader,writer:asyncio.StreamWriter):LOG.debug("LogServer(id=0x%x) registered a new client (%s, %s).",id(self),reader,writer,)self._clients.append((reader,writer,asyncio.create_task(self._read_from_client(reader)),))def_spill_messages(self):whileself._runningornotself._obj_queue.empty():try:logobj=self._obj_queue.get(block=True,timeout=0.1)exceptEmpty:continuerecord=logging.makeLogRecord(logobj)LOG.debug("LogServer received new record: %s",record)logging.getLogger(record.name).handle(record)asyncdef_read_from_client(self,reader:asyncio.StreamReader):whileTrue:chunk=awaitreader.read(4)msglen=struct.unpack(">L",chunk)[0]chunk=awaitreader.read(msglen)whilelen(chunk)<msglen:chunk=chunk+awaitreader.read(msglen-len(chunk))try:logobj=pickle.loads(chunk)ifnotlogobj:continueself._obj_queue.put(logobj)except:continue