from __future__ import annotations
import itertools
import logging
from dataclasses import dataclass, field, fields
from collections import deque, OrderedDict, defaultdict
from typing import (
Any,
List,
Tuple,
Optional,
Dict,
Set,
Union,
DefaultDict,
)
import numpy as np
import pandas
import pandas as pd
from .actuator_information import ActuatorInformation
from .reward_information import RewardInformation
from .sensor_information import SensorInformation
LOG = logging.getLogger(__name__)
@dataclass
class MemoryShard:
    """Collected data from one muscle

    Attributes
    ----------
    sensor_readings : pd.DataFrame
        Column-wise (original) sensor readings as they are provided by the
        environments. Each sensor name is a column.
    actuator_setpoints : pd.DataFrame
        Column-wise (original) actuator setpoints. Each actuator name is a
        column.
    rewards : pd.DataFrame
        Column-wise environment rewards, with each reward having its own
        column. The number of rows of this frame defines ``len(shard)``.
    dones : np.ndarray
        Whether the simulation was done at the respective time index or not.
    observations : np.ndarray
        Transformed observations: Any data :class:`Muscle` and :class:`Brain`
        want to store. One row per entry.
    actions : np.ndarray
        Transformed actions: Any data :class:`Muscle` and :class:`Brain`
        want to store. One row per entry.
    objective : np.ndarray
        Result of calling the agent's objective function
    additional_data : list
        Any additional data a :class:`Muscle` shares with its :class:`Brain`
    """

    sensor_readings: pd.DataFrame
    actuator_setpoints: pd.DataFrame
    rewards: pd.DataFrame
    dones: np.ndarray
    observations: np.ndarray
    actions: np.ndarray
    objective: np.ndarray
    additional_data: list

    def __len__(self):
        """Number of usable entries in this particular muscle memory

        The length is the number of rows in :attr:`rewards`.
        """
        return len(self.rewards)

    @staticmethod
    def _stack_rows(arrays: List[np.ndarray]) -> np.ndarray:
        """Stack row-oriented arrays vertically, one output row per input row

        Empty placeholder arrays (fewer than two dimensions and no elements,
        e.g., ``np.array([])`` as produced by ``concat([])``) are skipped.
        Rows that are narrower than the widest one are brought to the common
        width with :func:`numpy.resize`, which repeats the row's own values.

        Returns
        -------
        np.ndarray
            The stacked rows, or an empty 1-d array if there was nothing to
            stack.
        """
        blocks = []
        for raw in arrays:
            arr = np.asarray(raw)
            if arr.ndim < 2 and arr.size == 0:
                continue  # Placeholder of an empty shard: contributes no row
            blocks.append(np.atleast_2d(arr))
        if not blocks:
            return np.array([])
        width = max(block.shape[1] for block in blocks)
        # Resize row by row: resizing a whole 2-d block would flatten it and
        # silently drop every row but the first.
        return np.vstack(
            [np.resize(row, width) for block in blocks for row in block]
        )

    @staticmethod
    def concat(
        memories: Union[List[MemoryShard], Tuple[MemoryShard]]
    ) -> MemoryShard:
        """Concatenate a number of :class:`MemoryShard` objects

        All attributes are concatenated in order. Shards may contain any
        number of rows, including none.

        Parameters
        ----------
        memories : Union[List[MemoryShard], Tuple[MemoryShard]]
            The shards to concatenate, oldest first.

        Returns
        -------
        MemoryShard
            A new shard holding the data of all given shards. If ``memories``
            is empty, a shard with empty data frames, arrays, and list is
            returned.
        """
        # Handle special case with len == 0:
        if not memories:
            return MemoryShard(
                sensor_readings=pd.DataFrame(),
                actuator_setpoints=pd.DataFrame(),
                rewards=pd.DataFrame(),
                dones=np.array([]),
                observations=np.array([]),
                actions=np.array([]),
                objective=np.array([]),
                additional_data=[],
            )

        # Make sure to keep semantics with the NumPy arrays:
        return MemoryShard(
            sensor_readings=pd.concat(
                [m.sensor_readings for m in memories], ignore_index=True
            ),
            actuator_setpoints=pd.concat(
                [m.actuator_setpoints for m in memories], ignore_index=True
            ),
            rewards=pd.concat(
                [m.rewards for m in memories], ignore_index=True
            ),
            dones=np.concatenate([m.dones for m in memories]),
            observations=MemoryShard._stack_rows(
                [m.observations for m in memories]
            ),
            actions=MemoryShard._stack_rows([m.actions for m in memories]),
            objective=np.concatenate([m.objective for m in memories]),
            additional_data=list(
                itertools.chain.from_iterable(
                    m.additional_data for m in memories
                )
            ),
        )
