from __future__ import annotations
import logging
from io import TextIOWrapper
from os import getcwd
from pathlib import Path
from typing import Any, Dict, Union, TextIO
import appdirs
import ruamel.yaml
LOG = logging.getLogger(__name__)
[docs]
class DebugLogFilter:
"""Only allows debug messages; works as a filter for the debug_handler"""
def __call__(self, log_message):
if log_message.levelno == logging.DEBUG:
return True
else:
return False
class _RuntimeConfig:
"""Application-wide runtime configuration.
This singleton class provides an application-wide runtime configuration
and transparently hides all sources from the rest of the application.
"""
CONFIG_FILE_PATHS = [
"%s/runtime-conf.yaml"
% appdirs.site_config_dir("palaestrai", "OFFIS"),
"%s/runtime-conf.yaml"
% appdirs.user_config_dir("palaestrai", "OFFIS"),
"%s/palaestrai-runtime.conf.yaml" % getcwd(),
"%s/runtime.conf.yaml" % getcwd(),
]
DEFAULT_CONFIG = {
"store_uri": "sqlite:///palaestrai.db",
"time_series_store_uri": "influx+localhost:8086",
"store_buffer_size": 20,
"data_path": "./_outputs",
"executor_bus_port": 4242,
"logger_port": 4243,
"public_bind": False,
"major_domo_client_timeout": 300_000,
"major_domo_client_retries": 3,
"logging": {
"version": 1,
"formatters": {
"simple": {
"format": "%(asctime)s %(name)s[%(process)d]: "
"%(levelname)s - %(message)s"
},
"debug": {
"format": "%(asctime)s %(name)s[%(process)d]: "
"%(levelname)s - %(message)s (%(module)s.%(funcName)s "
"in %(filename)s:%(lineno)d)"
},
},
"filters": {
"debug_filter": {
"()": "palaestrai.core.runtime_config.DebugLogFilter",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"console_debug": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "debug",
"filters": ["debug_filter"],
"stream": "ext://sys.stdout",
},
},
"loggers": {
"palaestrai.agent": {"level": "ERROR"},
"palaestrai.agent.brain": {"level": "ERROR"},
"palaestrai.agent.muscle": {"level": "ERROR"},
"palaestrai.agent.agent_conductor": {"level": "ERROR"},
"palaestrai.core": {"level": "ERROR"},
"palaestrai.experiment": {"level": "ERROR"},
"palaestrai.store": {"level": "ERROR"},
"palaestrai.environment": {"level": "ERROR"},
"palaestrai.simulation": {"level": "ERROR"},
"palaestrai.types": {"level": "ERROR"},
"palaestrai.util": {"level": "ERROR"},
"palaestrai.visualization": {"level": "ERROR"},
"sqlalchemy.engine": {"level": "ERROR"},
},
"root": {
"level": "ERROR",
"handlers": ["console", "console_debug"],
},
},
"profile": False,
}
_instance = None
def __init__(self):
self._config_file_path = None
self.config_search_path = _RuntimeConfig.CONFIG_FILE_PATHS
# The loaded configuration is what RuntimeConfig.load gave us. It
# remains immutable after loading.
self._loaded_configuration = {}
def _get(self, key: str, default=None, exception=None) -> Any:
"""Retrieves a config key
Retrieves any config key; if not set, it queries the config dictionary;
if it isn't present there, it returns the given default value. It also
sets the value in the current object as a side-effect.
"""
lkey = "_%s" % key
if lkey not in self.__dict__:
try:
self.__dict__[lkey] = self._loaded_configuration[key]
except KeyError:
if default:
self.__dict__[lkey] = default
else:
self.__dict__[lkey] = _RuntimeConfig.DEFAULT_CONFIG[key]
if exception:
raise KeyError(exception)
return self.__dict__[lkey]
def reset(self):
"""Resets the runtime configuration to empty state"""
for key in list(self._loaded_configuration.keys()) + list(
_RuntimeConfig.DEFAULT_CONFIG.keys()
):
try:
del self.__dict__[f"_{key}"]
except KeyError:
pass
self._loaded_configuration = {}
self._config_file_path = None
@property
def logging(self) -> Dict:
"""Configuration of all subsystem loggers
:return: A logging configuration that can be fed into
`logging.DictConfig`.
:rtype: dict
"""
return self._get(
"logging",
exception="Sorry, no logging config in the config file",
)
@property
def time_series_store_uri(self) -> str:
"""URI to the time series database for results
This must be any standards-compliant string in the form of
`influxdb+user:password@host-or-path:port/db`. For example,
`elasticsearch+myuser:mypass@localhost/arl`.
Returns
-------
"""
return self._get("time_series_store_uri")
@property
def store_uri(self) -> str:
"""URI to the store database for results
This must be any standards-compliant string in the form of
`transport://user:password@host-or-path:port/db`. For example,
`postgresql://myuser:mypass@localhost/arl`.
:return: The URI string.
:rtype: str
"""
return self._get("store_uri")
@property
def store_buffer_size(self) -> int:
"""Number of messages buffered before writing to the store
palaestrAI buffers data before flushing it in bulk to the store.
This number defines a factor; the number of messages being buffered
is a multiple of the number of agents and number of environments,
for technical reasons.
Writing data in bulk to the database makes the results storage more
efficient.
However, caching too many messages before writing will also increase
the local memory consumption.
To find a balance, we make this number user-configurable, but the
default should be sane for any modern machine.
"""
return self._get("store_buffer_size")
@property
def data_path(self) -> Path:
"""File system path for data storage
palaestrAI needs a path to store data in the file system, e.g., for
brain dumps.
:returns: The file path, default is: "." (current directory)
:rtype: pathlib.Path
"""
return Path(self._get("data_path"))
@property
def executor_bus_port(self) -> int:
"""Port of the executor's messaging bus
palaestrai needs one bus to start it all, which is managed by the
executor. All other buses and topics can be communicated over this
initial bus.
:return: The bus port, default: 4242
:rtype: int
"""
return self._get("executor_bus_port")
@property
def public_bind(self) -> bool:
"""Indicates whether to bind to all public adresses or to localhost
This configuration setting allows the Executor and all other message
buses to bind to all public IP addresses if set to `True`. If not,
the buses will bind to `localhost` only.
:return: Whether to bind to all available IP adresses (`True`) or not.
:rtype: bool
"""
return self._get("public_bind")
@property
def profile(self) -> bool:
"""Whether to enable profiling or not."""
return self._get("profile")
@property
def major_domo_client_timeout(self) -> int:
"""Timeout used by the MajorDomoClient
Returns
-------
int
The timeout, in seconds
"""
return self._get("major_domo_client_timeout")
@property
def major_domo_client_retries(self) -> int:
"""The number of retries the MajorDomoClient will try
:return: the number of retries
:rtype: int
"""
return self._get("major_domo_client_retries")
@property
def logger_port(self) -> int:
"""Destination port the internal log server should use
All spawned submodules of palaestrAI communicate their log messages
back to the main process. This log message receiver binds to the given
port.
Returns
-------
int
The port of the internal log server
"""
return self._get("logger_port")
def load(
self, stream_or_dict: Union[dict, TextIO, str, Path, None] = None
):
"""Loads the configuration from an external source.
The runtime configuration is initialized from the default configuration
in ::`_RuntimeConfig.DEFAULT_CONFIG`. This method then iterates through
the list in ::`_RuntimeConfig.CONFIG_FILE_PATHS`, subsequently updating
the existing configuration with new values found. Finally, the given
::`stream_or_dict` parameter is used if present, ultimately taking
preference over all other values.
That means that each config file can contain only a portion of the
overall configuration; it gets updated subsequently.
Parameters
----------
stream_or_dict : Union[dict, TextIO, str, Path, None]
Loads the runtime configuration directly from a dictionary or as
YAML-encoded stream. If no stream is given, the default files in
::`.CONFIG_FILE_PATHS` will be tried as described.
"""
if not isinstance(self._loaded_configuration, dict):
self._loaded_configuration = {}
if not stream_or_dict and len(self._loaded_configuration) > 0:
# Don't load a default config if we already have something; use
# reset() instead.
return
yml = ruamel.yaml.YAML(typ="safe")
has_seen_nondefault_config = False
self._loaded_configuration.update(_RuntimeConfig.DEFAULT_CONFIG)
for file in _RuntimeConfig.CONFIG_FILE_PATHS:
try:
LOG.debug("Trying to open configuration file: %s", file)
with open(file, "r") as fp:
deserialized = yml.load(fp)
if not isinstance(deserialized, dict):
LOG.warning(
"The contents of %s could not be deserialized "
"to dict, skipping it.",
file,
)
continue
self._loaded_configuration.update(deserialized)
self._config_file_path = file
has_seen_nondefault_config = True
except IOError:
continue
if isinstance(stream_or_dict, dict):
self._loaded_configuration.update(stream_or_dict)
self._config_file_path = "(dict)"
return
if isinstance(stream_or_dict, str):
stream_or_dict = Path(stream_or_dict)
if isinstance(stream_or_dict, Path):
try:
stream_or_dict = open(stream_or_dict, "r")
except OSError:
LOG.warning(
"Failed to load runtime configuration from file at %s, "
"ignoring.",
stream_or_dict,
)
if stream_or_dict is not None:
try:
deserialized = yml.load(stream_or_dict) # Can raise
if not isinstance(deserialized, dict):
raise TypeError
self._loaded_configuration.update(deserialized)
try:
self._config_file_path = stream_or_dict.name
except AttributeError:
self._config_file_path = str(stream_or_dict)
has_seen_nondefault_config = True
except TypeError:
LOG.warning(
"Failed to load runtime configuration from stream "
'at "%s", ignoring.',
repr(stream_or_dict),
)
finally:
if isinstance(stream_or_dict, TextIOWrapper):
stream_or_dict.close()
if not has_seen_nondefault_config:
LOG.info(
"No runtime configuration given, loaded built-in defaults."
)
self._config_file_path = "(DEFAULT)"
def to_dict(self) -> Dict:
return {key: self._get(key) for key in _RuntimeConfig.DEFAULT_CONFIG}
def __str__(self):
return "<RuntimeConfig id=0x%x> at %s" % (
id(self),
self._config_file_path,
)
def __repr__(self):
return str(self.to_dict())
[docs]
def RuntimeConfig():
if _RuntimeConfig._instance is None:
_RuntimeConfig._instance = _RuntimeConfig()
try:
_RuntimeConfig._instance.load()
except FileNotFoundError:
from copy import deepcopy
_RuntimeConfig._instance._loaded_configuration = deepcopy(
_RuntimeConfig.DEFAULT_CONFIG
)
return _RuntimeConfig._instance