Source code for palaestrai.store.query

"""
Store Query API.

palaestrAI provides a hierarchical database schema where experiments,
experiment runs, agent/environment configurations, as well as states,
actions, and rewards are stored.
The :mod:`palaestrai.store.query` module provides convenience methods for the data
that is requested most of the time, such as experiment configuration or agent
rewards.
All data is returned either as pandas DataFrame or dask DataFrame, depending on
the expected size of the data and query options.

All query functions offer some parameters to filter the query.
The easiest option is to pass a data frame via the ``like_dataframe``
parameter. This constructs the query according to the contents of the data
frame:
The column names are the table's attributes and all data frame contents are
used as filter predicates to construct a query following the schema of
``WHERE column_name IN [cell1, cell2, cell3] AND column_name_2 IN ...``.
More colloquially, this means that the data frame passed via the
``like_dataframe`` parameter contains the data used for filtering.
If a data frame contains the columns ``experiment_run_uid`` and
``agent_name``, and the contents are ``1``, ``2``, and ``a1`` and ``a2``,
respectively, then the results from the database contain only those rows where
the experiment run UID is either ``1`` or ``2``, and the agent name is
either ``a1`` or ``a2``.
In addition, each query function also has explicitly spelled out parameters
for filtering.
E.g., with ``experiment_run_uids``, only the mentioned experiment run UIDs
are being queried.
If multiple parameters are specified, they are joined with an implicit
logical *and*.
E.g., if both ``experiment_run_uids`` and ``experiment_run_phase_uids`` are
specified, then the database is queried for data that belongs to the
specified experiment runs AND the specified experiment run phases.
The resulting query that is rendered is equivalent to
``... experiment_run.uid IN ... AND experiment_run_phase.uid IN ...``.

In addition, each query function allows for a user-defined predicate to be
passed. This parameter, ``predicate``, is expected to be a callable (e.g., a
lambda expression) that receives the query object *after* all other query
options are applied. It is expected to return the (modified) query object.
For example::

    df: pandas.DataFrame = experiments_and_runs_configurations(
        predicate=lambda query: query.limit(5)
    )

This would select only five entries from the database.

All results are ordered in *descending* order, such that the newest entries
are always first.
I.e., the ``limit(5)`` example above would automatically select the five newest
entries.

In order to avoid confusion, relation names and attributes are joined by an
underscore (``_``).
E.g., the ID of an environment is represented by the ``environment_id``
attribute in the resulting data frame.

Each function expects a :class:`~Session` object as first argument.
I.e., the access credentials will be those that are stored in the current
runtime configuration.
If the ``session`` parameter is not supplied, a new session will be
automatically created.
However, the store API does not take care of cleaning sessions. I.e., running
more than one query function without explicitly supplying a session object will
most likely lead to dangling open connections to the database.
The best solution is to use a context manager, e.g.::

    from palaestrai.store import Session
    import palaestrai.store.query as palq

    with Session() as session:
        ers = palq.experiments_and_runs_configurations(session)

.. warning::
    The query API is currently in *beta state*. That means that it is still
    subject to impending API changes. This includes the way the
    :class:`~.SensorInformation`, :class:`~.ActuatorInformation`, and
    :class:`~.RewardInformation` classes are serialized.
    If you encounter bugs, please report them at the
    `palaestrAI issue tracker <https://gitlab.com/arl2/palaestrai/-/issues>`_
    for the *store* subsystem.
"""

from __future__ import annotations

import logging
from io import BytesIO
from typing import (
    TYPE_CHECKING,
    Optional,
    List,
    Union,
    Callable,
    Tuple,
    Dict,
    Generator,
    Any,
    cast,
)

import pandas as pd

import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy

import sqlalchemy.orm
import sqlalchemy as sa
from pandas._libs.missing import NAType
from sqlalchemy import func
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm.attributes import QueryableAttribute

from .database_model import (
    Experiment,
    ExperimentRun,
    ExperimentRunInstance,
    ExperimentRunPhase,
    Agent,
    Environment,
    MuscleAction,
    BrainState,
)
from .query_debug_utils import _debug_print_bound_stmt
from .session import Session
from palaestrai.core import RuntimeConfig
from ..agent import BrainLocation, FileBrainDumper

if TYPE_CHECKING:
    import dask.config as dc
    import dask.dataframe as dd

    Predicate = Callable[[sqlalchemy.sql.select], sqlalchemy.sql.select]
    AttribPredicate = Union[
        Callable[
            [Dict[str, QueryableAttribute]],
            Union[
                Generator[QueryableAttribute, None, None],
                Tuple[QueryableAttribute],
            ],
        ],
    ]


LOG = logging.getLogger(__name__)


def _like_df_list(
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]], key: str
) -> List[Any]:
    """Extract the values stored under ``key`` as a plain list.

    Parameters
    ----------
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]]
        The object to read from; only its ``get(key, default)`` method is
        used. ``None`` means "no filter data".
    key : str
        Name of the column to look up.

    Returns
    -------
    List[Any]
        The column's values, or an empty list if ``like_dataframe`` is
        ``None``, the key is missing, or the stored value is ``None``.
    """
    column = (
        None
        if like_dataframe is None
        else cast(Any, like_dataframe).get(key, [])
    )
    if column is None:
        return []
    return list(column)


def _like_df_str(
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]], key: str
) -> str:
    """Look up ``key`` and return the value only if it is a string.

    Parameters
    ----------
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]]
        The object to read from; only its ``get(key, default)`` method is
        used. ``None`` means "no filter data".
    key : str
        Name of the entry to look up.

    Returns
    -------
    str
        The stored value if it is a ``str`` instance; the empty string in
        every other case (no data frame, missing key, non-string value).
    """
    if like_dataframe is not None:
        candidate = cast(Any, like_dataframe).get(key, "")
        if isinstance(candidate, str):
            return candidate
    return ""


def make_deserialization_session():
    """Open a database session whose JSON columns are decoded by jsonpickle.

    Registers the jsonpickle numpy handlers, creates an engine for the
    store URI taken from ``RuntimeConfig()`` with ``jsonpickle.loads`` as
    JSON deserializer, and returns a new session bound to that engine.

    Returns
    -------
    sqlalchemy.orm.Session
        A freshly created session. This function does not close it.
    """
    jsonpickle_numpy.register_handlers()
    engine = sqlalchemy.create_engine(
        RuntimeConfig().store_uri, json_deserializer=jsonpickle.loads
    )
    session_factory = sqlalchemy.orm.sessionmaker(bind=engine)
    return session_factory()


def _default_attrib_func(query_attribute_dict: Dict[str, QueryableAttribute]):
    """Default attribute mapper: pass every attribute through unchanged.

    Parameters
    ----------
    query_attribute_dict : Dict[str, QueryableAttribute]
        Mapping of column labels to the attributes to select.

    Returns
    -------
    Generator
        Yields the mapping's values in the mapping's iteration order; the
        labels themselves are not used.
    """
    return (attribute for attribute in query_attribute_dict.values())


