palaestrai.experiment package

Submodules

palaestrai.experiment.executor module

exception palaestrai.experiment.executor.DeadMajorDomoBrokerError[source]

Bases: RuntimeError

class palaestrai.experiment.executor.Executor[source]

Bases: object

The executor is the entrypoint for every run execution.

The role of the executor is to receive new experiment runs and distribute them to existing RunGovernor instances. If palaestrAI is used in local run mode, the executor will initialize a RunGovernor.

Furthermore, the Executor can stop running experiment runs.

Notes

At some point, when the core protocol has progressed far enough, we will be able to run several experiments at once from an executor. But, until we’re sure we can, the public API accepts only one experiment.

async cancel(experiment_run_id)[source]

Shuts an experiment run down prematurely.

This method sends an ExperimentShutdownRequest to the RunGovernor responsible for executing the associated experiment run. This allows for a graceful, yet premature shutdown of a running experiment run. Normally, experiment runs terminate when their termination condition is met, so this method provides a way for an external user to interfere with a running experiment.

Parameters:

experiment_run_id (str) – UID of the experiment run to shut down.

async execute()[source]

Executes an experiment run.

This method starts the whole experiment run execution process. It initializes the RunGovernor for the experiment run and sets up communication. This method returns only if:

  1. the experiment run has terminated successfully;

  2. an error has occurred, in which case an exception is raised;

  3. the user has terminated the process (e.g., by hitting ^C).

Returns:

The state the executor is now in, either “SHUTDOWN” if everything exited normally, or one of the SIG* states if a signal was received.

Return type:

ExecutorState

experiment_runs() List[ExperimentRunRuntimeInformation][source]
schedule(experiment_run: palaestrai.experiment.ExperimentRun | Sequence[palaestrai.experiment.ExperimentRun])[source]

Schedules an experiment run to be executed.

This method schedules experiment runs, i.e., puts them in the waiting queue. The main loop (started by ::execute) picks up experiment run objects and executes them.

Parameters:
  • experiment_run (Union[palaestrai.experiment.ExperimentRun, Sequence[palaestrai.experiment.ExperimentRun]]) – One or many palaestrai.experiment.ExperimentRun objects, which are added to the queue.
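The interplay of schedule and execute described above (a waiting queue that a main loop drains) can be illustrated with a small, self-contained sketch. ToyExecutor and State are hypothetical stand-ins written for this example; they are not the palaestrAI classes, and the real executor hands runs to RunGovernor instances rather than merely recording them.

```python
import asyncio
from collections import deque
from enum import Enum


class State(Enum):
    # Mirrors a subset of the ExecutorState values listed in this module.
    PRISTINE = 0
    RUNNING = 2
    SHUTDOWN = 3


class ToyExecutor:
    """Hypothetical stand-in; not the palaestrAI Executor."""

    def __init__(self):
        self.state = State.PRISTINE
        self._queue = deque()  # the waiting queue
        self.finished = []

    def schedule(self, runs):
        # Accept either a single run or a sequence of runs.
        if isinstance(runs, (list, tuple)):
            self._queue.extend(runs)
        else:
            self._queue.append(runs)

    async def execute(self):
        # Main loop: pick up scheduled runs one by one until the queue is empty.
        self.state = State.RUNNING
        while self._queue:
            run = self._queue.popleft()
            await asyncio.sleep(0)  # placeholder for the actual run execution
            self.finished.append(run)
        self.state = State.SHUTDOWN
        return self.state


executor = ToyExecutor()
executor.schedule("run-a")
executor.schedule(["run-b", "run-c"])
final_state = asyncio.run(executor.execute())
```

In this sketch, runs are executed in the order in which they were scheduled, and the returned state signals a normal exit.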

shutdown()[source]

Performs an orderly shutdown.

class palaestrai.experiment.executor.ExecutorState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

EXITED = 4
INITIALIZED = 1
PRISTINE = 0
RUNNING = 2
SHUTDOWN = 3
SIGABRT = 6
SIGINT = 5
SIGTERM = 7
class palaestrai.experiment.executor.ExperimentRunRuntimeInformation(experiment_run: palaestrai.experiment.ExperimentRun, started_at: datetime | None = None, run_governor_id: str | None = None, experiment_run_id: str | None = None)[source]

Bases: object

Accumulated information about one experiment run.

This structure contains information about one experiment run. It stores on which RunGovernor it is executed, when it was started, which run is being executed and what ID the experiment run has.

experiment_run: palaestrai.experiment.ExperimentRun
experiment_run_id: str | None = None
property is_running
run_governor_id: str | None = None
started_at: datetime | None = None
exception palaestrai.experiment.executor.ExperimentRunStartError(experiment_run_id, run_governor_id, message)[source]

Bases: RuntimeError

exception palaestrai.experiment.executor.InterruptSignal[source]

Bases: RuntimeError

palaestrai.experiment.experiment_run module

This module contains the class ExperimentRun that defines an experiment run and contains all the information needed to execute it.

class palaestrai.experiment.experiment_run.ExperimentRun(uid: str | None, seed: int | None, version: str | None, schedule: List[Dict], run_config: dict, experiment_uid: str | None = None)[source]

Bases: object

Defines an experiment run and stores information.

The experiment run class defines a run in palaestrAI. It contains all information needed to execute the run. With the setup function, the experiment run can be built.

SCHEMA_FILE = 'run_schema.yaml'
agent_conductors(phase=0)[source]
property canonical_config
static check_syntax(path_or_stream: str | IO[str] | PathLike) SyntaxValidationResult[source]

Checks if the provided experiment configuration conforms with our syntax.

Parameters:

path_or_stream (Union[str, IO[str], PathLike]) –

  1. A file path, given as str or PathLike;

  2. any text stream.

Returns:

SyntaxValidationResult – A custom object that contains the following information:

  1. SyntaxValidationResult.is_valid: Whether the provided experiment is valid or not (::bool).

  2. SyntaxValidationResult.error_message: Contains ::None if the experiment is valid, or the corresponding error message if it is invalid.

static constr_randomstate(constructor, node)[source]
create_subseed() int[source]

Uses the seeded random number generator to create reproducible sub-seeds.
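The idea of reproducible sub-seeds can be sketched with the standard library alone. SubseedSource is a hypothetical class for illustration, and the use of random.Random and a 32-bit range are assumptions of this example; the generator palaestrAI actually uses may differ.

```python
import random


class SubseedSource:
    """Hypothetical helper: derives sub-seeds from one seeded generator."""

    def __init__(self, seed):
        self._rng = random.Random(seed)

    def create_subseed(self):
        # Each call advances the generator, so the n-th sub-seed depends
        # only on the initial seed and on n.
        return self._rng.randrange(2**32)


first = SubseedSource(42)
second = SubseedSource(42)
subseeds_first = [first.create_subseed() for _ in range(3)]
subseeds_second = [second.create_subseed() for _ in range(3)]
```

Two sources created with the same seed yield the same sequence of sub-seeds, which is what makes components seeded from them reproducible.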

environment_conductors(phase=0) Dict[str, EnvironmentConductor][source]
static from_dict(state: Dict) ExperimentRun[source]
get_episodes(phase: int)[source]
get_phase_name(phase: int)[source]
has_next_phase(current_phase)[source]

Returns whether this run has a subsequent phase.

Parameters:

current_phase (int) – Index of the phase that is being executed.

Returns:

True if at least one phase is taking place after the current phase.

Return type:

bool
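A minimal sketch of this check, assuming that phases are indexed from 0 and that the schedule holds one entry per phase. ToySchedule is a hypothetical stand-in, not the ExperimentRun class itself.

```python
class ToySchedule:
    """Hypothetical stand-in for the phase bookkeeping of an experiment run."""

    def __init__(self, schedule):
        self.schedule = schedule  # assumed: one entry per phase

    @property
    def num_phases(self):
        return len(self.schedule)

    def has_next_phase(self, current_phase):
        # True if at least one phase follows the current one.
        return current_phase + 1 < self.num_phases


run = ToySchedule([{"training": {}}, {"test": {}}])
```

With two phases in the schedule, phase 0 has a successor and phase 1 does not.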

property hash: str
property instance_uid

The unique ID of this particular experiment run instance

As an ::ExperimentRun object is transferred via network, stored in the DB, etc., it remains the same logical instance, although it is represented by different objects in memory. This UID identifies it even when it travels over the network.

Returns:

The instance’s unique ID

Return type:

str
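The distinction between object identity in memory and the instance UID can be illustrated as follows. ToyRun, to_json, and from_json are hypothetical names for this sketch, and JSON merely stands in for whatever serialization is used in transit.

```python
import json
import uuid


class ToyRun:
    """Hypothetical stand-in that carries a UID across serialization."""

    def __init__(self, instance_uid=None):
        # A fresh UID is generated only when none is passed in.
        self.instance_uid = instance_uid or str(uuid.uuid4())

    def to_json(self):
        return json.dumps({"instance_uid": self.instance_uid})

    @staticmethod
    def from_json(text):
        return ToyRun(**json.loads(text))


original = ToyRun()
received = ToyRun.from_json(original.to_json())  # e.g., after a network hop
```

Here, received is a different Python object than original, yet both report the same instance_uid.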

static load(str_path_stream_or_dict: str | Path | Dict | IO[str])[source]

Load an ::ExperimentRun object from a serialized representation.

This method serves as a deserializing constructor. It takes a path to a file, a dictionary representation, or a stream and creates a new ::ExperimentRun object from it.

This method also validates the string/stream representation.

Parameters:

str_path_stream_or_dict (Union[str, Path, Dict, IO[str]]) –

  • If str, it is interpreted as a file path, and the file is resolved and loaded;

  • if Path, the same happens as above;

  • if Dict, the ::ExperimentRun object is initialized directly from the values of the Dict;

  • if TextIO, the method assumes that it is a serialized representation of the ::ExperimentRun object (e.g., from an open file stream) and interprets it as YAML (with a prior syntax/schema check).

Returns:

An initialized, de-serialized ::ExperimentRun object

Return type:

ExperimentRun
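The type-based dispatch described in the parameter list can be sketched as below. This is not the palaestrAI implementation: load_config is a hypothetical function, it returns a plain dictionary instead of an ::ExperimentRun, it uses JSON in place of YAML to stay within the standard library, and it omits the syntax/schema check.

```python
import io
import json
from pathlib import Path


def load_config(source):
    """Hypothetical loader that dispatches on the type of its argument."""
    if isinstance(source, dict):
        # Dictionaries are used directly.
        return dict(source)
    if isinstance(source, (str, Path)):
        # Strings and Path objects are treated as file paths.
        with open(source, "r", encoding="utf-8") as stream:
            return json.load(stream)
    # Anything else is assumed to be a readable text stream.
    return json.load(source)


from_dict = load_config({"uid": "demo-run", "seed": 42})
from_stream = load_config(io.StringIO('{"uid": "demo-run", "seed": 42}'))
```

Both calls produce the same dictionary, regardless of the form in which the configuration was supplied.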

property num_phases

The number of phases in this experiment run’s schedule.

phase_configuration(phase: int)[source]
static repr_randomstate(representer, data)[source]

Custom serializer and deserializer so we can dump our subseed. Here, data is the random number generator (rng).

save(experiment_uid: str | None = None, session: Session | None = None)[source]

Save an ::ExperimentRun object to the store.

This method saves an experiment run and adds it to the database. Connection credentials are taken from the runtime config. If an experiment_uid is supplied, then the experiment run is also associated with it in the database. A session instance can also be supplied in order to reuse an open database connection. Otherwise, a new connection will be opened.

Parameters:
  • experiment_uid (Optional[str]) – The UID of the experiment with which this experiment run is associated in the database.

  • session (Optional[Session]) – An open database session to reuse. If omitted, a new connection is opened.

setup(broker_uri)[source]

Set up an experiment run.

Creates and configures relevant actors.

simulation_controllers(phase=0)[source]
exception palaestrai.experiment.experiment_run.RunDefinitionError(run: ExperimentRun, message)[source]

Bases: RuntimeError

palaestrai.experiment.experiment_run.update_dict(src, upd)[source]

Recursive update of dictionaries.

See stackoverflow:

https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth
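A recursive dictionary update in the spirit of the linked discussion might look like the following sketch. recursive_update is a name chosen for this example; whether the actual update_dict modifies src in place and returns it, as done here, is an assumption.

```python
from collections.abc import Mapping


def recursive_update(src, upd):
    """Merge upd into src, descending into nested mappings."""
    for key, value in upd.items():
        if isinstance(value, Mapping) and isinstance(src.get(key), Mapping):
            # Both sides are mappings: merge them instead of overwriting.
            src[key] = recursive_update(dict(src[key]), value)
        else:
            # Otherwise, the value from upd replaces the one in src.
            src[key] = value
    return src


base = {"a": {"x": 1, "y": 2}, "b": 1}
merged = recursive_update(base, {"a": {"y": 3, "z": 4}, "c": 5})
```

Unlike dict.update, the nested key "x" survives the merge, because only the keys present in the update are replaced.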

palaestrai.experiment.run_governor module

class palaestrai.experiment.run_governor.RunGovernor(uid: str | None = None)[source]

Bases: object

This class implements the Run-Governor.

Upon receiving requests from the executor, a RunGovernor instance handles a single experiment run by starting it, initializing the simulation controllers, the environment and the agent conductors, and, finally, shutting the experiment run down.

The RunGovernor is implemented as a state machine and this class provides the context for the distinct state classes. A freshly initialized RunGovernor waits in the state PRISTINE until the run method is called by the executor. See the distinct state classes for more information.

Parameters:

uid (str) – The universally unique ID that identifies this run governor

uid

The UUID of this RunGovernor

Type:

str

termination_condition

A reference to the TerminationCondition instance.

Type:

TerminationCondition

run_broker

The broker for the communication with the simulation controller, the agents, and the environments.

Type:

MajorDomoBroker

experiment_run_id

The UUID of the current experiment run.

Type:

str

tasks

A list of tasks the RunGovernor has started and that it has to shut down in the end.

Type:

List[aiomultiprocess.Process]

worker

The major domo worker for handling incoming requests

Type:

MajorDomoWorker

client

The major domo client for sending requests to other workers.

Type:

MajorDomoClient

shutdown

The major kill switch of the RunGovernor. Setting this to false will stop the RunGovernor after the current state.

Type:

bool

state

Holds the current state instance. The first state is PRISTINE.

Type:

RunGovernorState

errors

A list that is used to collect errors raised in the states.

Type:

List[Exception]

property mdp_service: str
async run()

Main event/state loop of the ESM

This run method is injected into monitored classes if they do not have one already. The structure of run is as follows:

  1. It resets the handlers for SIGCHLD, SIGINT, and SIGTERM to the OS’ default.

  2. It calls monitored.setup(), if it exists.

  3. It creates an ESM instance for the monitored object and adds signal handlers for SIGCHLD, SIGINT, and SIGTERM according to what the monitored class defines (via @ESM.on(signal.SIGINT), etc.)

  4. It transitions to the first state, defined by @ESM.enter. It then waits for state changes/events until monitored.stop() is called.

  5. Finally, once the main event/state loop concludes, monitored.teardown() is called (if present).
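The lifecycle in the steps above (optional setup, entering the first state, waiting until stop is called, optional teardown) can be sketched with plain asyncio. ToyMonitored and this run function are hypothetical; the sketch leaves out the signal handling of steps 1 and 3 and does not use the ESM decorators.

```python
import asyncio


class ToyMonitored:
    """Hypothetical monitored object; all names are illustrative only."""

    def __init__(self):
        self.log = []
        self._stopped = None

    def setup(self):
        self.log.append("setup")

    async def enter(self):
        # First state: do some work, then request a stop.
        self.log.append("enter")
        self.stop()

    def stop(self):
        self.log.append("stop")
        self._stopped.set()

    def teardown(self):
        self.log.append("teardown")


async def run(monitored):
    monitored._stopped = asyncio.Event()  # created inside the running loop
    if hasattr(monitored, "setup"):
        monitored.setup()
    await monitored.enter()  # transition to the first state
    await monitored._stopped.wait()  # wait until stop() has been called
    if hasattr(monitored, "teardown"):
        monitored.teardown()


toy = ToyMonitored()
asyncio.run(run(toy))
```

After the loop has concluded, toy.log records the calls in the order setup, enter, stop, teardown.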

setup()[source]
stop(error=None)

Stops the ESM.

Stopping the ESM also means shutting down all running processes and cancelling all outstanding tasks (e.g., request monitors).

Parameters:

error (Exception) – If given, the ESM will raise this after cleaning up.

palaestrai.experiment.termination_condition module

class palaestrai.experiment.termination_condition.TerminationCondition[source]

Bases: ABC

Control execution flow of simulations.

Termination conditions control the flow of the simulation execution. For every ::palaestrai.environment.Environment update and every ::palaestrai.agent.Brain update, the configured termination conditions are queried. They then return a flow control indicator (::SimulationFlowControl).

This base class offers default implementations for two situations:

  • ::TerminationCondition.brain_flow_control is called after an agent’s ::Brain has received a ::Muscle update and had time to think about it.

  • ::TerminationCondition.environment_flow_control is called after an environment update.

The ::SimulationFlowControl enum defines a number of constants. They are ordered, i.e., ::SimulationFlowControl.CONTINUE has the lowest priority, whereas ::SimulationFlowControl.STOP has the highest. The indicator with the highest priority wins overall, i.e., if one agent indicates that the simulation should stop, then it will terminate the current experiment run phase.
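The "highest priority wins" rule can be sketched with an ordered enum. FlowControl below is a hypothetical stand-in for ::SimulationFlowControl: only CONTINUE and STOP are named in the text above, while the RESET member and all numeric values are assumptions made for this example.

```python
from enum import IntEnum


class FlowControl(IntEnum):
    # Assumed ordering: a higher value means a higher priority.
    CONTINUE = 0
    RESET = 1
    STOP = 2


def combine(indicators):
    """Return the indicator with the highest priority."""
    # With no indicators at all, the simulation simply continues.
    return max(indicators, default=FlowControl.CONTINUE)


decision = combine([FlowControl.CONTINUE, FlowControl.STOP, FlowControl.RESET])
no_votes = combine([])
```

A single STOP among the indicators is enough to determine the overall decision, which matches the case of one agent requesting a stop.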

brain_flow_control(brain: Brain) SimulationFlowControl[source]

Allows a learning process to control the simulation flow.

A learner can control the simulation, e.g., by indicating that the simulation should be reset or can end when it has become good enough. Descendant classes can reimplement this method. They will receive access to the respective agent’s ::Brain, which contains all the necessary information (e.g., its memory, training success, etc.)

Returns:

An indicator for simulation control: The flow control indicator with the highest priority (i.e., highest value number in the enum) wins.

Return type:

::SimulationFlowControl

check_termination(message, component=None)[source]
environment_flow_control(environment: palaestrai.environment.Environment) SimulationFlowControl[source]

Allows an environment to control the simulation flow.

The logic is the same as for ::.brain_flow_control, except that an environment is now checked. The default implementation is to reset the run when the environment is done (::palaestrai.environment.Environment.done).

phase_flow_control(run_governor: RunGovernor, message: SimulationControllerTerminationRequest | None) SimulationFlowControl[source]

Allows overall control of a simulation phase via the ::RunGovernor

The logic is similar to that of ::.brain_flow_control, with the exception that this function is called in the ::RunGovernor.

palaestrai.experiment.vanilla_rungovernor_termination_condition module

class palaestrai.experiment.vanilla_rungovernor_termination_condition.VanillaRunGovernorTerminationCondition[source]

Bases: TerminationCondition

phase_flow_control(run_governor: RunGovernor, message: SimulationControllerTerminationRequest | None) SimulationFlowControl[source]

Allows overall control of a simulation phase via the ::RunGovernor

The logic is similar to that of ::.brain_flow_control, with the exception that this function is called in the ::RunGovernor.

Module contents