Source code for palaestrai.environment.environment

from __future__ import annotations

import inspect
import logging
import typing
from abc import ABC, abstractmethod
from copy import deepcopy
from warnings import warn
from typing import (
    List,
    Union,
    Sequence,
    Optional,
    TYPE_CHECKING,
)

from palaestrai.types import SimTime
from palaestrai.core import EventStateMachine as ESM
from palaestrai.core.protocol import (
    EnvironmentResetRequest,
    EnvironmentResetResponse,
    EnvironmentShutdownRequest,
    EnvironmentShutdownResponse,
    EnvironmentStartRequest,
    EnvironmentStartResponse,
    EnvironmentUpdateRequest,
    EnvironmentUpdateResponse,
)
from .environment_baseline import EnvironmentBaseline
from .environment_state import EnvironmentState

if TYPE_CHECKING:
    from ..agent import (
        ActuatorInformation,
        SensorInformation,
        RewardInformation,
    )
    from .reward import Reward
    from .environment_state_transformer import EnvironmentStateTransformer

LOG = logging.getLogger(__name__)


@ESM.monitor(is_mdp_worker=True)
class Environment(ABC):
    """Abstract base class for environment implementations.

    This abstract class provides all functions needed to implement a new
    environment. The developer only has to implement the methods
    :meth:`start_environment` and :meth:`update`.

    Parameters
    ----------
    uid : str
        Unique identifier to identify an environment

    Attributes
    ----------
    reward : :class:`Reward`
        If present, this method calculates the reward of the environment
        ("external reward"). See :attr:`EnvironmentState.world_state`.
    _sensor_ids : List[str]
        The fully-qualified sensor IDs, i.e., carrying the environment
        prefix that :meth:`_prepend_uid` adds.
    _actuator_ids : List[str]
        The fully-qualified actuator IDs, i.e., carrying the environment
        prefix that :meth:`_prepend_uid` adds. Incoming actuators are
        filtered against this list on every update.
    """

    def __init__(
        self,
        uid: str,
        *args,
        **kwargs,
    ):
        self._uid = uid
        self._name: str = ""
        self._done = False

        if "seed" in kwargs:
            self.seed = kwargs["seed"]
        elif len(args) > 1 and isinstance(args[1], int):
            # Very hacky, just to see if MosaikEnvironment still breaks:
            # accept the seed as second extra positional argument.
            # The length guard must cover index 1, otherwise a single
            # extra positional argument raises an IndexError.
            # NOTE(review): presumably the legacy call style was
            # (uid, broker_uri, seed) -- confirm.
            self.seed = args[1]
        # NOTE(review): ``self.seed`` stays undefined if no seed is given.

        if "broker_uri" in kwargs:
            warn(
                "broker_uri is deprecated, please update your environment.",
                DeprecationWarning,
            )

        self.reward: Optional[Reward] = None
        self.sensors: List[SensorInformation] = []
        self.actuators: List[ActuatorInformation] = []
        self._state_transformer: Optional[EnvironmentStateTransformer] = None

        # Filter lists for incoming actuators
        self._sensor_ids: List[str] = list()
        self._actuator_ids: List[str] = list()

        # Must be set before logging ``self``: __str__ reads ``_tick``.
        self._tick = 0
        LOG.debug("%s created", self)

    @property
    def done(self) -> bool:
        """Checks whether the environment has terminated"""
        return self._done

    @property
    def uid(self) -> str:
        """The unique identifier of the Environment object"""
        return str(self._uid)

    @property
    def name(self) -> str:
        """User-given name of this environment"""
        return self._name

    def setup(self):
        """Registers this environment's UID as its MDP service name."""
        self.mdp_service = self.uid
        LOG.debug("%s started", self)

    def _remove_uid(
        self, actuator_information_list: List[ActuatorInformation]
    ):
        """Strips the environment prefix from actuator UIDs, in place.

        This is the inverse of :meth:`_prepend_uid`: a leading
        ``"<name>."`` is removed from each UID that carries it. Entries
        that are plain numbers or whose UID is not a string are skipped.

        Parameters
        ----------
        actuator_information_list : List[ActuatorInformation]
            The actuators whose ``uid`` attribute is rewritten.
        """
        prefix = f"{self._name}."
        for i in actuator_information_list:
            if isinstance(i, (int, float)) or not isinstance(i.uid, str):
                continue
            if i.uid.startswith(prefix):
                # Sensor/Actuators themselves may also contain a dot, and
                # so may the environment's name. Cut off exactly the
                # prefix instead of splitting at the first dot.
                i.uid = i.uid[len(prefix) :]

    def _prepend_uid(
        self,
        sensor_actuator_information_list: Sequence[
            Union[SensorInformation, ActuatorInformation]
        ],
    ):
        """Prefixes sensor/actuator UIDs with ``"<name>."``, in place.

        UIDs that are strings and already carry the prefix are left
        untouched, so calling this method repeatedly is harmless for
        them. Non-string UIDs (e.g., numbers) are converted to strings
        as part of the prefixing.

        Parameters
        ----------
        sensor_actuator_information_list : Sequence
            Sensor or actuator objects whose ``uid`` is rewritten.
        """
        prefix = f"{self._name}."
        for i in sensor_actuator_information_list:
            if isinstance(i.uid, str) and i.uid.startswith(prefix):
                continue
            # The f-string also takes care of stringifying numeric UIDs.
            i.uid = f"{prefix}{i.uid}"

    def _obtain_baseline(self, simtime: typing.Any):
        """Starts the environment and normalizes what it returns.

        Calls :meth:`start_environment`, accepts both the
        :class:`EnvironmentBaseline` and the deprecated tuple return
        style, deep-copies sensors and actuators, prefixes their UIDs,
        and refreshes the ``_sensor_ids``/``_actuator_ids`` filter lists.

        Parameters
        ----------
        simtime : Any
            Fallback simulation time that is used if the environment
            does not return an :class:`EnvironmentBaseline`.

        Returns
        -------
        tuple
            ``(sensors, actuators, simtime)``

        Raises
        ------
        RuntimeError
            If :meth:`start_environment` returns neither a tuple nor an
            :class:`EnvironmentBaseline`.
        """
        baseline = self.start_environment()
        if isinstance(baseline, tuple):
            sensors, actuators = baseline
        elif isinstance(baseline, EnvironmentBaseline):
            sensors = baseline.sensors_available
            actuators = baseline.actuators_available
            simtime = baseline.simtime
        else:
            raise RuntimeError(
                "Unknown return value from environment start: %s "
                % (str(baseline),)
            )

        # Work on copies so that prefixing the UIDs does not modify the
        # objects the environment implementation holds on to.
        sensors = deepcopy(sensors)
        actuators = deepcopy(actuators)
        self._prepend_uid([*sensors, *actuators])
        self._sensor_ids = [sen.uid for sen in sensors]
        self._actuator_ids = [act.uid for act in actuators]
        return sensors, actuators, simtime

    @ESM.on(EnvironmentStartRequest)
    def _handle_setup(
        self, request: EnvironmentStartRequest
    ) -> EnvironmentStartResponse:
        """Handle an environment start request.

        The :meth:`.start_environment` is called that can be used by
        environments for setup purposes and that should provide the
        available sensors and actuators.

        Finally, a start response is prepared.

        Parameters
        ----------
        request: EnvironmentStartRequest
            The start request from the simulation controller.

        Returns
        -------
        EnvironmentStartResponse
            The answer from the environment, contains the available
            sensors and actuators.
        """
        LOG.debug(
            "Environment %s received a %s",
            self,
            repr(request),
        )

        # Start environment, return baseline; then,
        # prepend our UID to it to allow a sensor/actuator to be
        # distinguished by its containing environment.
        # The response's own default simtime serves as fallback.
        simtime = (
            inspect.signature(EnvironmentStartResponse)
            .parameters["simtime"]
            .default
        )
        sensors, actuators, simtime = self._obtain_baseline(simtime)
        self._done = False

        LOG.info("Environment %s has been set up.", self)
        return EnvironmentStartResponse(
            sender_environment=self.uid,
            receiver_simulation_controller=request.sender,
            experiment_run_id=request.experiment_run_id,
            experiment_run_instance_id=request.experiment_run_instance_id,
            experiment_run_phase=request.experiment_run_phase,
            sensors=sensors,
            actuators=actuators,
            simtime=simtime,
        )

    @ESM.on(EnvironmentUpdateRequest)
    def _handle_update(
        self, request: EnvironmentUpdateRequest
    ) -> EnvironmentUpdateResponse:
        """Handle an environment update request.

        The request contains current actuator values and the
        environment receives the actuator values in the update method.
        The environment answers with updated sensor readings, an
        environment reward, and the done flag, whether the environment
        has finished or not.

        Finally, an update response is prepared.

        Parameters
        ----------
        request: EnvironmentUpdateRequest
            The update request from the simulation controller, contains
            the current actuator values from one or more agent.

        Returns
        -------
        EnvironmentUpdateResponse
            The response for the simulation controller, containing the
            updated sensor values, a reward, and the done flag.

        Raises
        ------
        RuntimeError
            If :meth:`update` returns neither a tuple nor an
            :class:`EnvironmentState`.
        """
        # Only pass on the actuators that belong to this environment,
        # and strip our prefix from copies of them.
        actuators = [
            act for act in request.actuators if act.uid in self._actuator_ids
        ]
        actuators = deepcopy(actuators)
        self._remove_uid(actuators)

        self._tick += 1
        state = self.update(actuators)
        if isinstance(state, tuple):
            # Handle old-style updates by upgrade. The sensor readings
            # double as world state; time is counted in update calls.
            state = EnvironmentState(
                sensor_information=deepcopy(state[0]),
                world_state=deepcopy(state[0]),
                rewards=state[1],
                done=state[2],
                simtime=SimTime(simtime_ticks=self._tick),
            )
        elif isinstance(state, EnvironmentState):
            pass  # No conversion to do
        else:
            raise RuntimeError(
                "Unknown return value from environment update: %s "
                % (str(state),)
            )

        self._prepend_uid(state.sensor_information)
        if self._state_transformer:
            state = self._state_transformer(state)
        LOG.debug("%s got stepped on: %s", self, request)
        self._done = state.done

        return EnvironmentUpdateResponse(
            sender_environment_id=self.uid,
            receiver_simulation_controller_id=request.sender,
            experiment_run_id=request.experiment_run_id,
            experiment_run_instance_id=request.experiment_run_instance_id,
            experiment_run_phase=request.experiment_run_phase,
            environment_name=self._name,
            sensors=state.sensor_information,
            rewards=state.rewards,
            world_state=state.world_state,
            done=state.done,
            simtime=state.simtime,
        )

    @ESM.on(EnvironmentResetRequest)
    def _handle_reset(
        self, request: EnvironmentResetRequest
    ) -> EnvironmentResetResponse:
        """Handle an environment reset request.

        The tick counter and the done flag are cleared; the actual
        behavior of the restart is delegated to the method
        :meth:`reset`.

        Parameters
        ----------
        request: EnvironmentResetRequest
            The reset request send by the simulation controller.

        Returns
        -------
        EnvironmentResetResponse
            The response for the simulation controller.
        """
        LOG.info(
            'Environment worker "%s" for environment "%s" resetting.',
            self.uid,
            self.name,
        )
        self._tick = 0
        self._done = False
        return self.reset(request)

    @ESM.on(EnvironmentShutdownRequest)
    def _handle_shutdown(
        self, request: EnvironmentShutdownRequest
    ) -> EnvironmentShutdownResponse:
        """Handle an environment shutdown request.

        Calls ``stop()`` on this object and prepares a shutdown
        response.

        Parameters
        ----------
        request: EnvironmentShutdownRequest
            The shutdown request from the simulation controller.

        Returns
        -------
        EnvironmentShutdownResponse
            The shutdown response for the simulation controller.
        """
        LOG.info("Environment %s now handling shutdown.", self)
        # ``stop`` is not defined in this class.
        # NOTE(review): presumably injected by ``ESM.monitor`` -- confirm.
        self.stop()  # type: ignore
        return EnvironmentShutdownResponse(
            sender=self.uid,
            receiver=request.sender,
            experiment_run_id=request.experiment_run_id,
            experiment_run_instance_id=request.experiment_run_instance_id,
            experiment_run_phase=request.experiment_run_phase,
        )

    @abstractmethod
    def start_environment(
        self,
    ) -> Union[
        EnvironmentBaseline,
        typing.Tuple[List[SensorInformation], List[ActuatorInformation]],
    ]:
        """Launches execution of an environment.

        If the environment uses a simulation tool, this function
        can be used to initiate the simulation tool. In addition, this
        function is used to prepare the environment for the simulation.
        It must be able to provide initial sensor information.

        On a reset, this method is called to restart a new environment
        run. Therefore, it also must provide initial values for all
        variables used!

        Returns
        -------
        Union[EnvironmentBaseline, typing.Tuple[List[SensorInformation], List[ActuatorInformation]]]
            An :class:`~EnvironmentBaseline` object containing all
            initial data from the environment. For backwards
            compatibility, it is also possible (though deprecated) to
            return a tuple containing a list of available sensors and a
            list of available actuators.
        """
        pass

    @abstractmethod
    def update(self, actuators: List[ActuatorInformation]) -> Union[
        EnvironmentState,
        typing.Tuple[List[SensorInformation], List[RewardInformation], bool],
    ]:
        """Function to update the environment

        This function receives the agent's actions and has to respond
        with new sensor information. This function should create a
        new simulation step.

        Parameters
        ----------
        actuators : List[ActuatorInformation]
            List of actuators with values

        Returns
        -------
        Union[EnvironmentState, typing.Tuple[List[SensorInformation], List[RewardInformation], bool]]
            An :class:`~EnvironmentState` object; for backwards
            compatibility, environments can return a tuple containing
            a list of sensor readings (:class:`~SensorInformation`),
            a list of rewards (:class:`~RewardInformation`), and a flag
            whether the environment has terminated.
            Returning a tuple is considered deprecated.
        """
        pass

    def reset(
        self, request: EnvironmentResetRequest
    ) -> EnvironmentResetResponse:
        """Resets the environment in-place.

        The default behavior for a reset comprises:

        * calling :meth:`~.start_environment` again
        * refreshing the sensor/actuator ID filter lists from the new
          baseline
        * preparing the :class:`EnvironmentResetResponse`

        If an environment requires a more special reset procedure, e.g.,
        shutting down a simulation process first, this method can be
        overwritten.

        Parameters
        ----------
        request : EnvironmentResetRequest
            The reset request send by the simulation controller.

        Returns
        -------
        EnvironmentResetResponse
            The response for the simulation controller.

        Raises
        ------
        RuntimeError
            If :meth:`start_environment` returns neither a tuple nor an
            :class:`EnvironmentBaseline`.
        """
        LOG.debug(
            "Environment %s stopped the leftovers "
            "of the previous run. Initiating a new run now.",
            self,
        )
        # The response's own default simtime serves as fallback.
        simtime = (
            inspect.signature(EnvironmentResetResponse)
            .parameters["simtime"]
            .default
        )
        sensors, actuators, simtime = self._obtain_baseline(simtime)

        LOG.info('Environment worker "%s" restarted.', self.uid)
        return EnvironmentResetResponse(
            receiver_simulation_controller_id=request.sender,
            sender_environment_id=self.uid,
            create_new_instance=False,
            sensors=sensors,
            actuators=actuators,
            simtime=simtime,
        )

    def __str__(self):
        """Returns class name, object address, UID, and current tick."""
        return (
            f"{self.__class__.__name__}"
            f"(id=0x{id(self):x}, uid={self.uid}, tick={self._tick})"
        )