def experiments_and_runs_configurations(
    session: Optional[sqlalchemy.orm.Session] = None,
    attrib_func: Optional[AttribPredicate] = None,
    predicate: Predicate = lambda query: query,
    index_col: Optional[str] = None,
) -> pd.DataFrame:
    """Known Experiments, Experiment Runs, Instances, and Phases.

    Selects from the experiment table joined with experiment runs,
    experiment run instances, and experiment run phases, and returns the
    result as a data frame.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        The session to query with. If ``None``, a new
        :class:`~palaestrai.store.Session` is created; it is not closed by
        this function.
    attrib_func : Callable[[Dict[str, QueryableAttribute]], Tuple[QueryableAttribute]]
        Receives a dict mapping each column label (see *Returns*) to its
        already labeled attribute and returns the expressions that are
        actually selected. It can, e.g., wrap attributes in SQL functions
        or select only a subset. If ``None``, all attributes are selected
        unchanged.
    predicate : Predicate = lambda query: query
        Callable that receives the assembled select statement and returns
        the (modified) statement that is executed.
    index_col : Optional[str] = None
        Passed on as ``index_col`` to :func:`pandas.read_sql_query`.

    Returns
    -------
    pandas.DataFrame
        With the default ``attrib_func``, the columns are:

        * experiment_id
        * experiment_name
        * experiment_document
        * experiment_run_id
        * experiment_run_uid
        * experiment_run_document
        * experiment_run_instance_id
        * experiment_run_instance_uid
        * experiment_run_phase_id
        * experiment_run_phase_uid
        * experiment_run_phase_mode

    Examples
    --------
    >>> from sqlalchemy import func
    >>> import palaestrai.store.query as palq
    >>> from palaestrai.store.database_model import (
    ...     Experiment,
    ...     ExperimentRun,
    ... )
    >>> experiment_run_uid = "Dummy experiment run where the agents take turns"
    >>> erc = palq.experiments_and_runs_configurations(
    ...     dbh,  # Session needs to be defined
    ...     attrib_func=lambda query_attribute_dict: (
    ...         func.max(query_attribute)
    ...         if query_attribute_label == "experiment_run_instance_id"
    ...         else query_attribute
    ...         for query_attribute_label, query_attribute in query_attribute_dict.items()
    ...     ),
    ...     predicate=lambda query: query.filter(
    ...         Experiment.name
    ...         == "Dummy Experiment record for ExperimentRun "
    ...         + str(experiment_run_uid),
    ...         ExperimentRun.uid == experiment_run_uid,
    ...     ),
    ... )
    """
    if session is None:
        session = Session()
    select_columns = (
        _default_attrib_func if attrib_func is None else attrib_func
    )

    # Every attribute is labeled with its data frame column name before it
    # is handed to the attribute function.
    labeled_attributes = {
        label: attribute.label(label)
        for label, attribute in (
            ("experiment_id", Experiment.id),
            ("experiment_name", Experiment.name),
            ("experiment_document", Experiment.document),
            ("experiment_run_id", ExperimentRun.id),
            ("experiment_run_uid", ExperimentRun.uid),
            ("experiment_run_document", ExperimentRun.document),
            ("experiment_run_instance_id", ExperimentRunInstance.id),
            ("experiment_run_instance_uid", ExperimentRunInstance.uid),
            ("experiment_run_phase_id", ExperimentRunPhase.id),
            ("experiment_run_phase_uid", ExperimentRunPhase.uid),
            ("experiment_run_phase_mode", ExperimentRunPhase.mode),
        )
    }

    statement = sa.select(*select_columns(labeled_attributes)).select_from(
        Experiment
    )
    for joined_model in (
        ExperimentRun,
        ExperimentRunInstance,
        ExperimentRunPhase,
    ):
        statement = statement.join(joined_model)

    statement = predicate(statement)
    _debug_print_bound_stmt(statement, session)
    return pd.read_sql_query(statement, session.bind, index_col=index_col)
def get_max_experiment_run_instance_uid(
    session: Optional[sqlalchemy.orm.Session] = None,
    experiment_name: Optional[str] = None,
    experiment_run_uid: Optional[str] = None,
) -> Tuple[str, pd.DataFrame]:
    """Find the experiment run instance with the highest instance ID.

    First queries the maximum ``experiment_run_instance_id`` among all rows
    matching the given experiment name and/or experiment run UID, then
    fetches the full configuration rows of exactly that instance.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        The session to query with. If ``None``, a new
        :class:`~palaestrai.store.Session` is created; it is not closed by
        this function.
    experiment_name : Optional[str]
        If given, only experiments with this name are considered.
    experiment_run_uid : Optional[str]
        If given, only experiment runs with this UID are considered.

    Returns
    -------
    Tuple[str, pandas.DataFrame]
        The UID of the selected experiment run instance, and the data frame
        returned by :func:`experiments_and_runs_configurations` for that
        instance.

    Raises
    ------
    RuntimeError
        If no experiment run instance matches the filters.
    """
    if session is None:
        session = Session()

    query_filter = []
    if experiment_name is not None:
        query_filter.append(Experiment.name == experiment_name)
    if experiment_run_uid is not None:
        query_filter.append(ExperimentRun.uid == experiment_run_uid)

    erc_instance_id: pd.DataFrame = experiments_and_runs_configurations(
        session,
        attrib_func=lambda query_attribute_dict: (
            func.max(query_attribute_dict["experiment_run_instance_id"]).label(
                "experiment_run_instance_id"
            ),
        ),
        predicate=lambda query: query.filter(*query_filter),
        index_col=None,
    )
    max_instance_id = erc_instance_id.experiment_run_instance_id.iloc[0]
    # An aggregate over zero rows yields NULL, which pandas may surface as
    # either None or NaN depending on the inferred column dtype; treat both
    # as "nothing found".
    if pd.isna(max_instance_id):
        raise RuntimeError(
            "There are no run instances and thus no latest entry to fetch"
        )

    # Convert to a plain Python int: it drops numpy scalar types (which not
    # every DB driver can adapt) and compares the ID column with a value of
    # its own kind instead of a string.
    query_filter.append(ExperimentRunInstance.id == int(max_instance_id))
    erc_instance_uid: pd.DataFrame = experiments_and_runs_configurations(
        session,
        predicate=lambda query: query.filter(*query_filter),
        index_col=None,
    )
    experiment_run_instance_uid: str = (
        erc_instance_uid.experiment_run_instance_uid.iloc[0]
    )
    return experiment_run_instance_uid, erc_instance_uid