@dataclass
class _MuscleMemory:
    """Per-muscle storage: one deque for each kind of stored data

    Every deque grows independently, so they do not necessarily have the
    same length. Rows are assembled by position in :meth:`__getitem__`.
    """

    sensor_readings: deque[List[SensorInformation]] = field(
        default_factory=deque
    )
    actuator_setpoints: deque[List[ActuatorInformation]] = field(
        default_factory=deque
    )
    rewards: deque[List[RewardInformation]] = field(default_factory=deque)
    dones: deque[bool] = field(default_factory=deque)
    observations: deque[np.ndarray] = field(default_factory=deque)
    actions: deque[np.ndarray] = field(default_factory=deque)
    objective: deque[np.ndarray] = field(default_factory=deque)
    additional_data: deque[Any] = field(default_factory=deque)

    @staticmethod
    def _get_from_deque(d: deque, item: int, default=None):
        """Return ``d[item]``, or ``default`` if that index does not exist"""
        try:
            return d[item]
        except IndexError:
            return default

    @staticmethod
    def _infos_to_df(
        infos: Union[
            List[SensorInformation],
            List[ActuatorInformation],
            List[RewardInformation],
        ]
    ) -> pd.DataFrame:
        """Build a data frame with one column per ``uid`` of the given infos

        The ``value`` of every information object is appended to the column
        named after its ``uid``. An empty list yields an empty data frame.
        """
        data = defaultdict(list)
        for i in infos:
            data[i.uid].append(i.value)
        return pd.DataFrame(data)

    def __getitem__(self, item: int) -> MemoryShard:
        """Receives a "full row" from the MuscleMemory

        A full row is defined by Rewards being present; other values are either
        retrieved from the memory if present or substituted with (empty)
        default values.

        Parameters
        ----------
        item : int
            Position of the row in the deques.

        Returns
        -------
        shard : MemoryShard
            A fully expanded :class:`MemoryShard`

        Raises
        ------
        IndexError
            If there is no reward entry at position ``item``.
        """
        rewards = self.rewards[item]  # Better fail here directly, if needed.
        sensor_readings = _MuscleMemory._get_from_deque(
            self.sensor_readings, item, []
        )
        actuator_setpoints = _MuscleMemory._get_from_deque(
            self.actuator_setpoints, item, []
        )
        # NOTE(review): the default is a list, so a substituted ``dones``
        # becomes a (1, 1) array below, whereas a stored bool becomes a
        # (1,) array -- confirm whether callers rely on that.
        dones = _MuscleMemory._get_from_deque(self.dones, item, [False])
        # ``np.nan`` is the canonical spelling; the upper-case ``np.NAN``
        # alias is not available in NumPy 2.
        observations = _MuscleMemory._get_from_deque(
            self.observations, item, [np.nan]
        )
        actions = _MuscleMemory._get_from_deque(self.actions, item, [np.nan])
        objective = _MuscleMemory._get_from_deque(
            self.objective, item, [np.nan]
        )
        additional_data = _MuscleMemory._get_from_deque(
            self.additional_data, item, []
        )
        return MemoryShard(
            sensor_readings=_MuscleMemory._infos_to_df(sensor_readings),
            actuator_setpoints=_MuscleMemory._infos_to_df(actuator_setpoints),
            rewards=_MuscleMemory._infos_to_df(rewards),
            dones=np.array([dones]),
            objective=np.array(objective),  # Shape (1,) for the default
            observations=np.array([observations]),
            actions=np.array([actions]),
            additional_data=additional_data,
        )
