Source code for palaestrai.util.spawn

import os
import signal
import asyncio
import logging
import logging.config
import logging.handlers
from pathlib import Path
from typing import Callable, Any, Union

from palaestrai.core import RuntimeConfig

LOG = logging.getLogger(__name__)


def _install_sighandlers():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)


def _set_proctitle(process_name: str):
    try:
        import setproctitle

        setproctitle.setproctitle(f"palaestrAI[{process_name}]")
    except ImportError:
        pass


def _restore_runtime_configuration(runtime_configuration_dict: dict):
    RuntimeConfig().reset()
    RuntimeConfig().load(runtime_configuration_dict)


def _reinitialize_logging():
    try:
        logging.config.dictConfig(RuntimeConfig().logging)
        logging.root.handlers.clear()
        logging.root.addHandler(
            logging.handlers.SocketHandler(
                "127.0.0.1", RuntimeConfig().logger_port
            )
        )
        logging.debug(
            "Reinitialized logging from RuntimeConfig(%s)", RuntimeConfig()
        )
    except (KeyError, ValueError) as e:
        logging.basicConfig(level=logging.INFO)
        logging.warning(
            "Could not load logging config (%s), continuing with defaults",
            e,
        )


[docs] async def spawn_wrapper( name: str, runtime_config: dict, callee: Callable, args: Union[list, None] = None, kwargs: Union[dict, None] = None, ) -> Any: """Wraps a target for fork/spawn and takes care of initialization. Whenever a new subprocess is created (regardless of whether spawn, fork, or forkserver is used), some caretaking needs to be done: * The runtime configuration needs to be transferred, and the ::`RuntimeConfig` properly reinitialized * Logging is reinitialized/rewired to send messages to the parent process * A proctitle is set Parameters ---------- * name : str Name of the process; will lead to a proctitle in the form of ``palaestrai[%s]`` * runtime_config : dict Runtime configuration dict, normally obtained from ::`RuntimeConfig.to_dict` * callee : Callable The target method * args : list, optional Positional arguments of ::`callee`. * kwargs : dict, optional Keyword arguments of ::`callee` Returns ------- Any Whatever the target function returns. """ _install_sighandlers() if name: _set_proctitle(name) if not args: # [] as default arg is mutable, workaround with None: args = [] if not kwargs: # {} as default arg is mutable, workaround with None: kwargs = {} _restore_runtime_configuration(runtime_config) _reinitialize_logging() profiler = None if RuntimeConfig().profile: import cProfile profiler = cProfile.Profile() profiler.enable() try: if asyncio.iscoroutinefunction(callee): ret = await callee(*args, **kwargs) else: ret = callee(*args, **kwargs) except Exception as e: LOG.critical("Running %s failed: %s", str(callee), e, exc_info=e) raise e if profiler: profiler.disable() profiler.dump_stats(Path(os.curdir) / f"{name}.prof") return ret