Source code for palaestrai.util.logserver

"""
A simple, built-in logging server
"""

from __future__ import annotations

import asyncio
import logging
import pickle
import struct
from typing import List, Tuple

LOG = logging.getLogger(__name__)


[docs] class LogServer: """A simple, internal logging server that reinjects remote log messages Each submodule of palaestrAI that gets spawned lives in a separate process. As the ::`~spawn_wrapper` takes care of reinitializing the logger, it replaces all defined log handlers with a :py:class:`logging.SocketHandler`. This log server is ran by the ::`Executor` and receives all those messages. They are re-injected in the main process' logging system and treated according to the original logging configuration. """ def __init__(self, listen_host: str, listen_port: int): """Constructs a new log server for a given address and port Parameters ---------- listen_host : str The address the log server should bind to listen_port : int The port the log server should bind to """ self._listen_host = listen_host self._listen_port = listen_port self._server = None self._clients: List[Tuple] = [] def _add_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ): LOG.debug( "LogServer(id=0x%x) registered a new client (%s, %s).", id(self), reader, writer, ) self._clients.append( (reader, asyncio.create_task(LogServer._read_from_client(reader))) ) @staticmethod async def _read_from_client(reader: asyncio.StreamReader): while True: chunk = await reader.read(4) msglen = struct.unpack(">L", chunk)[0] chunk = await reader.read(msglen) while len(chunk) < msglen: chunk = chunk + await reader.read(msglen - len(chunk)) try: logobj = pickle.loads(chunk) if not logobj: continue except: continue record = logging.makeLogRecord(logobj) LOG.debug("LogServer received new record: %s", record) logging.getLogger(record.name).handle(record)
[docs] async def start(self): self._server = await asyncio.start_server( self._add_client, host=self._listen_host, port=self._listen_port, ) await self._server.start_serving()
[docs] async def stop(self): self._server.close() await self._server.wait_closed() for _, task in self._clients: task.cancel()