Source code for palaestrai.experiment.agent_objective_termination_condition

from __future__ import annotations

import itertools
from typing import TYPE_CHECKING, Any, Tuple, Dict

import re
import logging
import numpy as np
from itertools import chain
from dataclasses import dataclass
from collections import defaultdict, deque

from palaestrai.types import SimulationFlowControl
from .termination_condition import TerminationCondition

if TYPE_CHECKING:
    from palaestrai.agent import Brain
    from palaestrai.experiment import RunGovernor
    from palaestrai.core.protocol import (
        SimulationControllerTerminationRequest,
        MuscleUpdateRequest,
    )


@dataclass
class _WorkerCums:
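    """Per-brain bookkeeping for phase-level flow control.

    ``done`` marks that this brain met a phase termination condition;
    ``worker_avgs`` maps a worker (simulation controller) UID to a
    deque of that worker's recorded per-episode objective averages.
    """
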
    done: bool
    worker_avgs: Dict


LOG = logging.getLogger(__name__)


class AgentObjectiveTerminationCondition(TerminationCondition):
    """Brain controls execution flow of experiments.

    This termination condition allows the simulation flow to be
    controlled based on the overall success of an agent. Users may
    supply any objective average to terminate the flow, which will lead
    to a :class:`SimulationFlowControl.RESET` during an episode, and a
    :class:`SimulationFlowControl.STOP_PHASE` on the phase flow control
    level. I.e., when an agent becomes successful during an episode, it
    will request to restart that episode. If the agent becomes
    successful over a number of episodes, the phase will end.

    Threshold values are given in the termination condition's
    parameters for each agent. Under each agent key, the actual
    threshold values are given. The keys follow a specific pattern:
    ``{brain|phase}_avg{number}``, where ``{brain|phase}`` means either
    "brain" or "phase", and ``{number}`` is the window size of the
    floating average.

    ``brain_avgN`` specifies that an agent signals to end an episode
    once the mean of the last *N* objective values is equal to or
    greater than the number given. The simulation controller can then
    decide to end the episode. This change in flow control is relevant
    only for the current worker; i.e., other workers will continue
    until they are equally successful, or the phase ends for another
    reason. I.e.,

    .. math::
        \\frac{1}{N} \\sum_{t=T-N+1}^{T} r_t \\ge X

    ``phase_avgN`` signals termination of a *phase* once the *average
    cumulative* reward of the last *N* episodes is equal to or greater
    than the number given. I.e., this parameter considers the average
    reward of all steps over all workers (1 worker = 1 episode), since
    a worker acts within one particular episode. Put in math:

    .. math::
        \\frac{1}{N} \\sum_{\\mathit{episode}=1}^{N}
        \\frac{1}{M} \\sum_{t=1}^{M} r_{t,\\mathit{episode}} \\ge X

    where *M* is the number of steps in a particular episode.

    .. note::
        Any particular ``phase_avgN`` must hold for *all* workers.
        Suppose you have 2 workers; then a ``phase_avg10: 1.0`` forces
        both workers to have at least 10 successful episodes, where the
        average objective value over all steps is at least 1.0.

    E.g.,

    * ``brain_avg100: 8.9`` as parameter means that the episode ends
      once the brain reaches an objective score of at least 8.9,
      averaged over the last 100 actions.
    * ``brain_avg10: 8.9``: similar to the above, except that the
      averaging is done over 10 actions.
    * ``phase_avg10: 1.0``: ends the phase once the average cumulative
      success of the brain from the last 10 *episodes* of *all
      workers* is at least 1.0.

    .. warning::
        A word of caution: Make sure that your ``brain_avgN`` and
        ``phase_avgN`` definitions are compatible, mathematically
        speaking. A ``brain_avg10: 100`` does not necessarily imply
        that ``phase_avg10: 100`` also holds. The ``brain_avg10``
        considers the last 10 steps of one episode, while
        ``phase_avg10`` considers the average objective value of all
        steps in 10 episodes. Misaligning them can easily create a
        setup in which the phase never terminates.

        As an example, suppose the objective value of step 1 is 1, step
        2 yields an objective value of 2, step 3 of 3, etc. Then,
        ``brain_avg10: 100`` will terminate after 105 steps, because
        the average objective value over the last 10 steps is then
        greater than 100: (96 + 97 + ... + 104 + 105) / 10 = 100.5.
        However, the average objective value over all steps of such an
        episode is (1 + 2 + ... + 105) / 105 = 53, so the average over
        the last 10 episodes is also 53. The condition
        ``phase_avg10: 100`` thus never triggers, and the phase never
        terminates, since 53 < 100 always holds.
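
    The mismatch above is easy to check numerically. A minimal sketch,
    in plain Python with hypothetical objective values (not part of
    palaestrAI's API)::

        objectives = list(range(1, 106))  # step t yields objective t
        # Mean over the last 10 steps: 100.5, so ``brain_avg10: 100``
        # fires after step 105:
        print(sum(objectives[-10:]) / 10)
        # Mean over *all* steps of such an episode: 53.0, so a
        # ``phase_avg10: 100`` condition can never fire:
        print(sum(objectives) / len(objectives))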

    If you specify any ``avgN``, the termination condition ensures
    that at least *N* actions are recorded before calculating the
    average. Meaning: If your environment terminates after *N* steps,
    but you specify a ``brain_avgM`` with N < M, then the termination
    condition will never trigger. To calculate the average of the last
    10 steps, the agent must have had the chance to act 10 times,
    after all.

    .. note::
        For technical reasons, you must specify a ``brain_avg*``
        parameter if you want to use ``phase_avg*``, as the result of
        the brain objective averaging is transmitted to the
        phase-specific portion of the termination condition.

        However, a special case exists when specifying a ``brain_avgN``
        parameter, but not a ``phase_avgN`` parameter. Then, the first
        agent that triggers the termination condition during an episode
        will end the whole phase.

    Examples
    --------
    The following snippet is a shortened example from palaestrAI's
    experiment definition::

        definitions:
          agents:
            myagent:
              name: My Agent
              # (Other agent definitions omitted)
          simulation:
            tt:
              name: palaestrai.simulation:TakingTurns
              conditions:
                - name: palaestrai.experiment:AgentObjectiveTerminationCondition
                  params:
                    My Agent:
                      brain_avg100: 8.9
          run_config:
            condition:
              name: palaestrai.experiment:AgentObjectiveTerminationCondition
              params:
                My Agent:
                  phase_avg100: 8.9

    This configuration means that an episode ends once the last 100
    steps have an average objective of at least 8.9, and that the
    phase ends once the average cumulative reward of the last 100
    episodes is at least 8.9. As a smaller illustration of the phase
    condition, assume ``phase_avg10: 8.9`` and 10 episodes with
    average rewards of 10, 11, 6, 12, 15, 20, 17, 11, 9, and 10. Then
    the phase termination condition holds, as
    (10 + 11 + 6 + 12 + 15 + 20 + 17 + 11 + 9 + 10) / 10 = 12.1 >= 8.9.
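
    Parameter keys are matched against the pattern
    ``(brain|phase)_avg(\\d+)`` (cf. ``_AVG_RE`` below). A short,
    purely illustrative interpreter session (not part of the public
    API)::

        >>> import re
        >>> avg_re = re.compile(r"(brain|phase)_avg(\\d+)\\Z")
        >>> avg_re.search("brain_avg100").groups()
        ('brain', '100')
        >>> avg_re.search("phase_avg10").groups()
        ('phase', '10')
        >>> avg_re.search("brain_avg") is None  # window size required
        True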
    """

    _AVG_RE = re.compile(r"(brain|phase)_avg(\d+)\Z")

    def __init__(self, *args, **kwargs):
        super().__init__()
        self._brain_avgs = defaultdict(dict)
        self._phase_avgs = defaultdict(dict)
        self._stop_phase_for_all = False
        for auid, conds in kwargs.items():
            if not isinstance(conds, dict):
                continue
            for k, v in conds.items():
                m = re.search(
                    AgentObjectiveTerminationCondition._AVG_RE, str(k)
                )
                if m and m.group(1) == "brain":
                    self._brain_avgs[auid][int(m.group(2))] = float(v)
                if m and m.group(1) == "phase":
                    self._phase_avgs[auid][int(m.group(2))] = float(v)
        if len(self._brain_avgs) == 0 and len(self._phase_avgs) == 0:
            LOG.warning(
                "%s condition does not have any configuration. This "
                "will essentially be a noop. "
                "Please check your arguments: %s",
                self.__class__.__name__,
                kwargs,
            )
        self._max_cumulative_objectives = 0
        self._cumulative_objectives = defaultdict(self._new_worker_cumsums)
        if self._phase_avgs:
            self._max_cumulative_objectives = max(
                chain.from_iterable(
                    i.keys() for i in self._phase_avgs.values()
                )
            )

    def _new_worker_cumsums(self):
        return _WorkerCums(
            done=False,
            worker_avgs=defaultdict(self._new_worker_sums_deque),
        )

    def _new_worker_sums_deque(self):
        return deque(maxlen=self._max_cumulative_objectives)

    def brain_flow_control(
        self, brain: Brain, message: MuscleUpdateRequest
    ) -> Tuple[SimulationFlowControl, Dict]:
        """Allows a learning process to control the simulation flow.

        A learner can control the simulation, e.g., by indicating that
        the simulation should be reset, or that it can end once the
        learner has become good enough. Descendant classes can
        reimplement this method. They will receive access to the
        respective agent's :class:`Brain`, which contains all the
        necessary information (e.g., its memory, training success,
        etc.).

        Returns
        -------
        Tuple of :class:`SimulationFlowControl` and Dict:
            An indicator for simulation control: The flow control
            indicator with the highest priority (i.e., the highest
            value number in the enum) wins. The second element contains
            the dictionary of computed averages, indexed by the agent's
            name, e.g., ``{"my_agent_name": {10: 5.6}}``.
        """
        conditions = self._brain_avgs[brain.name]
        windows = sorted(conditions.keys())
        fc_data = {
            brain.name: {
                w: np.mean(
                    brain.memory.tail(
                        w, include_only=[message.sender]
                    ).objective
                )
                for w in windows
                if len(brain.memory.tail(w)) == w
            }
        }
        fc = (
            SimulationFlowControl.RESET
            if any(
                v >= conditions[k] for k, v in fc_data[brain.name].items()
            )
            else SimulationFlowControl.CONTINUE
        )
        fc_data[brain.name]["cumsum"] = (
            brain.memory.tail(len(brain.memory)).objective.sum().item()
        )  # Add this for phase fc
        fc_data[brain.name]["avg"] = brain.memory.tail(
            len(brain.memory)
        ).objective.sum().item() / float(
            len(brain.memory) if len(brain.memory) > 0 else 1
        )  # Add this for phase fc
        if fc.value > SimulationFlowControl.CONTINUE.value:
            LOG.info(
                "%s's rollout worker %s "
                "meets objective value termination condition "
                "for this episode: %s",
                brain.name,
                message.sender,
                fc_data,
            )
        else:
            LOG.debug(
                "%s needs to continue: %s < %s",
                brain.name,
                fc_data[brain.name],
                conditions,
            )
        return fc, fc_data

    def phase_flow_control(
        self,
        run_governor: RunGovernor,
        message: SimulationControllerTerminationRequest,
    ) -> Tuple[SimulationFlowControl, Any]:
        if self._stop_phase_for_all:  # Short-circuit if wanted
            return SimulationFlowControl.STOP_PHASE, {}
        if (
            message.flow_control_indicator.value
            < SimulationFlowControl.RESET.value
        ):  # Safety check, consider only RESET and above:
            return message.flow_control_indicator, {}
        if not self._phase_avgs:  # End phase once one worker is successful
            self._stop_phase_for_all = True
            return SimulationFlowControl.STOP_PHASE, {}
        assert run_governor.experiment_run is not None

        # When the SC requests termination, it usually means that
        # something has ended. So we at least want to RESET.
        # Otherwise, we'll just go through all cumulative sums.
        for brain, conditions in message.flow_control_data.get(
            self.__class__.__name__, (SimulationFlowControl.CONTINUE, {})
        )[1].items():
            brain_data = self._cumulative_objectives[brain]
            if brain_data.done:  # Any worker was already successful
                return SimulationFlowControl.STOP_PHASE, {}
            avgs = brain_data.worker_avgs[message.sender]
            avgs.append(conditions["avg"])
            target_conds = self._phase_avgs[brain]
            for win, target in target_conds.items():
                if win <= 0:  # Safety net for user data
                    return SimulationFlowControl.RESET, {}
                if len(avgs) < win:
                    continue
                recorded_sum = sum(
                    itertools.islice(avgs, len(avgs) - win, len(avgs))
                )
                avg = recorded_sum / float(win)
                if avg >= target:
                    LOG.info(
                        "%s terminates the current phase "
                        "according to %s having avg%d %.2f >= %.2f",
                        self.__class__.__name__,
                        brain,
                        win,
                        avg,
                        target,
                    )
                    brain_data.done = True
                    return SimulationFlowControl.STOP_PHASE, {
                        win: recorded_sum
                    }
        # Usual default case is RESET:
        return SimulationFlowControl.RESET, {}

    def check_termination(self, message, component=None):
        return False