Source code for palaestrai.experiment.run_governor

from __future__ import annotations

import io
import logging
import traceback
import uuid
from typing import TYPE_CHECKING, Dict, Union, Optional

from palaestrai.core import MajorDomoClient, MajorDomoWorker

from .rungovernor_states.base_state import RunGovernorState
from .rungovernor_states.rgs_pristine import RGSPristine

if TYPE_CHECKING:
    import aiomultiprocess
    from palaestrai.experiment import TerminationCondition
    from palaestrai.experiment.experiment_run import ExperimentRun


LOG = logging.getLogger(__name__)


class RunGovernor:
    """This class implements the Run-Governor.

    Upon receiving requests from the executor, a RunGovernor instance
    handles a single experiment run by starting it, initializing the
    simulation controllers, the environment and the agent conductors,
    and, finally, shutting the experiment run down.

    The RunGovernor is implemented as a state machine and this class
    provides the context for the distinct state classes. A freshly
    initialized RunGovernor starts in the state PRISTINE
    (:class:`RGSPristine`) and stays there until :meth:`run` is called
    by the executor. See the distinct state classes for more information.

    Parameters
    ----------
    broker_uri : str
        The URI representing the connection to the broker for
        communication with the executor.
    uid : str, optional
        The unique ID of this RunGovernor, normally provided by the
        executor. If omitted (or empty), an ID of the form
        ``"RunGovernor-<uuid4>"`` is generated.

    Attributes
    ----------
    uid : str
        The unique ID of this RunGovernor.
    broker_uri : str
        The broker URI passed to the constructor.
    major_domo_client : :class:`.MajorDomoClient`
        The major domo client for sending requests to other workers.
        Only declared in ``__init__``, not assigned there (presumably set
        by one of the states -- confirm).
    major_domo_worker : :class:`.MajorDomoWorker`
        The major domo worker for handling incoming requests. Only
        declared in ``__init__``, not assigned there.
    termination_condition : :class:`.TerminationCondition`
        The condition that tells the RunGovernor when to stop the
        simulation. Only declared in ``__init__``, not assigned there.
    experiment_run : ExperimentRun or None
        The experiment run being handled; ``None`` initially.
    experiment_run_id : str or None
        The UUID of the current experiment run; ``None`` initially.
    current_phase : int
        Current phase counter; starts at 0.
    current_episode : int
        Current episode counter; starts at 0.
    transceive_task, signal_monitor_task, signal_received
        Initialized to ``None``; set elsewhere.
    agent_conductors, env_conductors, sim_controllers : Dict[str, aiomultiprocess.Process]
        Processes the RunGovernor has started, keyed by a string
        (presumably the component's UID -- verify against the states).
    last_request, next_reply : list
        Lists used as a type-checking workaround (see inline comment).
    stopping : bool
        Initially ``False``.
    shutdown : bool
        The major kill switch of the RunGovernor. Setting this to ``True``
        stops :meth:`run` before the next state is processed.
    state : :class:`.RunGovernorState`
        Holds the current state instance. The first state is PRISTINE.
    errors : list
        A list that is used to collect errors raised in the states.
    dead_children : list
        Initially empty.
    """

    def __init__(self, broker_uri: str, uid: Optional[str] = None):
        # Fall back to a generated ID when none (or an empty one) is given.
        self.uid = uid if uid else f"RunGovernor-{uuid.uuid4()}"
        self.broker_uri = broker_uri

        # Declarations only: these attributes are not assigned here.
        self.major_domo_client: MajorDomoClient
        self.major_domo_worker: MajorDomoWorker
        self.termination_condition: TerminationCondition

        self.experiment_run: Union[ExperimentRun, None] = None
        self.experiment_run_id: Union[str, None] = None
        self.current_phase = 0
        self.current_episode = 0

        self.transceive_task = None
        self.signal_monitor_task = None
        self.signal_received = None

        self.agent_conductors: Dict[str, aiomultiprocess.Process] = {}
        self.env_conductors: Dict[str, aiomultiprocess.Process] = {}
        self.sim_controllers: Dict[str, aiomultiprocess.Process] = {}

        # List is a "workaround" to satisfy the type checking ...
        self.last_request: list = []
        self.next_reply: list = []

        self.stopping = False
        self.shutdown = False
        # The initial state receives this (partially initialized) context.
        self.state: RunGovernorState = RGSPristine(self)
        self.errors: list = []
        self.dead_children: list = []

    async def run(self):
        """Start the main loop of the run governor.

        In each iteration, the current state's ``run()`` coroutine is
        awaited and then its ``next_state()`` is called to perform the
        transition. The loop ends as soon as :attr:`shutdown` is ``True``
        at the start of an iteration.

        Raises
        ------
        Exception
            Any exception raised by the current state is logged (with its
            traceback) and re-raised unchanged, ending the loop.
        """
        while not self.shutdown:
            LOG.info(
                "RunGovernor(id=0x%x, uid=%s) now processing state %s.",
                id(self),
                self.uid,
                self.state.name,
            )
            try:
                await self.state.run()
                self.state.next_state()
            except Exception:
                LOG.critical(
                    "RunGovernor(id=0x%x, uid=%s) died in disgrace!",
                    id(self),
                    self.uid,
                )
                LOG.critical(
                    "RunGovernor(id=0x%x, uid=%s) has crashed in state "
                    "%s with error %s",
                    id(self),
                    self.uid,
                    self.state.name,
                    traceback.format_exc(),
                )
                # Bare ``raise`` re-raises the active exception as-is.
                raise
        LOG.info(
            "RunGovernor(id=0x%x, uid=%s) has finished.", id(self), self.uid
        )