def agents_configurations(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_ids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> pd.DataFrame:
    """Configurations of agents.

    Creates a composite list containing information about

    * agents
    * associated experiment run phases
    * associated experiment run instances
    * associated experiment runs
    * associated experiments

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        The session to query with. If ``None``, a new
        :class:`~palaestrai.store.Session` is created; it is not closed by
        this function.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Uses the given data frame to construct a search predicate. The
        columns ``experiment_id``, ``experiment_run_uid``,
        ``experiment_run_instance_uid``, and ``experiment_run_phase_uid``
        are used, if present, in a ``WHERE ... IN ...``-style clause; their
        values are added to those of the corresponding explicit parameter.
        Note the singular form of the column names: it matches the column
        headers of the data frames returned by the query functions, so the
        result of another query function can be passed directly. For
        backwards compatibility, a column named ``experiment_ids`` is also
        honored. The index of the data frame is *not* used.
    experiment_ids : Optional[List[str]]
        An iterable containing experiment IDs to filter for
    experiment_run_uids : Optional[List[str]]
        An iterable containing experiment run UIDs to filter for
    experiment_run_instance_uids : Optional[List[str]]
        An iterable containing experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        An iterable containing experiment run phase UIDs to filter for
    predicate : Predicate = lambda query: query
        Callable that receives the select statement after all other filters
        have been applied and returns the statement that is executed.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``agent_id``, ordered by agent ID in descending order,
        with the following columns:

        * agent_uid
        * agent_name
        * agent_configuration
        * experiment_run_phase_id
        * experiment_run_phase_uid
        * experiment_run_phase_configuration
        * experiment_run_instance_uid
        * experiment_run_id
        * experiment_run_uid
        * experiment_id
        * experiment_name
    """
    if session is None:
        session = Session()

    # The query functions emit the column as "experiment_id" (singular), so
    # that is the key to read. The plural key that was read previously is
    # still accepted so that existing callers keep working.
    experiment_ids = (
        (experiment_ids or [])
        + _like_df_list(like_dataframe, "experiment_id")
        + _like_df_list(like_dataframe, "experiment_ids")
    )
    experiment_run_uids = (experiment_run_uids or []) + _like_df_list(
        like_dataframe, "experiment_run_uid"
    )
    experiment_run_instance_uids = (
        experiment_run_instance_uids or []
    ) + _like_df_list(like_dataframe, "experiment_run_instance_uid")
    experiment_run_phase_uids = (
        experiment_run_phase_uids or []
    ) + _like_df_list(like_dataframe, "experiment_run_phase_uid")

    query = (
        sa.select(
            Agent.id.label("agent_id"),
            Agent.uid.label("agent_uid"),
            Agent.name.label("agent_name"),
            Agent.configuration.label("agent_configuration"),
            ExperimentRunPhase.id.label("experiment_run_phase_id"),
            ExperimentRunPhase.uid.label("experiment_run_phase_uid"),
            ExperimentRunPhase.configuration.label(
                "experiment_run_phase_configuration"
            ),
            ExperimentRunInstance.uid.label("experiment_run_instance_uid"),
            ExperimentRun.id.label("experiment_run_id"),
            ExperimentRun.uid.label("experiment_run_uid"),
            Experiment.id.label("experiment_id"),
            Experiment.name.label("experiment_name"),
        )
        .select_from(Agent)
        .join(ExperimentRunPhase)
        .join(ExperimentRunInstance)
        .join(ExperimentRun)
        .join(Experiment)
        .order_by(Agent.id.desc())
    )

    # Empty filter lists mean "do not filter on this attribute".
    if experiment_run_phase_uids:
        query = query.where(
            ExperimentRunPhase.uid.in_(experiment_run_phase_uids)
        )
    if experiment_run_instance_uids:
        query = query.where(
            ExperimentRunInstance.uid.in_(experiment_run_instance_uids)
        )
    if experiment_run_uids:
        query = query.where(ExperimentRun.uid.in_(experiment_run_uids))
    if experiment_ids:
        query = query.where(Experiment.id.in_(experiment_ids))

    query = predicate(query)
    return pd.read_sql_query(query, session.bind, index_col="agent_id")
def environments_configurations(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_uids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> pd.DataFrame:
    """Configurations of environments.

    Creates a composite list containing information about

    * environments
    * associated experiment run phases
    * associated experiment run instances
    * associated experiment runs
    * associated experiments

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        The session to query with. If ``None``, a new
        :class:`~palaestrai.store.Session` is created; it is not closed by
        this function.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Uses the given data frame to construct a search predicate. The
        columns ``experiment_uid``, ``experiment_run_uid``, and
        ``experiment_run_phase_uid`` are used, if present; their values are
        added to those of the corresponding explicit parameter. The index
        of the data frame is *not* used.
    experiment_uids : Optional[List[str]]
        An iterable containing experiment UIDs to filter for. Note that
        this filter is currently matched against the experiment *run* UID
        (see the review note in the code).
    experiment_run_uids : Optional[List[str]]
        An iterable containing experiment run UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        An iterable containing experiment run phase UIDs to filter for
    predicate : Predicate = lambda query: query
        Callable that receives the select statement after all other filters
        have been applied and returns the statement that is executed.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``environment_id``, ordered by environment ID in
        descending order, with the following columns:

        * environment_uid
        * environment_type
        * environment_parameters
        * experiment_run_phase_id
        * experiment_run_phase_uid
        * experiment_run_phase_configuration
        * experiment_run_instance_uid
        * experiment_run_id
        * experiment_run_uid
        * experiment_id
        * experiment_name
    """
    if session is None:
        session = Session()

    experiment_uids = (experiment_uids or []) + _like_df_list(
        like_dataframe, "experiment_uid"
    )
    experiment_run_uids = (experiment_run_uids or []) + _like_df_list(
        like_dataframe, "experiment_run_uid"
    )
    experiment_run_phase_uids = (
        experiment_run_phase_uids or []
    ) + _like_df_list(like_dataframe, "experiment_run_phase_uid")

    query = (
        sa.select(
            Environment.id.label("environment_id"),
            Environment.uid.label("environment_uid"),
            Environment.type.label("environment_type"),
            Environment.parameters.label("environment_parameters"),
            ExperimentRunPhase.id.label("experiment_run_phase_id"),
            ExperimentRunPhase.uid.label("experiment_run_phase_uid"),
            ExperimentRunPhase.configuration.label(
                "experiment_run_phase_configuration"
            ),
            ExperimentRunInstance.uid.label("experiment_run_instance_uid"),
            ExperimentRun.id.label("experiment_run_id"),
            ExperimentRun.uid.label("experiment_run_uid"),
            Experiment.id.label("experiment_id"),
            Experiment.name.label("experiment_name"),
        )
        .select_from(Environment)
        # Join the phase starting from Environment itself. Joining via
        # Agent.experiment_run_phase would start a second, unrelated FROM
        # chain at Agent and never tie the environments to their phase.
        # Assumes Environment has a foreign key to ExperimentRunPhase, the
        # same way agents_configurations joins it from Agent -- TODO confirm
        # against database_model.
        .join(ExperimentRunPhase)
        .join(ExperimentRunPhase.experiment_run_instance)
        .join(ExperimentRunInstance.experiment_run)
        .join(ExperimentRun.experiment)
        .order_by(Environment.id.desc())
    )

    # Empty filter lists mean "do not filter on this attribute".
    if experiment_run_phase_uids:
        query = query.where(
            ExperimentRunPhase.uid.in_(experiment_run_phase_uids)
        )
    if experiment_run_uids:
        query = query.where(ExperimentRun.uid.in_(experiment_run_uids))
    if experiment_uids:
        # NOTE(review): this matches the *experiment* UIDs against
        # ExperimentRun.uid, which looks like a copy-and-paste slip. The
        # right Experiment attribute cannot be determined from this module
        # (only Experiment.id/name/document are referenced here), so the
        # behavior is kept as is -- confirm against database_model.
        query = query.where(ExperimentRun.uid.in_(experiment_uids))

    query = predicate(query)
    _debug_print_bound_stmt(query, session)
    return pd.read_sql_query(query, session.bind, index_col="environment_id")
def make_muscle_actions_query(
    experiment_ids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    modes: Optional[List[str]] = None,
    episodes: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> sqlalchemy.sql.expression.Select:
    """Assemble the select statement for muscle actions.

    The statement selects from muscle actions joined with agents, experiment
    run phases, experiment run instances, experiment runs, and experiments,
    and always excludes rows whose actuator setpoints are JSON ``null``.
    Every non-empty filter argument adds an ``IN`` condition; all conditions
    are combined with ``AND``. ``predicate`` is applied last.

    Returns
    -------
    sqlalchemy.sql.expression.Select
        The statement returned by ``predicate``; it is not executed here.
    """
    statement = (
        sa.select(
            MuscleAction.id.label("muscle_action_id"),
            MuscleAction.walltime.label("muscle_action_walltime"),
            MuscleAction.simtimes.label("muscle_action_simtimes"),
            MuscleAction.rollout_worker_uid.label("rollout_worker_uid"),
            MuscleAction.sensor_readings.label("muscle_sensor_readings"),
            MuscleAction.actuator_setpoints.label("muscle_actuator_setpoints"),
            MuscleAction.rewards.label("muscle_action_rewards"),
            MuscleAction.objective.label("muscle_action_objective"),
            MuscleAction.done.label("muscle_action_done"),
            MuscleAction.mode.label("mode"),
            MuscleAction.episode.label("episode"),
            Agent.id.label("agent_id"),
            Agent.uid.label("agent_uid"),
            Agent.name.label("agent_name"),
            ExperimentRunPhase.id.label("experiment_run_phase_id"),
            ExperimentRunPhase.uid.label("experiment_run_phase_uid"),
            ExperimentRunPhase.configuration.label(
                "experiment_run_phase_configuration"
            ),
            ExperimentRunInstance.uid.label("experiment_run_instance_uid"),
            ExperimentRun.id.label("experiment_run_id"),
            ExperimentRun.uid.label("experiment_run_uid"),
            Experiment.id.label("experiment_id"),
            Experiment.name.label("experiment_name"),
        )
        .select_from(MuscleAction)
        .join(Agent)
        .join(ExperimentRunPhase)
        .join(ExperimentRunInstance)
        .join(ExperimentRun)
        .join(Experiment)
        .where(MuscleAction.actuator_setpoints != sa.JSON.NULL)
    )

    # (requested values, column to match); applied in this order.
    in_filters = (
        (experiment_ids, Experiment.id),
        (experiment_run_uids, ExperimentRun.uid),
        (experiment_run_instance_uids, ExperimentRunInstance.uid),
        (experiment_run_phase_uids, ExperimentRunPhase.uid),
        (modes, MuscleAction.mode),
        (episodes, MuscleAction.episode),
        (agent_names, Agent.name),
    )
    for wanted, column in in_filters:
        if wanted:
            statement = statement.where(column.in_(wanted))

    return predicate(statement)


def _parse_objects_in_row(x):
    """Decode the string cells of ``x`` with jsonpickle.

    Strings are first rewritten (single quotes become double quotes,
    ``: None`` becomes ``: null``) and then passed to ``jsonpickle.loads``.
    ``pandas.NA`` cells become ``None``; everything else is kept as is.

    Returns
    -------
    list
        One entry per element of ``x``, in the same order.
    """
    parsed = []
    for cell in x:
        if type(cell) is str:
            as_json = cell.replace("'", '"').replace(": None", ": null")
            parsed.append(jsonpickle.loads(as_json))
        elif type(cell) is NAType:
            parsed.append(None)
        else:
            parsed.append(cell)
    return parsed


def _query_to_dataframe(
    session: sqlalchemy.orm.Session, query: sqlalchemy.sql.select
) -> Union[pd.DataFrame, dd.DataFrame]:
    """Execute ``query`` and return the rows indexed by ``muscle_action_id``.

    Statements that carry a ``LIMIT`` or ``OFFSET`` clause are read directly
    with pandas. All other statements are read through dask, materialized
    with ``.compute()``, and post-processed with
    ``_parse_objects_in_row``.
    """
    has_limit = query._limit_clause is not None
    has_offset = query._offset_clause is not None
    if has_limit or has_offset:
        return pd.read_sql_query(
            query, session.bind, index_col="muscle_action_id"
        )

    import dask.config as dc
    import dask.dataframe as dd

    # The statement must be wrapped in an outer "SELECT * FROM (...)". In
    # the tested case, MuscleAction.id is labeled "muscle_action_id" and
    # used as index_col, but the limit that dask injects into the query
    # does not reference that column. Leaving the column unlabeled and
    # referring to it as "MuscleAction.id" does not work either: the keys
    # of the executed result shorten the name to "id", which is ambiguous
    # and does not match the given index_col.
    wrapped = sa.select("*").select_from(query)
    _debug_print_bound_stmt(wrapped, session)

    dc.set({"dataframe.convert-string": False})
    frame = dd.read_sql_query(
        wrapped,
        session.bind.url.render_as_string(hide_password=False),
        index_col="muscle_action_id",
        engine_kwargs={"json_deserializer": jsonpickle.loads},
    ).compute()

    # JSON deserialization through the engine only works for postgres, not
    # with dask's read_sql_query on sqlite3 databases, so the cells are
    # parsed here instead.
    if len(frame) > 0:
        frame = frame.apply(_parse_objects_in_row)
    return frame
def muscle_actions(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_ids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    modes: Optional[List[str]] = None,
    episodes: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> Union[pd.DataFrame, dd.DataFrame]:
    """All action data of a ::`~.Muscle`: readings, setpoints, and rewards

    The resulting data frame contains information about:

    * muscle sensor readings
    * muscle actuator setpoints
    * muscle rewards
    * agents
    * experiment run phases
    * experiment run instances
    * experiment runs
    * experiments

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        The session to query with. If ``None``, a new
        :class:`~palaestrai.store.Session` is created; it is not closed by
        this function.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Uses the given data frame to construct a search predicate. The
        columns ``experiment_id``, ``experiment_run_uid``,
        ``experiment_run_instance_uid``, and ``experiment_run_phase_uid``
        are used, if present; their values are added to those of the
        corresponding explicit parameter.
    experiment_ids : Optional[List[str]]
        An iterable containing experiment IDs to filter for
    experiment_run_uids : Optional[List[str]]
        An iterable containing experiment run UIDs to filter for
    experiment_run_instance_uids : Optional[List[str]]
        An iterable containing experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        An iterable containing experiment run phase UIDs to filter for
    modes : Optional[List[str]] = None
        An iterable containing MuscleAction modes to filter for
    episodes : Optional[List[int]] = None
        An iterable containing MuscleAction episodes to filter for
    agent_names : Optional[List[str]] = None
        An iterable containing agent names to filter for
    predicate : Predicate = lambda query: query
        Callable that receives the select statement after all other filters
        have been applied and returns the statement that is executed.

    Returns
    -------
    Union[pd.DataFrame, dd.DataFrame]
        The result of ``_query_to_dataframe``: if the predicate adds a
        ``LIMIT`` or ``OFFSET`` clause, the data is read with pandas;
        otherwise it is read through dask and materialized with
        ``.compute()``. The data frame is indexed by ``muscle_action_id``
        and contains the following columns:

        * muscle_action_walltime
        * muscle_action_simtimes
        * rollout_worker_uid
        * muscle_sensor_readings
        * muscle_actuator_setpoints
        * muscle_action_rewards
        * muscle_action_objective
        * muscle_action_done
        * mode
        * episode
        * agent_id
        * agent_uid
        * agent_name
        * experiment_run_phase_id
        * experiment_run_phase_uid
        * experiment_run_phase_configuration
        * experiment_run_instance_uid
        * experiment_run_id
        * experiment_run_uid
        * experiment_id
        * experiment_name
    """
    if session is None:
        session = Session()

    def _with_like_values(explicit, column):
        # Explicit values first, then whatever the like-dataframe holds.
        return (explicit or []) + _like_df_list(like_dataframe, column)

    experiment_ids = _with_like_values(experiment_ids, "experiment_id")
    experiment_run_uids = _with_like_values(
        experiment_run_uids, "experiment_run_uid"
    )
    experiment_run_instance_uids = _with_like_values(
        experiment_run_instance_uids, "experiment_run_instance_uid"
    )
    experiment_run_phase_uids = _with_like_values(
        experiment_run_phase_uids, "experiment_run_phase_uid"
    )

    statement = make_muscle_actions_query(
        experiment_ids=experiment_ids,
        experiment_run_uids=experiment_run_uids,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        modes=modes,
        episodes=episodes,
        agent_names=agent_names,
        predicate=predicate,
    )
    return _query_to_dataframe(session, statement)
def _latest_ids(
    session: Optional[sqlalchemy.orm.Session] = None,
    experiment_name: Optional[str] = None,
    experiment_run_uid: Optional[str] = None,
):
    """Look up the IDs that belong to the latest instance of a run.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Database session; a new one is created when ``None``.
    experiment_name : Optional[str]
        Name of the experiment. Must not be ``None``.
    experiment_run_uid : Optional[str]
        UID of the experiment run. Must not be ``None``.

    Returns
    -------
    Tuple[List[str], List[str], List[str]]
        Three single-element lists holding the experiment ID, the
        experiment run UID, and the experiment run instance UID (each
        converted to ``str``), taken from the first row of the frame
        returned by ``get_max_experiment_run_instance_uid``.

    Raises
    ------
    AssertionError
        If ``experiment_name`` or ``experiment_run_uid`` is ``None``.
    """
    assert experiment_name is not None
    assert experiment_run_uid is not None
    if session is None:
        session = Session()

    # Only the data frame (second element) is needed here.
    _, latest = get_max_experiment_run_instance_uid(
        session=session,
        experiment_name=experiment_name,
        experiment_run_uid=experiment_run_uid,
    )
    experiment_ids = [str(latest.experiment_id.iloc[0])]
    experiment_run_uids = [str(latest.experiment_run_uid.iloc[0])]
    experiment_run_instance_uids = [
        str(latest.experiment_run_instance_uid.iloc[0])
    ]
    return experiment_ids, experiment_run_uids, experiment_run_instance_uids


def muscle_action_values(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_name: Optional[str] = None,
    experiment_run_uid: Optional[str] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    modes: Optional[List[str]] = None,
    episodes: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> Union[pd.DataFrame, dd.DataFrame]:
    """Muscle action data with information objects reduced to plain values.

    Queries ``muscle_actions`` for the given experiment run and replaces
    the sensor readings, actuator setpoints, and rewards of every row by a
    ``{uid: value}`` dictionary. If ``experiment_run_instance_uids`` is
    ``None``, the latest experiment run instance is used.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Database session; a new one is created when ``None``.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Passed on to ``muscle_actions`` to construct a search predicate.
    experiment_name : Optional[str]
        Experiment name to filter for. Must not be ``None``.
    experiment_run_uid : Optional[str]
        Experiment run UID to filter for. Must not be ``None``.
    experiment_run_instance_uids : Optional[List[str]]
        Experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        Experiment run phase UIDs to filter for
    modes : Optional[List[str]]
        MuscleAction modes to filter for
    episodes : Optional[List[int]]
        MuscleAction episodes to filter for
    agent_names : Optional[List[str]] = None
        Agent names to filter for
    predicate : Predicate = lambda query: query
        Additional predicate, passed on to ``muscle_actions``.

    Returns
    -------
    Union[pd.DataFrame, dd.DataFrame]:
        The frame returned by ``muscle_actions`` with the columns
        ``muscle_sensor_readings``, ``muscle_actuator_setpoints``, and
        ``muscle_action_rewards`` converted to ``{uid: value}``
        dictionaries (``None`` stays ``None``). If the frame has rows, the
        columns ``muscle_action_simtime_ticks`` and
        ``muscle_action_simtime_timestamp`` are added whenever an
        environment providing the respective simtime information is found.

    Raises
    ------
    AssertionError
        If ``experiment_name`` or ``experiment_run_uid`` is ``None``.
    """
    import logging

    # Resolve the session once so that the ID lookup and the actual query
    # share it instead of each creating their own.
    if session is None:
        session = Session()

    (
        experiment_ids,
        experiment_run_uids,
        max_experiment_run_instance_uid_list,
    ) = _latest_ids(session, experiment_name, experiment_run_uid)
    if experiment_run_instance_uids is None:
        experiment_run_instance_uids = max_experiment_run_instance_uid_list

    erc: Union[pd.DataFrame, dd.DataFrame] = muscle_actions(
        session,
        like_dataframe=like_dataframe,
        experiment_ids=experiment_ids,
        experiment_run_uids=experiment_run_uids,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        modes=modes,
        episodes=episodes,
        agent_names=agent_names,
        predicate=predicate,
    )

    # Rendering the whole frame is expensive: only do it if it is logged.
    if LOG.isEnabledFor(logging.DEBUG):
        LOG.debug("Latest muscle action values (short): " + str(erc))
        LOG.debug("Latest muscle action values (long) : " + erc.to_string())

    def value_dict_extract_func(information_object_list):
        """Map information objects to ``{uid: value}``; skip plain strings."""
        if information_object_list is None:
            return None
        return {
            information_object.uid: information_object.value
            for information_object in information_object_list
            if not isinstance(information_object, str)
        }

    def _envs_providing(simtimes, field):
        """Keys of ``simtimes`` whose entry offers ``field``.

        An entry qualifies if it has ``field`` as attribute, or if it
        contains ``field`` as key with a value other than ``None``.
        """
        return [
            env
            for env, simtime in simtimes.items()
            if (
                hasattr(simtime, field)
                or (field in simtime and simtime[field] is not None)
            )
        ]

    def _simtime_field(simtime, field):
        """Read ``field`` from an entry, as attribute or as key."""
        if hasattr(simtime, field):
            return getattr(simtime, field)
        return simtime[field]

    erc.muscle_sensor_readings = erc.muscle_sensor_readings.apply(
        value_dict_extract_func
    )
    erc.muscle_actuator_setpoints = erc.muscle_actuator_setpoints.apply(
        value_dict_extract_func
    )
    erc.muscle_action_rewards = erc.muscle_action_rewards.apply(
        value_dict_extract_func
    )

    # Try to extract nicely formatted timestamps:
    if len(erc.muscle_action_simtimes) > 0:
        # The environment providing ticks is detected from the first row.
        ticks_envs = _envs_providing(
            erc.muscle_action_simtimes.iloc[0], "simtime_ticks"
        )
        if ticks_envs:
            ticks_env = ticks_envs[0]
            erc["muscle_action_simtime_ticks"] = (
                erc.muscle_action_simtimes.apply(
                    lambda muscle_action_simtime: _simtime_field(
                        muscle_action_simtime[ticks_env], "simtime_ticks"
                    )
                )
            )

        # The environment providing timestamps is detected from the last
        # row.
        timestamp_envs = _envs_providing(
            erc.muscle_action_simtimes.iloc[-1], "simtime_timestamp"
        )
        if timestamp_envs:
            timestamp_env = timestamp_envs[0]
            erc["muscle_action_simtime_timestamp"] = (
                erc.muscle_action_simtimes.apply(
                    lambda muscle_action_simtime: _simtime_field(
                        muscle_action_simtime[timestamp_env],
                        "simtime_timestamp",
                    )
                )
            )
    return erc


def muscle_action_values_non_empty_multi_index(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_name: Optional[str] = None,
    experiment_run_uid: Optional[str] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    modes: Optional[List[str]] = None,
    episodes: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> Union[pd.DataFrame, dd.DataFrame]:
    """Muscle action values as a frame with two-level (multi index) columns.

    Calls ``muscle_action_values``, keeps only rows whose sensor readings,
    actuator setpoints, and rewards are all present and non-empty, and
    spreads each of these three dictionaries over separate subcolumns.
    The multi index is needed because the number of sensors, actuators,
    and reward entries is only known from the data.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Database session; a new one is created when ``None``.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Passed on to ``muscle_action_values``.
    experiment_name : Optional[str]
        Experiment name to filter for
    experiment_run_uid : Optional[str]
        Experiment run UID to filter for
    experiment_run_instance_uids : Optional[List[str]]
        Experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        Experiment run phase UIDs to filter for
    modes : Optional[List[str]]
        MuscleAction modes to filter for
    episodes : Optional[List[int]]
        MuscleAction episodes to filter for
    agent_names : Optional[List[str]]
        Agent names to filter for
    predicate : Predicate = lambda query: query
        Additional predicate, passed on to ``muscle_action_values``.

    Returns
    -------
    Union[pd.DataFrame, dd.DataFrame]:
        A pandas data frame that shares its index with the filtered query
        result. Every column other than the three dictionary columns
        appears as ``(name, name)``, in the order of the query result.
        ``muscle_sensor_readings``, ``muscle_actuator_setpoints``, and
        ``muscle_action_rewards`` appear as ``(column, key)`` for every
        key of the dictionary found in the first remaining row. If no row
        remains, only the ``(name, name)`` columns are present.
    """
    if session is None:
        session = Session()
    erc = muscle_action_values(
        session=session,
        like_dataframe=like_dataframe,
        experiment_name=experiment_name,
        experiment_run_uid=experiment_run_uid,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        modes=modes,
        episodes=episodes,
        agent_names=agent_names,
        predicate=predicate,
    )

    # Order matters: it determines the order of the subcolumn groups.
    nested_columns = (
        "muscle_sensor_readings",
        "muscle_actuator_setpoints",
        "muscle_action_rewards",
    )

    def _is_complete(row):
        return all(
            row[column] is not None and len(row[column]) > 0
            for column in nested_columns
        )

    # A row-wise ``apply`` on a frame without rows does not produce a
    # boolean mask, so an empty result is passed through unfiltered.
    if len(erc) > 0:
        erc_non_empty = erc[erc.apply(_is_complete, axis=1)]
    else:
        erc_non_empty = erc

    # Keep the original column order; a set difference would make the
    # column order depend on string hashing.
    flat_columns = [
        column
        for column in erc_non_empty.columns
        if column not in nested_columns
    ]
    multi_index_data = {
        (column, column): erc_non_empty[column] for column in flat_columns
    }
    for column in nested_columns:
        entries = erc_non_empty[column]
        # The keys of the first row define the subcolumns of all rows.
        sub_keys = list(entries.iloc[0].keys()) if len(entries) > 0 else []
        for sub_key in sub_keys:
            multi_index_data[(column, sub_key)] = [
                entry[sub_key] for entry in entries
            ]

    return pd.DataFrame(multi_index_data, index=erc_non_empty.index)


def _make_unnest_subquery(
    query: Any,
    filter_subquery: Optional[str],  # e.g.: "%.vm_pu"
    predicate: Predicate = lambda query: query,
):
    """Unnest the JSON sensor readings of ``query`` into one row per sensor.

    Parameters
    ----------
    query : Any
        Selectable offering the columns ``muscle_action_simtimes`` and
        ``muscle_sensor_readings``.
    filter_subquery : Optional[str]
        ``LIKE`` pattern applied to the sensor UID; no filter is added if
        it is ``None`` or empty.
    predicate : Predicate = lambda query: query
        Applied to the resulting query as the last step.

    Returns
    -------
    A select with the columns ``muscle_action_simtimes``, ``sensor_uid``,
    and ``sensor_value``.
    """
    # Expand the '"py/state"' entries of the readings array into
    # (uid, value) records by means of a lateral join.
    lateral_join = (
        func.jsonb_to_recordset(
            func.jsonb_path_query_array(
                query.c.muscle_sensor_readings, '$[*]."py/state"'
            )
        )
        .table_valued(
            sa.column("uid", postgresql.TEXT),
            sa.column("value", postgresql.REAL),
            joins_implicitly=True,
        )
        .render_derived("sensr", with_types=True)
        .lateral()
    )
    query = sa.select(
        query.c.muscle_action_simtimes,
        lateral_join.c.uid.label("sensor_uid"),
        lateral_join.c.value.label("sensor_value"),
    ).select_from(lateral_join)
    if filter_subquery:
        query = query.filter(lateral_join.c.uid.like(filter_subquery))
    query = predicate(query)
    return query


def make_percentile_query(
    session: Optional[sqlalchemy.orm.Session] = None,
    experiment_ids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    agent_names: Optional[List[str]] = None,
    agent_ids: Optional[List[int]] = None,
    filter_subquery: Optional[str] = None,
    predicate: Predicate = lambda query: query,
) -> sqlalchemy.sql.expression.Select:
    """Build the query that computes percentiles of sensor values.

    The query selects simtimes and sensor readings of the matching muscle
    actions, unnests the readings (cf. ``_make_unnest_subquery``), and
    aggregates the values per sensor UID with ``percentile_cont``.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Only handed to ``_debug_print_bound_stmt`` for the intermediate
        statements.
    experiment_ids, experiment_run_uids, experiment_run_instance_uids,
    experiment_run_phase_uids, agent_names, agent_ids
        Filter values, passed to ``_make_generic_selectable_query``.
    filter_subquery : Optional[str] = None
        ``LIKE`` pattern for the sensor UID. If it contains ``vm_pu``, the
        sensor UIDs are additionally rewritten with ``regexp_replace``
        (``... bus-<n>.vm_pu`` becomes ``Bus <n>``).
    predicate : Predicate = lambda query: query
        Applied to the unnesting subquery.

    Returns
    -------
    sqlalchemy.sql.expression.Select
        A select with the columns ``sensor_uid``, ``percentile_25``,
        ``percentile_50``, ``percentile_75``, and ``percentile_100``,
        grouped by ``sensor_uid``.
    """
    query = _make_generic_selectable_query(
        (
            MuscleAction.simtimes.label("muscle_action_simtimes"),
            MuscleAction.sensor_readings.label("muscle_sensor_readings"),
        ),
        experiment_ids=experiment_ids,
        experiment_run_uids=experiment_run_uids,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        agent_names=agent_names,
        agent_ids=agent_ids,
    ).cte("muscle_actions")
    _debug_print_bound_stmt(query, session)

    query = _make_unnest_subquery(query, filter_subquery, predicate).cte(
        "subquery"
    )
    _debug_print_bound_stmt(query, session)

    query = sa.select(
        (
            func.regexp_replace(
                query.c.sensor_uid, r"^.*bus-(\d+).vm_pu$", "Bus \\1"
            )
            if filter_subquery is not None and "vm_pu" in filter_subquery
            else query.c.sensor_uid
        ).label("sensor_uid"),
        query.c.sensor_value,
    ).select_from(query)
    _debug_print_bound_stmt(query, session)

    percentiles = (
        (0.25, "percentile_25"),
        (0.50, "percentile_50"),
        (0.75, "percentile_75"),
        (1.00, "percentile_100"),
    )
    query = sa.select(
        query.c.sensor_uid,
        *[
            func.percentile_cont(fraction)
            .within_group(query.c.sensor_value)
            .label(label)
            for fraction, label in percentiles
        ],
    ).group_by(query.c.sensor_uid)
    _debug_print_bound_stmt(query, session)
    return query


def quartiles(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_name: Optional[str] = None,
    experiment_run_uid: Optional[str] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    agent_names: Optional[List[str]] = None,
    agent_ids: Optional[List[int]] = None,
    filter_subquery: Optional[str] = None,
    predicate: Predicate = lambda query: query,
) -> Union[pd.DataFrame, dd.DataFrame]:
    """Quartiles over the sensor reading values of muscle actions.

    The continuous percentiles for [0.25, 0.50, 0.75, 1.00] are computed
    by the database with the PostgreSQL ordered-set aggregate function
    ``percentile_cont``
    (https://www.postgresql.org/docs/14/functions-aggregate.html).
    Only PostgreSQL is supported.

    If no ``agent_ids`` are given, the experiment and experiment run are
    resolved from ``experiment_name`` and ``experiment_run_uid``; if, in
    addition, no experiment run instance UID is given, the latest instance
    is used.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Database session; a new one is created when ``None``.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Data frame whose ``experiment_name``, ``experiment_run_uid``,
        ``experiment_run_instance_uid``, and ``experiment_run_phase_uid``
        columns contribute to the respective filter values.
    experiment_name : Optional[str]
        Experiment name to filter for
    experiment_run_uid : Optional[str]
        Experiment run UID to filter for
    experiment_run_instance_uids : Optional[List[str]]
        Experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        Experiment run phase UIDs to filter for
    agent_names : Optional[List[str]] = None
        Agent names to filter for
    agent_ids : Optional[List[int]] = None
        Agent IDs to filter for
    filter_subquery : Optional[str] = None
        ``LIKE`` pattern used to filter the unnested sensor UIDs
    predicate : Predicate = lambda query: query
        Additional predicate, applied to the unnesting subquery
        (cf. ``make_percentile_query``).

    Returns
    -------
    Union[pd.DataFrame, dd.DataFrame]:
        A pandas data frame read via ``pd.read_sql_query``, indexed by
        ``sensor_uid``, with the columns:

        * percentile_25
        * percentile_50
        * percentile_75
        * percentile_100

    Raises
    ------
    AssertionError
        If the session is not bound to a PostgreSQL database.
    """
    if session is None:
        session = Session()
    assert (
        session.bind.dialect.name == "postgresql"
    ), "Quartile calculation is currently only supported for postgresql"

    experiment_ids = None
    experiment_run_uids = None
    experiment_name = (experiment_name or "") + _like_df_str(
        like_dataframe, "experiment_name"
    )
    experiment_run_uid = (experiment_run_uid or "") + _like_df_str(
        like_dataframe, "experiment_run_uid"
    )
    experiment_run_instance_uids = (
        experiment_run_instance_uids or []
    ) + _like_df_list(like_dataframe, "experiment_run_instance_uid")
    experiment_run_phase_uids = (
        experiment_run_phase_uids or []
    ) + _like_df_list(like_dataframe, "experiment_run_phase_uid")

    if agent_ids is None or len(agent_ids) == 0:
        (
            experiment_ids,
            experiment_run_uids,
            latest_experiment_run_instance_uids,
        ) = _latest_ids(session, experiment_name, experiment_run_uid)
        # The latest instance is only known (and only needed) when the
        # IDs have just been looked up.
        if len(experiment_run_instance_uids) == 0:
            experiment_run_instance_uids = latest_experiment_run_instance_uids

    query = make_percentile_query(
        session=session,
        experiment_ids=experiment_ids,
        experiment_run_uids=experiment_run_uids,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        agent_names=agent_names,
        agent_ids=agent_ids,
        filter_subquery=filter_subquery,
        predicate=predicate,
    )
    return pd.read_sql_query(query, session.bind, index_col="sensor_uid")


def make_muscles_cumulative_objective_query(
    experiment_ids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    experiment_run_phase_numbers: Optional[List[int]] = None,
    modes: Optional[List[str]] = None,
    muscle_action_episodes: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> sqlalchemy.sql.expression.Select:
    """Build the query that sums up muscle objective values.

    Muscle actions without actuator setpoints (JSON ``null``) are
    excluded. The remaining actions are grouped by agent, experiment run
    phase, experiment run instance, experiment run, experiment, and
    episode; per group, the objective values are summed up and the maximum
    rollout worker UID is selected.

    Parameters
    ----------
    experiment_ids : Optional[List[str]]
        Experiment IDs to filter for
    experiment_run_uids : Optional[List[str]]
        Experiment run UIDs to filter for
    experiment_run_instance_uids : Optional[List[str]]
        Experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        Experiment run phase UIDs to filter for
    experiment_run_phase_numbers : Optional[List[int]]
        Experiment run phase numbers to filter for
    modes : Optional[List[str]]
        Experiment run phase modes to filter for
    muscle_action_episodes : Optional[List[int]]
        Muscle action episodes to filter for
    agent_names : Optional[List[str]]
        Agent names to filter for
    predicate : Predicate = lambda query: query
        Applied to the query after all other filters.

    Returns
    -------
    sqlalchemy.sql.expression.Select
        The select statement; filters that are ``None`` or empty are not
        applied.
    """
    query = (
        sa.select(
            sa.func.max(MuscleAction.rollout_worker_uid).label(
                "rollout_worker_uid"
            ),
            MuscleAction.episode.label("muscle_actions_episode"),
            sa.func.sum(MuscleAction.objective).label(
                "muscle_cumulative_objective"
            ),
            Agent.uid.label("agent_uid"),
            Agent.name.label("agent_name"),
            ExperimentRunPhase.id.label("experiment_run_phase_id"),
            ExperimentRunPhase.uid.label("experiment_run_phase_uid"),
            ExperimentRunPhase.configuration.label(
                "experiment_run_phase_configuration"
            ),
            ExperimentRunInstance.uid.label("experiment_run_instance_uid"),
            ExperimentRun.id.label("experiment_run_id"),
            ExperimentRun.uid.label("experiment_run_uid"),
            Experiment.id.label("experiment_id"),
            Experiment.name.label("experiment_name"),
        )
        .select_from(MuscleAction)
        .join(Agent)
        .join(ExperimentRunPhase)
        .join(ExperimentRunInstance)
        .join(ExperimentRun)
        .join(Experiment)
        .where(MuscleAction.actuator_setpoints != sa.JSON.NULL)
        .group_by(
            Agent.uid,
            Agent.name,
            ExperimentRunPhase.id,
            ExperimentRunPhase.uid,
            ExperimentRunPhase.configuration,
            ExperimentRunInstance.uid,
            ExperimentRun.id,
            ExperimentRun.uid,
            Experiment.id,
            Experiment.name,
            MuscleAction.episode,
        )
    )
    if experiment_ids:
        query = query.where(Experiment.id.in_(experiment_ids))
    if experiment_run_uids:
        query = query.where(ExperimentRun.uid.in_(experiment_run_uids))
    if experiment_run_instance_uids:
        query = query.where(
            ExperimentRunInstance.uid.in_(experiment_run_instance_uids)
        )
    if experiment_run_phase_uids:
        query = query.where(
            ExperimentRunPhase.uid.in_(experiment_run_phase_uids)
        )
    if experiment_run_phase_numbers:
        query = query.where(
            ExperimentRunPhase.number.in_(experiment_run_phase_numbers)
        )
    if modes:
        query = query.where(ExperimentRunPhase.mode.in_(modes))
    if muscle_action_episodes:
        query = query.where(MuscleAction.episode.in_(muscle_action_episodes))
    if agent_names:
        query = query.where(Agent.name.in_(agent_names))
    query = predicate(query)
    return query
def muscles_cumulative_objective(
    session: Optional[sqlalchemy.orm.Session] = None,
    like_dataframe: Optional[Union[pd.DataFrame, dd.DataFrame]] = None,
    experiment_ids: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    experiment_run_phase_numbers: Optional[List[int]] = None,
    modes: Optional[List[str]] = None,
    muscle_action_episodes: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
    predicate: Predicate = lambda query: query,
) -> pd.DataFrame:
    """Cumulative objective values of muscles (i.e., summed-up rewards).

    The resulting dataframe lists the summed-up objective values of
    agents in the phases of an experiment run, as computed by the query
    from ``make_muscles_cumulative_objective_query``.
    Results can be filtered by providing the respective parameters, e.g.,
    ``experiment_run_phase_uids`` restricts the result to particular
    phases. Values given explicitly and values taken from
    ``like_dataframe`` are combined into one list per filter.

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Database session; a new one is created via ``Session()`` when
        ``None``.
    like_dataframe : Optional[Union[pd.DataFrame, dd.DataFrame]] = None
        Data frame contributing filter values. The columns looked up are
        ``experiment_id``, ``experiment_run_uid``,
        ``experiment_run_instance_uid``, ``experiment_run_phase_uid``,
        ``experiment_run_phase_numbers``, ``modes``, and
        ``muscle_action_episodes``.
    experiment_ids : Optional[List[str]]
        Experiment IDs to filter for
    experiment_run_uids : Optional[List[str]]
        Experiment run UIDs to filter for
    experiment_run_instance_uids : Optional[List[str]]
        Experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        Experiment run phase UIDs to filter for
    experiment_run_phase_numbers : Optional[List[int]]
        Experiment run phase numbers to filter for
    modes : Optional[List[str]]
        Modes to filter for
    muscle_action_episodes : Optional[List[int]]
        Muscle action episodes to filter for
    agent_names : Optional[List[str]] = None
        Agent names to filter for
    predicate : Predicate = lambda query: query
        Additional predicate, handed to
        ``make_muscles_cumulative_objective_query``.

    Returns
    -------
    pd.DataFrame:
        A data frame indexed by ``rollout_worker_uid``. Unless the
        predicate changes the selection, it contains the columns:

        * muscle_actions_episode
        * muscle_cumulative_objective
        * agent_uid
        * agent_name
        * experiment_run_phase_id
        * experiment_run_phase_uid
        * experiment_run_phase_configuration
        * experiment_run_instance_uid
        * experiment_run_id
        * experiment_run_uid
        * experiment_id
        * experiment_name
    """
    db_session = session if session is not None else Session()

    def _merged(explicit_values, frame_column):
        # Explicit filter values first, then whatever the frame contributes.
        return (explicit_values or []) + _like_df_list(
            like_dataframe, frame_column
        )

    filters = {
        "experiment_ids": _merged(experiment_ids, "experiment_id"),
        "experiment_run_uids": _merged(
            experiment_run_uids, "experiment_run_uid"
        ),
        "experiment_run_instance_uids": _merged(
            experiment_run_instance_uids, "experiment_run_instance_uid"
        ),
        "experiment_run_phase_uids": _merged(
            experiment_run_phase_uids, "experiment_run_phase_uid"
        ),
        "experiment_run_phase_numbers": _merged(
            experiment_run_phase_numbers, "experiment_run_phase_numbers"
        ),
        "modes": _merged(modes, "modes"),
        "muscle_action_episodes": _merged(
            muscle_action_episodes, "muscle_action_episodes"
        ),
    }
    statement = make_muscles_cumulative_objective_query(
        agent_names=agent_names,
        predicate=predicate,
        **filters,
    )
    _debug_print_bound_stmt(statement, db_session)
    return pd.read_sql_query(
        statement, db_session.bind, index_col="rollout_worker_uid"
    )
def _make_generic_selectable_query(
    selectables: Optional[Any] = None,
    experiment_ids: Optional[List[str]] = None,
    experiment_names: Optional[List[str]] = None,
    experiment_run_uids: Optional[List[str]] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    agent_uids: Optional[List[str]] = None,
    agent_ids: Optional[List[int]] = None,
    agent_names: Optional[List[str]] = None,
):
    """Select arbitrary columns from the joined muscle action tables.

    The statement selects ``selectables`` from ``MuscleAction`` joined
    with ``Agent``, ``ExperimentRunPhase``, ``ExperimentRunInstance``,
    ``ExperimentRun``, and ``Experiment``. Every filter argument that is
    neither ``None`` nor empty adds an ``IN`` condition; all conditions
    are combined with a logical *and*.

    Parameters
    ----------
    selectables : Optional[Any]
        Column expressions, handed to ``sa.select`` as given.
    experiment_ids : Optional[List[str]]
        Values for ``Experiment.id``
    experiment_names : Optional[List[str]]
        Values for ``Experiment.name``
    experiment_run_uids : Optional[List[str]]
        Values for ``ExperimentRun.uid``
    experiment_run_instance_uids : Optional[List[str]]
        Values for ``ExperimentRunInstance.uid``
    experiment_run_phase_uids : Optional[List[str]]
        Values for ``ExperimentRunPhase.uid``
    agent_uids : Optional[List[str]]
        Values for ``Agent.uid``
    agent_ids : Optional[List[int]]
        Values for ``Agent.id``
    agent_names : Optional[List[str]]
        Values for ``Agent.name``

    Returns
    -------
    The select statement.
    """
    query = (
        sa.select(selectables)
        .select_from(MuscleAction)
        .join(Agent)
        .join(ExperimentRunPhase)
        .join(ExperimentRunInstance)
        .join(ExperimentRun)
        .join(Experiment)
    )
    if experiment_ids:
        query = query.where(Experiment.id.in_(experiment_ids))
    if experiment_names:
        query = query.where(Experiment.name.in_(experiment_names))
    if experiment_run_uids:
        query = query.where(ExperimentRun.uid.in_(experiment_run_uids))
    if experiment_run_instance_uids:
        query = query.where(
            ExperimentRunInstance.uid.in_(experiment_run_instance_uids)
        )
    if experiment_run_phase_uids:
        query = query.where(
            ExperimentRunPhase.uid.in_(experiment_run_phase_uids)
        )
    if agent_uids:
        query = query.where(Agent.uid.in_(agent_uids))
    if agent_ids:
        query = query.where(Agent.id.in_(agent_ids))
    if agent_names:
        query = query.where(Agent.name.in_(agent_names))
    return query


def _make_brain_state_query(
    agent_uids: Optional[List[str]] = None,
):
    """Select the most recent brain state of every brain state tag.

    A subquery determines the maximum walltime per tag; the outer query
    joins ``BrainState`` against it on tag and walltime and keeps only
    tags ending in ``_actor``, ``_actor_target``, ``_critic``, or
    ``_critic_target``.

    Parameters
    ----------
    agent_uids : Optional[List[str]]
        Despite its name, these values are matched against
        ``BrainState.agent_id``. The filter is only applied to the
        maximum-walltime subquery.

    Returns
    -------
    A select with the columns ``brain_state_id``,
    ``brain_state_walltime``, ``brain_state_simtime_ticks``,
    ``brain_state_state``, ``brain_state_tag``,
    ``brain_state_simtime_timestamp``, and ``brain_agent_id``.
    """
    max_walltime_query = (
        sa.select(
            func.max(BrainState.walltime).label("brain_state_walltime"),
            BrainState.tag.label("brain_state_tag"),
        )
        .select_from(BrainState)
        .join(Agent)
    )
    if agent_uids:
        max_walltime_query = max_walltime_query.where(
            BrainState.agent_id.in_(agent_uids)
        )
    max_walltime_query = (
        max_walltime_query.group_by(BrainState.tag.label("brain_state_tag"))
    ).subquery()

    query = (
        sa.select(
            BrainState.id.label("brain_state_id"),
            BrainState.walltime.label("brain_state_walltime"),
            BrainState.simtime_ticks.label("brain_state_simtime_ticks"),
            BrainState.state.label("brain_state_state"),
            BrainState.tag.label("brain_state_tag"),
            BrainState.simtime_timestamp.label(
                "brain_state_simtime_timestamp"
            ),
            BrainState.agent_id.label("brain_agent_id"),
        )
        .select_from(BrainState)
        .join(
            max_walltime_query,
            (max_walltime_query.c.brain_state_tag == BrainState.tag)
            & (
                max_walltime_query.c.brain_state_walltime
                == BrainState.walltime
            ),
        )
        .filter(
            (BrainState.tag.like("%_actor"))
            | (BrainState.tag.like("%_actor_target"))
            | (BrainState.tag.like("%_critic"))
            | (BrainState.tag.like("%_critic_target"))
        )
    )
    return query


def _load_harl_modules(x):
    """Deserialize a stored brain state with ``torch.load`` onto the CPU.

    Parameters
    ----------
    x : bytes-like
        The serialized object as stored in the database.

    Returns
    -------
    The object reconstructed by ``torch.load``.
    """
    import torch  # type: ignore[import-not-found]

    # SECURITY: ``weights_only=False`` makes ``torch.load`` unpickle
    # arbitrary objects, which can execute code. Only use this with
    # databases whose contents are trusted.
    return torch.load(
        BytesIO(x), map_location=torch.device("cpu"), weights_only=False
    )


def brain_state(
    session: Optional[sqlalchemy.orm.Session] = None,
    experiment_name: Optional[str] = None,
    experiment_run_uid: Optional[str] = None,
    experiment_run_instance_uids: Optional[List[str]] = None,
    experiment_run_phase_uids: Optional[List[str]] = None,
    agent_names: Optional[List[str]] = None,
    extract: bool = False,
    file_store: bool = False,
):
    """Load the latest brain states of agents from the database.

    If ``experiment_run_instance_uids`` is ``None``, the latest instance
    of the experiment run is used. The serialized states are deserialized
    via ``torch.load`` (cf. ``_load_harl_modules``).

    Parameters
    ----------
    session : sqlalchemy.orm.Session, optional
        Database session; a new one is created when ``None``.
    experiment_name : Optional[str]
        Experiment name to filter for. Must not be ``None``.
    experiment_run_uid : Optional[str]
        Experiment run UID to filter for
    experiment_run_instance_uids : Optional[List[str]]
        Experiment run instance UIDs to filter for
    experiment_run_phase_uids : Optional[List[str]]
        Experiment run phase UIDs to filter for
    agent_names : Optional[List[str]]
        Agent names to filter for. Must contain exactly one name if
        ``extract`` is ``True``.
    extract : bool = False
        If ``True``, a brain object is created via ``load_with_params``
        (``harl:<CLASS>Brain``) and its ``actor``, ``critic``,
        ``actor_target``, and ``critic_target`` attributes are set from
        the loaded states. If states tagged ``best-*`` exist, those are
        used.
    file_store : bool = False
        If ``True`` (and ``extract`` is ``True``), the brain is
        additionally stored through a ``FileBrainDumper``.

    Returns
    -------
    Tuple[pd.DataFrame, Any]
        The data frame of brain states (with ``brain_state_state``
        deserialized) and the brain object, or ``None`` for the latter if
        ``extract`` is ``False``.

    Raises
    ------
    AssertionError
        If ``experiment_name`` is ``None``, if the brain state tags do not
        belong to one or two classes, or if ``extract`` is requested
        without exactly one agent name.
    IndexError
        If ``extract`` is requested and a required network state is
        missing.
    """
    if session is None:
        session = Session()

    if experiment_run_instance_uids is None:
        (
            experiment_ids,
            experiment_run_uids,
            experiment_run_instance_uids,
        ) = _latest_ids(session, experiment_name, experiment_run_uid)
    else:
        assert experiment_name is not None
        experiment_run_uids = [experiment_run_uid]
        # DISTINCT, like the other lookups here: without it, one row per
        # muscle action would be fetched just to learn the experiment ID.
        exp_id_and_run_id_query = _make_generic_selectable_query(
            (sa.distinct(Experiment.id).label("experiment_id"),),
            experiment_names=[experiment_name],
            experiment_run_uids=experiment_run_uids,
            experiment_run_instance_uids=experiment_run_instance_uids,
            experiment_run_phase_uids=experiment_run_phase_uids,
            agent_names=agent_names,
        )
        _debug_print_bound_stmt(exp_id_and_run_id_query, session)
        df_experiment_id = pd.read_sql_query(
            exp_id_and_run_id_query, session.bind
        )
        experiment_ids = df_experiment_id["experiment_id"].to_list()

    agent_id_query = _make_generic_selectable_query(
        (sa.distinct(Agent.id).label("agent_id"),),
        experiment_ids=experiment_ids,
        experiment_run_uids=experiment_run_uids,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        agent_names=agent_names,
    )
    _debug_print_bound_stmt(agent_id_query, session)
    df_agent_id = pd.read_sql_query(agent_id_query, session.bind)
    agent_id_list = df_agent_id["agent_id"].to_list()

    phase_num_query = _make_generic_selectable_query(
        (
            sa.distinct(ExperimentRunPhase.number).label(
                "experiment_run_phase_number"
            ),
        ),
        experiment_ids=experiment_ids,
        experiment_run_uids=experiment_run_uids,
        experiment_run_instance_uids=experiment_run_instance_uids,
        experiment_run_phase_uids=experiment_run_phase_uids,
        agent_names=agent_names,
    )
    _debug_print_bound_stmt(phase_num_query, session)
    df_exp_run_phase_id = pd.read_sql_query(phase_num_query, session.bind)
    experiment_run_phase_numbers = df_exp_run_phase_id[
        "experiment_run_phase_number"
    ].to_list()

    query = _make_brain_state_query(agent_id_list)
    _debug_print_bound_stmt(query, session)
    df = pd.read_sql_query(query, session.bind)
    brain_state_record = df.brain_state_state.transform(
        _load_harl_modules, axis=0
    )
    df = df.assign(brain_state_state=brain_state_record)

    brain = None
    if extract:
        from ..util.dynaloader import load_with_params

        # TODO: Iterate over all agents
        # Strip the network suffixes so that only the class part of the
        # tag remains (plus a possible "best-" prefix).
        df["brain_state_tag_class"] = (
            df.brain_state_tag.str.replace("_actor", "")
            .str.replace("_critic", "")
            .str.replace("_target", "")
        )
        classes = list(df.groupby("brain_state_tag_class").groups.keys())
        assert len(classes) in (1, 2), (
            "There must either be one or two classes (normal + 'best-*') in the queried brain_state_tag, but "
            "there are: {}".format(classes)
        )

        # Prefer the "best-*" states if there are any.
        g = None
        use_best = False
        for k in classes:
            k = str(k)
            if k.startswith("best-"):
                g = k.removeprefix("best-")
                use_best = True
                break
        if g is None:
            g = classes[0]  # type: ignore[assignment]
        assert isinstance(g, str)
        g = g.upper()
        brain = load_with_params(f"harl:{g}Brain", {})

        def get_state(_df, _suffix, _use_best=False):
            """First state whose tag ends with ``_suffix``."""
            mask = _df.brain_state_tag.str.endswith(_suffix)
            if _use_best:
                mask &= _df.brain_state_tag.str.startswith("best-")
            matches = _df[mask].brain_state_state
            if matches.empty:
                raise IndexError(
                    "No brain state found whose tag ends with "
                    f"{_suffix!r} (best only: {_use_best})"
                )
            return matches.iloc[0]

        brain.actor = get_state(df, "_actor", use_best)
        brain.critic = get_state(df, "_critic", use_best)
        brain.actor_target = get_state(df, "_actor_target", use_best)
        brain.critic_target = get_state(df, "_critic_target", use_best)

        assert agent_names is not None and len(agent_names) == 1

        if file_store:
            dumper = FileBrainDumper(
                dump_to=BrainLocation(
                    agent_name=agent_names[0],
                    experiment_run_uid=experiment_run_uids[0],
                    experiment_run_phase=experiment_run_phase_numbers[0],
                    repeat=1,
                )
            )
            brain._dumpers = [dumper]
            brain.store()
    return df, brain