palaestrai.experiment package#

Submodules#

palaestrai.experiment.executor module#

exception palaestrai.experiment.executor.DeadMajorDomoBrokerError[source]#

Bases: RuntimeError

class palaestrai.experiment.executor.Executor[source]#

Bases: object

The executor is the entrypoint for every run execution.

The role of the executor is to receive new experiment runs and distribute them to existing RunGovernor instances. If palaestrAI is used in local run mode, the executor will initialize a RunGovernor.

Furthermore, the Executor can stop running experiment runs.

Notes

At some point, when the core protocol has progressed far enough, we will be able to run several experiments at once from an executor. But, until we’re sure we can, the public API accepts only one experiment.

async cancel(experiment_run_id)[source]#

Shuts an experiment run down prematurely.

This method sends an ExperimentShutdownRequest to the RunGovernor responsible for executing the associated experiment run. This allows for a graceful, yet premature shutdown of a running experiment run. Normally, experiment runs terminate when their termination condition is met, so this method provides a way for an external user to interfere with a running experiment.

Parameters:

experiment_run_id (str) – UID of the experiment run to shut down.

async execute()[source]#

Executes an experiment run.

This method starts the whole experiment run execution process. It initializes the RunGovernor for the experiment run and sets up communication. This method returns only if:

  1. The experiment run has terminated successfully;

  2. an error has occurred, in which case an exception is raised;

  3. the user has terminated the process (e.g., by hitting ^C).

Returns:

The state the executor is now in, either “SHUTDOWN” if everything exited normally, or one of the SIG* states if a signal was received.

Return type:

ExecutorState

experiment_runs() → List[ExperimentRunRuntimeInformation][source]#
schedule(experiment_run: palaestrai.experiment.ExperimentRun | Sequence[palaestrai.experiment.ExperimentRun])[source]#

Schedules an experiment run to be executed.

This method schedules experiment runs, i.e., puts them in the waiting queue. The main loop (started by ::execute) picks up experiment run objects and executes them.

Parameters:
  • experiment_run (Union[palaestrai.experiment.ExperimentRun, Sequence[palaestrai.experiment.ExperimentRun]]) – One or many palaestrai.experiment.ExperimentRun objects, which are added to the queue.
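The queueing behaviour of schedule can be pictured with a small stand-in. This is only a sketch, not palaestrAI's implementation: QueueSketch, its waiting attribute, and drain are hypothetical names, plain strings stand in for ExperimentRun objects, and first-in, first-out ordering is an assumption.

```python
from collections import deque
from typing import Deque, List, Sequence, Union


class QueueSketch:
    """Hypothetical stand-in for the executor's waiting queue."""

    def __init__(self) -> None:
        self.waiting: Deque[str] = deque()

    def schedule(self, experiment_run: Union[str, Sequence[str]]) -> None:
        # A single run is wrapped in a list so that both call styles
        # (one run, or a sequence of runs) share the same code path.
        if isinstance(experiment_run, str):
            experiment_run = [experiment_run]
        self.waiting.extend(experiment_run)

    def drain(self) -> List[str]:
        # Stand-in for the main loop: take runs in FIFO order (assumed).
        executed = []
        while self.waiting:
            executed.append(self.waiting.popleft())
        return executed


queue = QueueSketch()
queue.schedule("run-a")
queue.schedule(["run-b", "run-c"])
order = queue.drain()
```

Normalizing the argument to a sequence up front keeps the rest of the method free of special cases.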

shutdown()[source]#

Performs an orderly shutdown.

class palaestrai.experiment.executor.ExecutorState(value)[source]#

Bases: Enum

An enumeration.

EXITED = 4#
INITIALIZED = 1#
PRISTINE = 0#
RUNNING = 2#
SHUTDOWN = 3#
SIGABRT = 6#
SIGINT = 5#
SIGTERM = 7#
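To illustrate how a caller might tell a normal exit from a signal-induced one, the following sketch rebuilds the enumeration locally with the values listed above. ExecutorStateSketch and ended_by_signal are hypothetical names introduced for illustration; they are not part of palaestrAI.

```python
from enum import Enum


class ExecutorStateSketch(Enum):
    # Values copied from the member list of ExecutorState.
    PRISTINE = 0
    INITIALIZED = 1
    RUNNING = 2
    SHUTDOWN = 3
    EXITED = 4
    SIGINT = 5
    SIGABRT = 6
    SIGTERM = 7


def ended_by_signal(state: ExecutorStateSketch) -> bool:
    # The SIG* members are exactly those whose names start with "SIG".
    return state.name.startswith("SIG")


normal = ended_by_signal(ExecutorStateSketch.SHUTDOWN)
interrupted = ended_by_signal(ExecutorStateSketch.SIGINT)
```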
class palaestrai.experiment.executor.ExperimentRunRuntimeInformation(experiment_run: palaestrai.experiment.ExperimentRun, started_at: datetime | None = None, run_governor_id: str | None = None, experiment_run_id: str | None = None)[source]#

Bases: object

Accumulated information about one experiment run.

This structure contains information about one experiment run. It stores on which RunGovernor it is executed, when it was started, which run is being executed and what ID the experiment run has.

experiment_run: palaestrai.experiment.ExperimentRun#
experiment_run_id: str | None = None#
property is_running#
run_governor_id: str | None = None#
started_at: datetime | None = None#
exception palaestrai.experiment.executor.ExperimentRunStartError(experiment_run_id, run_governor_id, message)[source]#

Bases: RuntimeError

exception palaestrai.experiment.executor.InterruptSignal[source]#

Bases: RuntimeError

palaestrai.experiment.experiment_run module#

This module contains the class ExperimentRun that defines an experiment run and contains all the information needed to execute it.

class palaestrai.experiment.experiment_run.ExperimentRun(uid: str | None, seed: int | None, version: str | None, schedule: List[Dict], run_config: dict)[source]#

Bases: object

Defines an experiment run and stores information.

The experiment run class defines a run in palaestrAI. It contains all information needed to execute the run. The experiment run can be built with the setup function.

SCHEMA_FILE = 'run_schema.yaml'#
agent_conductors(phase=0)[source]#
property canonical_config#
static check_syntax(path_or_stream: str | IO[str] | PathLike) → SyntaxValidationResult[source]#

Checks if the provided experiment configuration conforms with our syntax.

Parameters:

path_or_stream (Union[str, IO[str], PathLike]) –

  1. A path (str or PathLike) to the experiment configuration file;

  2. any text stream.

Returns:

A custom object that contains the following information:

  1. SyntaxValidationResult.is_valid: Whether the provided experiment is valid or not (::bool).

  2. SyntaxValidationResult.error_message: Contains ::None if the experiment is valid, or the corresponding error message if it is invalid.

Return type:

SyntaxValidationResult
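The two fields of the result can be handled as in the following sketch. ValidationResultSketch is a hypothetical stand-in that only mirrors the documented is_valid and error_message fields, and the error text is invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ValidationResultSketch:
    # Stand-in with the two fields documented for SyntaxValidationResult.
    is_valid: bool
    error_message: Optional[str] = None


def summarize(result: ValidationResultSketch) -> str:
    # error_message is only meaningful when is_valid is False.
    if result.is_valid:
        return "configuration is valid"
    return f"configuration is invalid: {result.error_message}"


ok = summarize(ValidationResultSketch(is_valid=True))
bad = summarize(ValidationResultSketch(False, "illustrative error text"))
```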

static constr_randomstate(constructor, node)[source]#
create_subseed() → int[source]#

Uses the seeded random number generator to create reproducible sub-seeds.
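The idea of drawing sub-seeds from one seeded generator can be sketched as follows. This is an illustration under assumptions, not the library's code: SubseedSketch and first_subseeds are hypothetical, the standard-library random module stands in for whatever generator palaestrAI uses, and the 32-bit range is chosen arbitrarily.

```python
import random
from typing import List


class SubseedSketch:
    """Hypothetical stand-in; only the seeding idea is illustrated."""

    def __init__(self, seed: int) -> None:
        # One generator per run, seeded exactly once.
        self._rng = random.Random(seed)

    def create_subseed(self) -> int:
        # Each call advances the generator, so the sequence of sub-seeds
        # repeats exactly whenever the initial seed is the same.
        return self._rng.randrange(2**32)


def first_subseeds(seed: int, count: int) -> List[int]:
    source = SubseedSketch(seed)
    return [source.create_subseed() for _ in range(count)]


same_a = first_subseeds(42, 3)
same_b = first_subseeds(42, 3)
```

Because every component receives its own sub-seed from the same generator, rerunning with the same top-level seed reproduces all of them.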

environment_conductors(phase=0) → Dict[str, EnvironmentConductor][source]#
get_episodes(phase: int)[source]#
get_phase_name(phase: int)[source]#
has_next_phase(current_phase)[source]#

Returns whether this run has a subsequent phase.

Parameters:

current_phase (int) – Index of the phase that is being executed.

Returns:

True if at least one phase is taking place after the current phase.

Return type:

bool
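A minimal sketch of this check, assuming phases are indexed from 0 (as the phase=0 defaults elsewhere in this class suggest) and that the number of phases is known. The free function and its num_phases parameter are hypothetical; the real method reads the schedule from the object.

```python
def has_next_phase(current_phase: int, num_phases: int) -> bool:
    # With 0-based indices, the last valid phase index is num_phases - 1.
    return current_phase + 1 < num_phases


first_of_two = has_next_phase(0, 2)
last_of_two = has_next_phase(1, 2)
```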

property instance_uid#

The unique ID of this particular experiment run instance

As an ::ExperimentRun object is transferred over the network, stored in the database, and so on, it remains the same logical instance even though it is represented by different objects in memory. This UID identifies the instance across all of these representations.

Returns:

The instance's unique ID

Return type:

str

static load(str_path_stream_or_dict: str | Path | Dict | IO[str])[source]#

Load an ::ExperimentRun object from a serialized representation.

This method serves as deserializing constructor. It takes a path to a file, a dictionary representation, or a stream and creates a new ::ExperimentRun object from it.

This method also validates the string/stream representation.

Parameters:

str_path_stream_or_dict (Union[str, Path, Dict, IO[str]]) –

  • If str, it is interpreted as a file path, and the file is resolved and loaded;

  • if Path, the same happens as above;

  • if Dict, the ::ExperimentRun object is initialized directly from the values of the Dict;

  • if TextIO, the method assumes that it is a serialized representation of the ::ExperimentRun object (e.g., from an open file stream) and interprets it as YAML (with a prior syntax/schema check).

Returns:

An initialized, de-serialized ::ExperimentRun object

Return type:

ExperimentRun

property num_phases#

The number of phases in this experiment run’s schedule.

phase_configuration(phase: int)[source]#
static repr_randomstate(representer, data)[source]#

Custom serializer and deserializer so that the sub-seed random number generator can be dumped. Here, data is the random number generator (rng).

save(experiment_uid: str | None = None, session: Session | None = None)[source]#

Save an ::ExperimentRun object to the store.

This method saves an experiment run and adds it to the database. Connection credentials are taken from the runtime config. If an experiment_uid is supplied, then the experiment run is also associated with it in the database. A session instance can also be supplied in order to reuse an open database connection. Otherwise, a new connection will be opened.

Parameters:
  • experiment_uid (Optional[str]) – The unique ID of the experiment with which this experiment run is associated in the database.

  • session (Optional[Session]) – An open database session to reuse for the queries. If omitted, a new connection is opened.

setup(broker_uri)[source]#

Set up an experiment run.

Creates and configures relevant actors.

simulation_controllers(phase=0)[source]#
exception palaestrai.experiment.experiment_run.RunDefinitionError(run: ExperimentRun, message)[source]#

Bases: RuntimeError

palaestrai.experiment.experiment_run.update_dict(src, upd)[source]#

Recursive update of dictionaries.

See stackoverflow:

https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth
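A recursive dictionary update in the spirit of the referenced answer can be sketched as follows. update_dict_sketch is a hypothetical name; whether the library's function mutates src in place or returns it, as this sketch does, is an assumption.

```python
from collections.abc import Mapping


def update_dict_sketch(src: dict, upd: Mapping) -> dict:
    for key, value in upd.items():
        if isinstance(value, Mapping) and isinstance(src.get(key), dict):
            # Both sides hold dictionaries: descend instead of overwriting,
            # so sibling keys in src survive the update.
            update_dict_sketch(src[key], value)
        else:
            # Scalars, lists, and new keys simply replace or extend src.
            src[key] = value
    return src


base = {"a": {"x": 1, "y": 2}, "b": 3}
merged = update_dict_sketch(base, {"a": {"y": 20, "z": 30}, "c": 4})
```

Unlike dict.update, the nested key "x" is preserved because only the keys present in the update are touched at each level.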

palaestrai.experiment.run_governor module#

class palaestrai.experiment.run_governor.RunGovernor(broker_uri: str, uid: str | None = None)[source]#

Bases: object

This class implements the Run-Governor.

Upon receiving requests from the executor, a RunGovernor instance handles a single experiment run by starting it, initializing the simulation controllers, the environment and the agent conductors, and, finally, shutting the experiment run down.

The RunGovernor is implemented as a state machine, and this class provides the context for the distinct state classes. A freshly initialized RunGovernor waits in the state PRISTINE until the run method is called by the executor. See the distinct state classes for more information.

Parameters:
  • broker_uri (str) – The URI representing the connection to the broker for communication with the executor.

  • termination_condition (TerminationCondition) – The condition that tells the RunGovernor when to stop the simulation.

  • uid (str) – The UUID for this RunGovernor is provided by the executor.

uid#

The UUID of this RunGovernor

Type:

str

termination_condition#

A reference to the TerminationCondition instance.

Type:

TerminationCondition

run_broker#

The broker for the communication with the simulation controller, the agents, and the environments.

Type:

MajorDomoBroker

experiment_run_id#

The UUID of the current experiment run.

Type:

str

tasks#

A list of tasks the RunGovernor has started and that it has to shut down in the end.

Type:

List[aiomultiprocess.Process]

worker#

The major domo worker for handling incoming requests

Type:

MajorDomoWorker

client#

The major domo client for sending requests to other workers.

Type:

MajorDomoClient

shutdown#

The major kill switch of the RunGovernor. Setting this to false will stop the RunGovernor after the current state.

Type:

bool

state#

Holds the current state instance. The first state is PRISTINE.

Type:

RunGovernorState

errors#

A list that is used to collect errors raised in the states.

Type:

List[Exception]

async run()[source]#

Start the main loop of the run governor.

In each iteration, the run method of the current state is called.
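The loop described here follows the state pattern, which the sketch below illustrates with asyncio. It is not the RunGovernor's code: GovernorSketch, the stop_requested flag, the visited list, and the Working and Finished states are hypothetical; only the PRISTINE starting state is taken from the description above.

```python
import asyncio
from typing import List


class GovernorSketch:
    """Hypothetical context object for the state classes below."""

    def __init__(self) -> None:
        self.state = Pristine()
        self.stop_requested = False
        self.visited: List[str] = []

    async def run(self) -> None:
        # Main loop: call run() on the current state in each iteration.
        # A state moves the machine on by assigning the next state.
        while not self.stop_requested:
            self.visited.append(type(self.state).__name__)
            await self.state.run(self)


class Pristine:
    async def run(self, governor: GovernorSketch) -> None:
        governor.state = Working()


class Working:
    async def run(self, governor: GovernorSketch) -> None:
        governor.state = Finished()


class Finished:
    async def run(self, governor: GovernorSketch) -> None:
        # The final state ends the loop after the current iteration.
        governor.stop_requested = True


governor = GovernorSketch()
asyncio.run(governor.run())
```

Keeping transitions inside the state classes means the loop itself never needs to know which states exist.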

palaestrai.experiment.termination_condition module#

class palaestrai.experiment.termination_condition.TerminationCondition[source]#

Bases: ABC

The base class for termination conditions.

Parameters:
  • message – termination request message object

  • component – Component for which the termination condition is to be checked. Only needed if the termination condition cannot be derived from the termination request alone.

abstract check_termination(message, component=None)[source]#
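A subclass only has to implement check_termination. The sketch below uses a local stand-in for the abstract base so that it runs without palaestrAI; MaxEpisodesCondition, the dictionary-shaped message with an "episode" entry, and the boolean return value are all assumptions made for illustration.

```python
from abc import ABC, abstractmethod


class TerminationConditionSketch(ABC):
    # Stand-in mirroring the abstract method signature shown above.
    @abstractmethod
    def check_termination(self, message, component=None):
        ...


class MaxEpisodesCondition(TerminationConditionSketch):
    """Hypothetical condition: stop once enough episodes are reported."""

    def __init__(self, max_episodes: int) -> None:
        self.max_episodes = max_episodes

    def check_termination(self, message, component=None) -> bool:
        # `message` is assumed to be a dict carrying an "episode" counter;
        # `component` is not needed for this particular condition.
        return message.get("episode", 0) >= self.max_episodes


condition = MaxEpisodesCondition(max_episodes=3)
early = condition.check_termination({"episode": 1})
done = condition.check_termination({"episode": 3})
```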

palaestrai.experiment.vanilla_rungovernor_termination_condition module#

class palaestrai.experiment.vanilla_rungovernor_termination_condition.VanillaRunGovernorTerminationCondition[source]#

Bases: TerminationCondition

check_termination(message, component)[source]#

Module contents#