importosimportsignalimportasyncioimportloggingimportlogging.configimportlogging.handlersfrompathlibimportPathfromtypingimportCallable,Any,Unionimportpalaestrai.loggingfrompalaestrai.coreimportRuntimeConfigLOG=logging.getLogger(__name__)def_install_sighandlers():signal.signal(signal.SIGINT,signal.SIG_IGN)signal.signal(signal.SIGTERM,signal.SIG_DFL)signal.signal(signal.SIGCHLD,signal.SIG_DFL)def_set_proctitle(process_name:str):try:importsetproctitlesetproctitle.setproctitle(f"palaestrAI[{process_name}]")exceptImportError:passdef_restore_runtime_configuration(runtime_configuration_dict:dict):RuntimeConfig().reset()RuntimeConfig().load(runtime_configuration_dict)def_get_parent_logger_name(name,logger_dict=None):"""Get the parent logger name based on the hierarchy."""iflogger_dictisNone:logger_dict=logging.Logger.manager.loggerDictifnotname:returnNoneparts=name.split(".")foriinrange(len(parts)-1,0,-1):parent=".".join(parts[:i])ifparentinlogger_dict:returnparentreturnNone# Helper to apply filters to all loggersdef_inherit_filters():logger_dict=logging.Logger.manager.loggerDictforlogger_name,loggerinlogger_dict.items():ifisinstance(logger,logging.Logger):parent=_get_parent_logger_name(logger.name,RuntimeConfig().logging["loggers"])ifparentisnotNoneandparentinlogger_dict:filters=logger_dict[parent].filtersforfinfilters:logger.addFilter(f)def_reinitialize_logging():try:logging.config.dictConfig(RuntimeConfig().logging)_inherit_filters()logging.addLevelName(palaestrai.logging.ASYNCIO_LOG_LEVEL,"ASYNCIO")logging.root.handlers.clear()logging.root.addHandler(logging.handlers.SocketHandler("127.0.0.1",RuntimeConfig().logger_port))logging.debug("Reinitialized logging from RuntimeConfig(%s)",RuntimeConfig())except(KeyError,ValueError)ase:logging.basicConfig(level=logging.INFO)logging.warning("Could not load logging config (%s), continuing with defaults",e,)
[docs]asyncdefspawn_wrapper(name:str,runtime_config:dict,callee:Callable,args:Union[list,None]=None,kwargs:Union[dict,None]=None,)->Any:"""Wraps a target for fork/spawn and takes care of initialization. Whenever a new subprocess is created (regardless of whether spawn, fork, or forkserver is used), some caretaking needs to be done: * The runtime configuration needs to be transferred, and the ::`RuntimeConfig` properly reinitialized * Logging is reinitialized/rewired to send messages to the parent process * A proctitle is set Parameters ---------- * name : str Name of the process; will lead to a proctitle in the form of ``palaestrai[%s]`` * runtime_config : dict Runtime configuration dict, normally obtained from ::`RuntimeConfig.to_dict` * callee : Callable The target method * args : list, optional Positional arguments of ::`callee`. * kwargs : dict, optional Keyword arguments of ::`callee` Returns ------- Any Whatever the target function returns. """_install_sighandlers()ifname:_set_proctitle(name)ifnotargs:# [] as default arg is mutable, workaround with None:args=[]ifnotkwargs:# {} as default arg is mutable, workaround with None:kwargs={}_restore_runtime_configuration(runtime_config)_reinitialize_logging()# We assument that we're using the aiomultiprocess. So we do not need to# initalize a new asyncio event loop here. Should we ever change the# underlying libraries (i.e., not use aiomultiprocess any longer), this# method most become sync instead of async and then we must also re-init# the event loop properly.ifRuntimeConfig().profile:importyappi# type: ignore[import-not-found,import-untyped]# Start profiling before launching the loopyappi.set_clock_type("cpu")# or "wall" if you want wall timeyappi.start(profile_threads=True)ret=Nonetry:ifasyncio.iscoroutinefunction(callee):ret=awaitcallee(*args,**kwargs)else:ret=callee(*args,**kwargs)returnretexceptExceptionase:LOG.critical("Running %s failed: %s",str(callee),e,exc_info=e)raiseefinally:ifRuntimeConfig().profile:yappi.stop()# Print stats to consolestats=yappi.get_func_stats()stats.sort("ttot")# sort by total time# stats.print_all()# Save to a file for later inspectionstats.save(Path(os.curdir)/f"{name}.yappi",type="pstat")# can be opened by snakeviz, gprof2dot