# Source code for palaestrai.simulation.vanilla_sim_controller
from __future__ import annotations

import itertools
import logging
from typing import Tuple

from palaestrai.types import SimulationFlowControl

from .simulation_controller import SimulationController

LOG = logging.getLogger(__name__)


class VanillaSimController(SimulationController):
    """Scatter-gather simulation controller for agents

    This simulation controller implements an execution strategy in which
    agents act in parallel.
    """

    async def advance(self) -> None:
        """Run one act/step cycle over all agents.

        Awaits ``self.act`` with all of ``self.agents``, concatenates the
        ``actuators`` of every returned response into a single flat list
        (in response order), and then awaits ``self.step`` with the same
        agents and that list. The value returned by ``self.step`` is
        discarded.

        NOTE(review): ``act`` and ``step`` are inherited and not visible
        here; presumably ``act`` queries the agents for their actions and
        ``step`` applies them to the environments -- confirm against
        ``SimulationController``.
        """
        responses = await self.act(self.agents)
        # Flatten the per-response actuator collections into one list; a
        # generator avoids building an intermediate list of lists.
        setpoints = list(
            itertools.chain.from_iterable(r.actuators for r in responses)
        )
        await self.step(self.agents, setpoints)