Source code for palaestrai.util.spawn

import asyncio
import logging
import logging.config
import logging.handlers
import os
from pathlib import Path
from typing import Callable, Any, Union

from palaestrai.core import RuntimeConfig

LOG = logging.getLogger(__name__)


def _set_proctitle(process_name: str):
    try:
        import setproctitle

        setproctitle.setproctitle(f"palaestrAI[{process_name}")
    except ImportError:
        pass


def _restore_runtime_configuration(runtime_configuration_dict: dict):
    RuntimeConfig().reset()
    RuntimeConfig().load(runtime_configuration_dict)


def _reinitialize_logging():
    try:
        logging.config.dictConfig(RuntimeConfig().logging)
        logging.root.handlers.clear()
        logging.root.addHandler(
            logging.handlers.SocketHandler(
                "127.0.0.1", RuntimeConfig().logger_port
            )
        )
        logging.debug(
            "Reinitialized logging from RuntimeConfig(%s)", RuntimeConfig()
        )
    except (KeyError, ValueError) as e:
        logging.basicConfig(level=logging.INFO)
        logging.warning(
            "Could not load logging config (%s), continuing with defaults",
            e,
        )


[docs] async def spawn_wrapper( name: str, runtime_config: dict, callee: Callable, args: Union[list, None] = None, kwargs: Union[dict, None] = None, ) -> Any: """Wraps a target for fork/spawn and takes care of initialization. Whenever a new subprocess is created (regardless of whether spawn, fork, or forkserver is used), some caretaking needs to be done: * The runtime configuration needs to be transferred, and the ::`RuntimeConfig` properly reinitialized * Logging is reinitialized/rewired to send messages to the parent process * A proctitle is set Parameters ---------- * name : str Name of the process; will lead to a proctitle in the form of ``palaestrai[%s]`` * runtime_config : dict Runtime configuration dict, normally obtained from ::`RuntimeConfig.to_dict` * callee : Callable The target method * args : list, optional Positional arguments of ::`callee`. * kwargs : dict, optional Keyword arguments of ::`callee` Returns ------- Any Whatever the target function returns. """ if name: _set_proctitle(name) if not args: # [] as default arg is mutable, workaround with None: args = [] if not kwargs: # {} as default arg is mutable, workaround with None: kwargs = {} _restore_runtime_configuration(runtime_config) _reinitialize_logging() profiler = None if RuntimeConfig().profile: import cProfile profiler = cProfile.Profile() profiler.enable() try: if asyncio.iscoroutinefunction(callee): ret = await callee(*args, **kwargs) else: ret = callee(*args, **kwargs) except Exception as e: LOG.critical("Running %s failed: %s", str(callee), e) raise e if profiler: profiler.disable() profiler.dump_stats(Path(os.curdir) / f"{name}.prof") return ret