# Source code for palaestrai.environment.environment

from __future__ import annotations

import inspect
import logging
import signal
import typing
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import (
    List,
    Union,
    Sequence,
    Optional,
    TYPE_CHECKING,
)

from palaestrai.types import SimTime
from palaestrai.core import MajorDomoWorker
from palaestrai.core.protocol import (
    EnvironmentResetRequest,
    EnvironmentResetResponse,
    EnvironmentShutdownRequest,
    EnvironmentShutdownResponse,
    EnvironmentStartRequest,
    EnvironmentStartResponse,
    EnvironmentUpdateRequest,
    EnvironmentUpdateResponse,
)
from .environment_baseline import EnvironmentBaseline
from .environment_state import EnvironmentState

if TYPE_CHECKING:
    from ..agent import (
        ActuatorInformation,
        SensorInformation,
        RewardInformation,
    )
    from .reward import Reward
    from .environment_state_transformer import EnvironmentStateTransformer

# Module-level logger, named after this module so its output can be
# filtered/configured through the standard logging hierarchy.
LOG = logging.getLogger(name=__name__)


class Environment(ABC):
    """Abstract class for environment implementation

    This abstract class provides all necessary functions needed to
    implement a new environment. The developer only has to implement the
    functions :meth:`start_environment` and :meth:`update`.

    Parameters
    ----------
    uid : str
        Unique identifier to identify an environment
    broker_uri : str
        URI used to connect to the simulation broker
    seed : int
        Seed for recreation

    Attributes
    ----------
    reward : ::`Reward`
        If present, this method calculates the reward of the environment
        ("external reward"). See ::`EnvironmentState.world_state`.
    _sensor_ids: List[str]
        A list of sensor IDs including the UID of the environment.
    _actuator_ids: List[str]
        A list of actuator IDs including the UID of the environment.
    """

    def __init__(
        self,
        uid: str,
        broker_uri: str,
        seed: int,
    ):
        self._uid = uid
        self.seed = seed
        self._ctx = None
        self._worker = None
        self._broker_uri = broker_uri
        self.reward: Optional[Reward] = None
        self.sensors: List[SensorInformation] = []
        self.actuators: List[ActuatorInformation] = []
        self._state_transformer: Optional[EnvironmentStateTransformer] = None

        # Filter lists for incoming actuators
        self._sensor_ids: List[str] = list()
        self._actuator_ids: List[str] = list()

        self.is_terminal = False
        # Counts update steps; used as simtime for old-style tuple updates.
        self._tick = 0
        LOG.debug(
            "Environment %s(id=0x%x, uid=%s) created.",
            self.__class__,
            id(self),
            self.uid,
        )

    @property
    def uid(self) -> str:
        """The unique identifier of the Environment object"""
        return str(self._uid)

    @property
    def worker(self):
        """Return the major domo worker.

        The worker will be created (lazily, on first access) if necessary.
        """
        if self._worker is None:
            self._worker = MajorDomoWorker(
                self._broker_uri,
                self.uid,
            )
        return self._worker

    def _handle_sigintterm(self, signum, frame):
        """Signal handler for SIGINT/SIGTERM: log and raise ``SystemExit``."""
        LOG.info(
            "Environment %s(id=0x%x, uid=%s) interrupted by signal %s in "
            "frame %s.",
            self.__class__,
            id(self),
            self.uid,
            signum,
            frame,
        )
        raise SystemExit()

    def _remove_uid(
        self, actuator_information_list: List[ActuatorInformation]
    ):
        """Strip this environment's UID prefix from actuator UIDs, in place.

        This is the inverse of :meth:`_prepend_uid`: an actuator UID of the
        form ``<environment uid>.<actuator uid>`` becomes
        ``<actuator uid>``. Plain numbers in the list, entries whose UID is
        not a string, and UIDs without the prefix are left untouched.

        Parameters
        ----------
        actuator_information_list : List[ActuatorInformation]
            Actuators whose ``uid`` attribute is rewritten in place.
        """
        prefix = f"{self.uid}."
        for i in actuator_information_list:
            if isinstance(i, (int, float)):
                continue
            if isinstance(i.uid, str) and i.uid.startswith(prefix):
                # Strip the exact environment prefix rather than "everything
                # up to the first dot": both the environment UID and the
                # actuator UID may contain dots.
                i.uid = i.uid[len(prefix):]

    def _prepend_uid(
        self,
        sensor_actuator_information_list: Sequence[
            Union[SensorInformation, ActuatorInformation]
        ],
    ):
        """Qualify sensor/actuator UIDs with this environment's UID, in place.

        Each UID becomes ``<environment uid>.<uid>`` so that a sensor or
        actuator can be distinguished by its containing environment. UIDs
        that already carry the prefix are left unchanged; numeric UIDs are
        converted to strings as part of the qualification.

        Parameters
        ----------
        sensor_actuator_information_list : Sequence
            Sensor/actuator information objects whose ``uid`` attribute is
            rewritten in place.
        """
        prefix = f"{self.uid}."
        for i in sensor_actuator_information_list:
            uid = i.uid
            if isinstance(uid, str) and uid.startswith(prefix):
                continue  # Already qualified
            # The f-string also turns numeric UIDs into strings.
            i.uid = f"{prefix}{uid}"

    async def run(self):
        """Main execution loop for an environment object

        This method takes care of the actual execution. As long as this
        method does not return, the environment is still active. The method
        receives and processes incoming messages. It applies changes to
        itself, i.e., setpoints delivered via
        :class:`~EnvironmentUpdateRequest` objects. It subsequently takes
        care of sending the appropriate update responses
        (:class:`~EnvironmentUpdateResponse`). This method also interprets
        :class:`~EnvironmentStartRequest`, :class:`~EnvironmentResetRequest`,
        and :class:`~EnvironmentShutdownRequest` messages.

        The loop ends when :attr:`is_terminal` becomes true, when the worker
        delivers ``None``, or when SIGINT/SIGTERM interrupts the worker's
        ``transceive``. The last prepared reply is then passed to
        ``transceive(..., skip_recv=True)``.
        """
        reply = None
        signal.signal(signal.SIGINT, self._handle_sigintterm)
        signal.signal(signal.SIGTERM, self._handle_sigintterm)
        LOG.info(
            "Environment %s(id=0x%x, uid=%s) commencing run.",
            self.__class__,
            id(self),
            self.uid,
        )
        while not self.is_terminal:
            try:
                request = await self.worker.transceive(reply)
            except SystemExit:
                LOG.critical(
                    "Environment %s(id=0x%x, uid=%s) "
                    "interrupted in transceive by SIGINT/SIGTERM, "
                    "exiting run loop",
                    self.__class__,
                    id(self),
                    self.uid,
                )
                break

            LOG.debug(
                "Environment %s(id=0x%x, uid=%s) received message: %s",
                self.__class__,
                id(self),
                self.uid,
                request,
            )
            if request is None:
                break
            elif isinstance(request, EnvironmentStartRequest):
                reply = self._handle_setup(request)
            elif isinstance(request, EnvironmentUpdateRequest):
                reply = self._handle_update(request)
            elif isinstance(request, EnvironmentResetRequest):
                reply = self._handle_reset(request)
            elif isinstance(request, EnvironmentShutdownRequest):
                reply = self._handle_shutdown(request)
            else:
                # NOTE(review): ``reply`` keeps its previous value here and
                # is handed to transceive again — confirm this is intended.
                LOG.warning(
                    "Environment %s(id=0x%x, uid=%s) received an unexpected "
                    "message, which it ignores: %s",
                    self.__class__,
                    id(self),
                    self.uid,
                    request,
                )
        await self.worker.transceive(reply, skip_recv=True)
        LOG.info(
            "Environment %s(id=0x%x, uid=%s) completed shutdown: "
            "so much fun we had.",
            self.__class__,
            id(self),
            self.uid,
        )

    def _start_and_prepare_baseline(self, default_simtime):
        """Call :meth:`start_environment` and prepare its result for sending.

        Accepts both the :class:`EnvironmentBaseline` return value and the
        deprecated ``(sensors, actuators)`` tuple. The sensors and actuators
        are deep-copied, qualified with this environment's UID, and their
        UIDs are stored in :attr:`_sensor_ids` / :attr:`_actuator_ids` (the
        filter lists used by :meth:`_handle_update`).

        Parameters
        ----------
        default_simtime
            Simulation time to report when the environment returned an
            old-style tuple (which carries no simtime).

        Returns
        -------
        tuple
            ``(sensors, actuators, simtime)``, ready for a start or reset
            response.

        Raises
        ------
        RuntimeError
            If :meth:`start_environment` returns anything else.
        """
        simtime = default_simtime
        baseline = self.start_environment()
        if isinstance(baseline, tuple):
            sensors, actuators = baseline
        elif isinstance(baseline, EnvironmentBaseline):
            sensors = baseline.sensors_available
            actuators = baseline.actuators_available
            simtime = baseline.simtime
        else:  # Wtf?!
            raise RuntimeError(
                "Unknown return value from environment start: %s "
                % (str(baseline))
            )
        # Copy so that qualifying the UIDs does not alter the environment's
        # own objects.
        sensors = deepcopy(sensors)
        actuators = deepcopy(actuators)
        self._prepend_uid([*sensors, *actuators])
        self._sensor_ids = [sen.uid for sen in sensors]
        self._actuator_ids = [act.uid for act in actuators]
        return sensors, actuators, simtime

    def _handle_setup(
        self, request: EnvironmentStartRequest
    ) -> EnvironmentStartResponse:
        """Handle an environment start request.

        The :meth:`.start_environment` is called that can be used by
        environments for setup purposes and that should provide the
        available sensors and actuators.

        Finally, a start response is prepared.

        Parameters
        ----------
        request: EnvironmentStartRequest
            The start request from the simulation controller.

        Returns
        -------
        EnvironmentStartResponse
            The answer from the environment, contains the available
            sensors and actuators.
        """
        LOG.debug(
            "Environment %s(id=0x%x, uid=%s) received a %s",
            self.__class__,
            id(self),
            self.uid,
            repr(request),
        )

        # Start environment, return baseline; then,
        # prepend our UID to it to allow a sensor/actuator to be
        # distinguished by its containing environment.
        default_simtime = (
            inspect.signature(EnvironmentStartResponse)
            .parameters["simtime"]
            .default
        )
        sensors, actuators, simtime = self._start_and_prepare_baseline(
            default_simtime
        )
        msg = EnvironmentStartResponse(
            sender_environment=self.uid,
            receiver_simulation_controller=request.sender,
            experiment_run_id=request.experiment_run_id,
            experiment_run_instance_id=request.experiment_run_instance_id,
            experiment_run_phase=request.experiment_run_phase,
            sensors=sensors,
            actuators=actuators,
            simtime=simtime,
        )
        LOG.debug(
            "Environment %s(id=0x%x, uid=%s) sending %s",
            self.__class__,
            id(self),
            self.uid,
            repr(msg),
        )
        return msg

    def _handle_update(
        self, request: EnvironmentUpdateRequest
    ) -> EnvironmentUpdateResponse:
        """Handle an environment update request.

        The request contains current actuator values and the environment
        receives the actuator values in the update method. The environment
        answers with updated sensor readings, an environment reward, and
        the done flag, whether the environment has finished or not.

        Finally, an update response is prepared.

        Parameters
        ----------
        request: EnvironmentUpdateRequest
            The update request from the simulation controller, contains the
            current actuator values from one or more agent.

        Returns
        -------
        EnvironmentUpdateResponse
            The response for the simulation controller, containing the
            updated sensor values, a reward, and the done flag.

        Raises
        ------
        RuntimeError
            If :meth:`update` returns neither an :class:`EnvironmentState`
            nor a tuple.
        """
        # Keep only actuators that belong to this environment, and work on
        # copies so the request's objects are not modified.
        actuators = [
            act for act in request.actuators if act.uid in self._actuator_ids
        ]
        actuators = deepcopy(actuators)
        self._remove_uid(actuators)

        self._tick += 1
        state = self.update(actuators)
        if isinstance(state, tuple):
            # Handle old-style updates by upgrade; the world state is a copy
            # of the sensor readings:
            state = EnvironmentState(
                sensor_information=deepcopy(state[0]),
                world_state=deepcopy(state[0]),
                rewards=state[1],
                done=state[2],
                simtime=SimTime(simtime_ticks=self._tick),
            )
        elif isinstance(state, EnvironmentState):
            pass  # No conversion to do
        else:  # Wtf?!
            raise RuntimeError(
                "Unknown return value from environment update: %s "
                % (str(state))
            )
        self._prepend_uid(state.sensor_information)
        if self._state_transformer:
            state = self._state_transformer(state)

        return EnvironmentUpdateResponse(
            sender_environment_id=self.uid,
            receiver_simulation_controller_id=request.sender,
            experiment_run_id=request.experiment_run_id,
            experiment_run_instance_id=request.experiment_run_instance_id,
            experiment_run_phase=request.experiment_run_phase,
            sensors=state.sensor_information,
            rewards=state.rewards,
            world_state=state.world_state,
            done=state.done,
            simtime=state.simtime,
        )

    def _handle_reset(
        self, request: EnvironmentResetRequest
    ) -> EnvironmentResetResponse:
        """Handle an environment reset request.

        The actual behavior of the restart is delegated to the method
        :meth:`reset`.

        Parameters
        ----------
        request: EnvironmentResetRequest
            The reset request sent by the simulation controller.

        Returns
        -------
        EnvironmentResetResponse
            The response for the simulation controller.
        """
        LOG.info(
            "Environment %s(id=0x%x, uid=%s) performing a reset due to "
            "EnvironmentResetRequest(simulation_controller_id=%s, "
            "environment_id=%s).",
            self.__class__,
            id(self),
            self.uid,
            request.sender,
            request.receiver,
        )
        return self.reset(request)

    def _handle_shutdown(
        self, request: EnvironmentShutdownRequest
    ) -> EnvironmentShutdownResponse:
        """Handle an environment shutdown request.

        The :meth:`shutdown` is called that handles the shutdown of the
        environment. Finally, a shutdown response is prepared.

        Parameters
        ----------
        request: EnvironmentShutdownRequest
            The shutdown request from the simulation controller.

        Returns
        -------
        EnvironmentShutdownResponse
            The shutdown response for the simulation controller.
        """
        LOG.info(
            "Environment %s(id=0x%x, uid=%s) now handling shutdown.",
            self.__class__,
            id(self),
            self.uid,
        )
        _ = self.shutdown()
        return EnvironmentShutdownResponse(
            sender=self.uid,
            receiver=request.sender,
            experiment_run_id=request.experiment_run_id,
            experiment_run_instance_id=request.experiment_run_instance_id,
            experiment_run_phase=request.experiment_run_phase,
        )

    @abstractmethod
    def start_environment(
        self,
    ) -> Union[
        EnvironmentBaseline,
        typing.Tuple[List[SensorInformation], List[ActuatorInformation]],
    ]:
        """Launches execution of an environment.

        If the environment uses a simulation tool, this function
        can be used to initiate the simulation tool. In addition, this
        function is used to prepare the environment for the simulation.
        It must be able to provide initial sensor information.

        On a reset, this method is called to restart a new environment run.
        Therefore, it also must provide initial values for all variables
        used!

        Returns
        -------
        Union[EnvironmentBaseline, typing.Tuple[List[SensorInformation], List[ActuatorInformation]]]
            An :class:`~EnvironmentBaseline` object containing all initial
            data from the environment. For backwards compatibility, it is
            also possible (though deprecated) to return a tuple containing
            a list of available sensors and a list of available actuators.
        """
        pass

    @abstractmethod
    def update(self, actuators: List[ActuatorInformation]) -> Union[
        EnvironmentState,
        typing.Tuple[List[SensorInformation], List[RewardInformation], bool],
    ]:
        """Function to update the environment

        This function receives the agent's actions and has to respond
        with new sensor information. This function should create a
        new simulation step.

        Parameters
        ----------
        actuators : List[ActuatorInformation]
            List of actuators with values

        Returns
        -------
        Union[EnvironmentState, typing.Tuple[List[SensorInformation], List[RewardInformation], bool]]
            An :class:`~EnvironmentState` object; for backwards
            compatibility, environments can return a tuple containing a
            list of sensor readings (:class:`~SensorInformation`), a list
            of rewards (:class:`~RewardInformation`), and a flag whether
            the environment has terminated. Returning a tuple is considered
            deprecated.
        """
        pass

    def reset(
        self, request: EnvironmentResetRequest
    ) -> EnvironmentResetResponse:
        """Resets the environment in-place.

        The default behavior for a reset comprises:

        * calling shutdown to allow a graceful shutdown of environment
          simulation processes
        * calling :meth:`~.start_environment` again
        * refreshing the sensor/actuator ID filters from the new baseline
        * preparing the :class:`EnvironmentResetResponse`

        If an environment requires a more special reset procedure,
        this method can be overwritten.

        Parameters
        ----------
        request : EnvironmentResetRequest
            The reset request sent by the simulation controller.

        Returns
        -------
        EnvironmentResetResponse
            The response for the simulation controller.
        """
        # Allow graceful shutdown ...
        self.shutdown(reset=True)
        LOG.debug(
            "Environment %s(id=0x%x, uid=%s) stopped the leftovers "
            "of the previous run. Initiating a new run now.",
            self.__class__,
            id(self),
            self.uid,
        )
        # ... but do not break the main loop
        self.is_terminal = False

        default_simtime = (
            inspect.signature(EnvironmentResetResponse)
            .parameters["simtime"]
            .default
        )
        # Same baseline handling as for the initial start, so the actuator
        # filter used by _handle_update matches the restarted environment.
        sensors, actuators, simtime = self._start_and_prepare_baseline(
            default_simtime
        )
        LOG.debug(
            "Environment %s(id=0x%x, uid=%s) restarted successfully.",
            self.__class__,
            id(self),
            self.uid,
        )
        return EnvironmentResetResponse(
            receiver_simulation_controller_id=request.sender,
            sender_environment_id=self.uid,
            create_new_instance=False,
            sensors=sensors,
            actuators=actuators,
            simtime=simtime,
        )

    def shutdown(self, reset: bool = False) -> bool:
        """Initiate the environment shutdown.

        In this function the :attr:`.is_terminal` is set to True (unless
        ``reset`` is given), which leads to a break of the main loop in the
        :meth:`.run` method.

        Parameters
        ----------
        reset: bool, optional
            Is set to True when only a reset is required. A concrete
            environment may distinguish between reset and shutdown.

        Returns
        -------
        bool
            ``True`` if the shutdown was successful, ``False`` otherwise.
        """
        self.is_terminal = not reset
        return True

    def __str__(self):
        return f"{self.__class__.__name__}(id=0x{id(self):x}, uid={self.uid})"