[docs]
class Memory:
    """An in-memory data structure to store experiences in a :class:`~Brain`.

    Each agent needs a memory to store experiences, regardless of the training
    algorithm that is used. This class represents this memory. It is an
    in-memory data structure that hands out its contents as
    :class:`MemoryShard` objects.

    The memory stores observations, actions, rewards given from the
    environment, and the internal reward of the agent (objective value).

    Parameters
    ----------
    size_limit : int = 1e6
        Maximum size the memory is allowed to grow to until old entries are
        dropped in favor of new ones.
    """

    def __init__(self, size_limit: int = int(1e6)):
        self.size_limit = size_limit
        # One _MuscleMemory per muscle UID, created on first access:
        self._data: DefaultDict[str, _MuscleMemory] = defaultdict(
            _MuscleMemory
        )
        # Muscle UID of every full row, in order of arrival:
        self._index: deque[str] = deque()

    @property
    def tags(self) -> Set[str]:
        """All tags (muscle UIDs) known to this memory"""
        return set(self._data.keys())

    def append(
        self,
        muscle_uid: str,
        sensor_readings: Optional[List[SensorInformation]] = None,
        actuator_setpoints: Optional[List[ActuatorInformation]] = None,
        rewards: Optional[List[RewardInformation]] = None,
        done: Optional[bool] = None,
        observations: Optional[np.ndarray] = None,
        actions: Optional[np.ndarray] = None,
        objective: Optional[np.ndarray] = None,
        additional_data: Optional[Dict] = None,
    ):
        """Stores a new item in the agent's memory (append)

        Every argument that is not ``None`` is appended to the corresponding
        storage of the given muscle. Arguments may be supplied in separate
        calls. Supplying ``rewards`` completes a "full row", which is what
        :meth:`tail`, :meth:`truncate`, and ``len()`` count. After storing,
        the memory is truncated to ``size_limit`` full rows.

        Parameters
        ----------
        muscle_uid : str
            UID of the agent (:class:`Muscle`) whose experiences we store
        sensor_readings : Optional[List[SensorInformation]] = None
            A muscle's sensor readings as provided by the environment
        actuator_setpoints : Optional[List[ActuatorInformation]] = None
            A muscle's setpoints as provided to an environment
        rewards : Optional[List[RewardInformation]] = None
            Rewards issued by the environment. It is not necessary that
            sensor readings, setpoints, and rewards are supplied in the same
            call.
        done : Optional[bool] = None
            Whether this was the last action executed in the environment
        observations : Optional[np.ndarray] = None
            Observations the :class:`Muscle` wants to share with its
            :class:`Brain`, e.g., transformed/scaled values
        actions : Optional[np.ndarray] = None
            Action-related data a :class:`Muscle` emitted, such as
            probabilities, or other data.
        objective : Optional[np.ndarray] = None
            The agent's objective value describing its own goal. Optional,
            because the agent might calculate such a value separately.
        additional_data : Optional[Dict] = None
            Any additional data a :class:`Muscle` wants to store
        """
        muscle_memory: _MuscleMemory = self._data[muscle_uid]

        # Add everything that we've been supplied with to the respective
        # muscle memory:
        if sensor_readings is not None:
            muscle_memory.sensor_readings.append(sensor_readings)
        if actuator_setpoints is not None:
            muscle_memory.actuator_setpoints.append(actuator_setpoints)
        if rewards is not None:
            muscle_memory.rewards.append(rewards)
        if additional_data is not None:
            muscle_memory.additional_data.append(additional_data)
        if done is not None:
            muscle_memory.dones.append(done)
        if observations is not None:
            muscle_memory.observations.append(observations)
        if actions is not None:
            muscle_memory.actions.append(actions)
        if objective is not None:
            muscle_memory.objective.append(objective)

        # A "full row" is defined by having rewards supplied. In order to
        # remember which muscle supplied which values, we index full rows
        # in self._index. We simply append the muscle name (aka tag) to that
        # self._index deque.
        if rewards is not None:
            self._index.append(muscle_uid)
        self.truncate(self.size_limit)

    def tail(self, n=1):
        """Returns the n last full entries

        The entries are looked up per muscle, counted from the end of the
        respective muscle's storage, and returned in the order in which
        they were added.

        Parameters
        ----------
        n : int = 1
            How many data items to return, counted from the latest addition.
            Defaults to 1. Negative values are treated as ``abs(n)``.

        Returns
        -------
        MemoryShard :
            A dataclass that contains the *n* last full entries (or fewer,
            if the memory does not hold that many). Values that are missing
            for an entry are substituted with defaults. If there is nothing
            to return, the shard is empty.
        """
        first = max(0, len(self._index) - abs(n))
        recent_tags = list(
            itertools.islice(self._index, first, len(self._index))
        )

        # Walk backwards in time, so that the newest row of each muscle gets
        # index -1, the one before -2, and so on.
        last_used_idx: DefaultDict[str, int] = defaultdict(int)
        queries = []
        for tag in reversed(recent_tags):
            last_used_idx[tag] -= 1
            queries.append((tag, last_used_idx[tag]))
        queries.reverse()  # Back to chronological order

        return MemoryShard.concat(
            [self._data[tag][idx] for tag, idx in queries]
        )

    def truncate(self, n: int):
        """Truncates the memory: Only the last *n* entries are retained.

        For every dropped full row, the oldest element of each storage of
        the respective muscle is removed as well, as far as one exists.

        Parameters
        ----------
        n : int
            How many of the most recent entries should be retained. Negative
            values of n are treated as ``abs(n)``.
        """
        surplus = len(self) - abs(n)
        if surplus <= 0:
            return

        for _ in range(surplus):
            tag = self._index.popleft()
            mem = self._data[tag]
            mem.rewards.popleft()  # If this crashes, we're in trouble.
            for f in fields(mem):
                if f.name == "rewards":
                    continue
                try:
                    getattr(mem, f.name).popleft()
                except IndexError:
                    # Optional data may never have been stored for this
                    # row; an empty deque is fine here.
                    pass

    def __len__(self) -> int:
        """Returns the number of full entries in the memory.

        A full entry is one for which rewards have been stored.
        """
        return len(self._index)