Source code for palaestrai.simulation.taking_turns_simulation_controller

from __future__ import annotations

import logging
from palaestrai.core import BasicState
from .simulation_controller import SimulationController, FlowControlChange

LOG = logging.getLogger(__name__)


[docs] class TakingTurnsSimulationController(SimulationController): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) async def simulate(self): """Main simulation task This method is usually scheduled as a task at the end of the simulation setup phase. It can be overwritten by descendant classes to excert complete control over the simulation. """ agents = list(self._agents.values()) # Needed to preserve order ai = 0 # Index of current agent while self.is_running: # With the index ai, we iterate over agents in the order in which # they were loaded, which, in turn, is given by the order in # which comes from the ExperimentRun object. # The index ai wraps (see the end of the loop). # Python's dict is guaranteed to remember the order in which items # were added to it (since Python version 3.7). agent = agents[ai] try: response = (await self.act([agent]))[0] _ = await self.step([agent], response.actuators) except FlowControlChange: self._state = BasicState.STOPPING ai = (ai + 1) % len(agents) LOG.debug( "The simulation has ended, updating agents one last time.", ) await self.flow_control()