import asyncio
import logging
import logging.config
import logging.handlers
import os
from pathlib import Path
from typing import Callable, Any, Union

from palaestrai.core import RuntimeConfig

LOG = logging.getLogger(__name__)


def _set_proctitle(process_name: str):
    """Set the process title to ``palaestrAI[<process_name>]``.

    This is a best-effort operation: if the optional ``setproctitle``
    module cannot be imported, the title is left untouched.

    Parameters
    ----------
    process_name : str
        Name that is placed between the brackets of the process title.
    """
    title = f"palaestrAI[{process_name}]"
    try:
        import setproctitle

        setproctitle.setproctitle(title)
    except ImportError:
        # ``setproctitle`` is optional; silently carry on without it.
        pass


def _restore_runtime_configuration(runtime_configuration_dict: dict):
    """Reset the :class:`RuntimeConfig` and load the given dict into it.

    Parameters
    ----------
    runtime_configuration_dict : dict
        Configuration that is handed to ``RuntimeConfig().load`` after the
        current configuration has been reset.
    """
    RuntimeConfig().reset()
    RuntimeConfig().load(runtime_configuration_dict)


def _reinitialize_logging():
    """Configure logging from the :class:`RuntimeConfig`.

    Applies ``RuntimeConfig().logging`` through
    :func:`logging.config.dictConfig`, then replaces every handler of the
    root logger with a single :class:`logging.handlers.SocketHandler` that
    targets ``127.0.0.1`` on ``RuntimeConfig().logger_port``.

    If this raises a :class:`KeyError` or :class:`ValueError`, logging
    falls back to :func:`logging.basicConfig` at ``INFO`` level and a
    warning is emitted.
    """
    try:
        logging.config.dictConfig(RuntimeConfig().logging)

        # Drop whatever handlers are attached to the root logger before
        # wiring up the socket handler, so it ends up as the only one.
        root_logger = logging.root
        root_logger.handlers.clear()
        socket_handler = logging.handlers.SocketHandler(
            "127.0.0.1", RuntimeConfig().logger_port
        )
        root_logger.addHandler(socket_handler)

        logging.debug(
            "Reinitialized logging from RuntimeConfig(%s)", RuntimeConfig()
        )
    except (KeyError, ValueError) as err:
        logging.basicConfig(level=logging.INFO)
        logging.warning(
            "Could not load logging config (%s), continuing with defaults",
            err,
        )
async def spawn_wrapper(
    name: str,
    runtime_config: dict,
    callee: Callable,
    args: Union[list, None] = None,
    kwargs: Union[dict, None] = None,
) -> Any:
    """Wraps a target for fork/spawn and takes care of initialization.

    Whenever a new subprocess is created (regardless of whether spawn,
    fork, or forkserver is used), some caretaking needs to be done:

    * The runtime configuration needs to be transferred, and the
      :class:`RuntimeConfig` properly reinitialized
    * Logging is reinitialized/rewired to send messages to the parent
      process
    * A proctitle is set

    If ``RuntimeConfig().profile`` is set, the call is additionally run
    under :mod:`cProfile`; the statistics are written to ``<name>.prof``
    in the current directory, even if the target raises.

    Parameters
    ----------
    name : str
        Name of the process; will lead to a proctitle in the form of
        ``palaestrAI[%s]``. A falsy name skips setting the proctitle.
    runtime_config : dict
        Runtime configuration dict, normally obtained from
        :meth:`RuntimeConfig.to_dict`
    callee : Callable
        The target method. Coroutine functions are awaited, everything
        else is called synchronously.
    args : list, optional
        Positional arguments of ``callee``.
    kwargs : dict, optional
        Keyword arguments of ``callee``.

    Returns
    -------
    Any
        Whatever the target function returns.

    Raises
    ------
    Exception
        Any exception raised by ``callee`` is logged at ``CRITICAL`` level
        and then re-raised unchanged.
    """
    if name:
        _set_proctitle(name)

    # [] / {} as default arguments are mutable and would be shared between
    # calls, hence the ``None`` defaults that are replaced here.
    args = args or []
    kwargs = kwargs or {}

    _restore_runtime_configuration(runtime_config)
    _reinitialize_logging()

    profiler = None
    if RuntimeConfig().profile:
        # Imported lazily: only needed when profiling is switched on.
        import cProfile

        profiler = cProfile.Profile()
        profiler.enable()

    try:
        if asyncio.iscoroutinefunction(callee):
            return await callee(*args, **kwargs)
        return callee(*args, **kwargs)
    except Exception as e:
        LOG.critical("Running %s failed: %s", str(callee), e)
        raise  # Bare raise keeps the original traceback intact.
    finally:
        # Runs on success *and* failure, so the profiler is always stopped
        # and the statistics of a crashing target are not lost.
        if profiler is not None:
            profiler.disable()
            profiler.dump_stats(Path(os.curdir) / f"{name}.prof")