# Source code for palaestrai.core.runtime_config

from __future__ import annotations

import logging
from io import TextIOWrapper
from os import getcwd
from pathlib import Path
from typing import Any, Dict, Union, TextIO

import appdirs
import ruamel.yaml

LOG = logging.getLogger(__name__)


class DebugLogFilter:
    """Only allows debug messages; works as a filter for the debug_handler

    Instances are callables that can be used as a ``logging`` filter: they
    accept a log record and tell whether it should be emitted.
    """

    def __call__(self, log_message):
        """Decide whether a log record passes the filter.

        Parameters
        ----------
        log_message : logging.LogRecord
            The record to inspect; only its ``levelno`` attribute is read.

        Returns
        -------
        bool
            True if the record's level is exactly ``logging.DEBUG``,
            False for every other level.
        """
        return log_message.levelno == logging.DEBUG
class _RuntimeConfig:
    """Application-wide runtime configuration.

    This singleton class provides an application-wide runtime configuration
    and transparently hides all sources from the rest of the application.

    Values are looked up lazily through :py:meth:`_get` and cached as
    ``_<key>`` instance attributes on first access. Use :py:meth:`reset`
    to drop both the loaded configuration and the cached values.
    """

    # Files that load() tries, in this order; later files override
    # values from earlier ones.
    CONFIG_FILE_PATHS = [
        f"{appdirs.site_config_dir('palaestrai', 'OFFIS')}/runtime-conf.yaml",
        f"{appdirs.user_config_dir('palaestrai', 'OFFIS')}/runtime-conf.yaml",
        f"{getcwd()}/palaestrai.conf",
        f"{getcwd()}/palaestrai-runtime.conf.yaml",
        f"{getcwd()}/runtime.conf.yaml",
    ]

    # Built-in defaults; load() merges these first, before any file.
    DEFAULT_CONFIG = {
        "store_uri": "sqlite:///palaestrai.db",
        "time_series_store_uri": "influx+localhost:8086",
        "store_buffer_size": 20,
        "data_path": "./_outputs",
        "broker_uri": "ipc://*",
        "executor_bus_port": 4242,
        "logger_port": 4243,
        "public_bind": False,
        "major_domo_client_timeout": 300_000,
        "major_domo_client_retries": 3,
        "fork_method": "spawn",
        "logging": {
            "version": 1,
            "formatters": {
                "simple": {
                    "format": "%(asctime)s %(name)s[%(process)d]: "
                    "%(levelname)s - %(message)s"
                },
                "debug": {
                    "format": "%(asctime)s %(name)s[%(process)d]: "
                    "%(levelname)s - %(message)s (%(module)s.%(funcName)s "
                    "in %(filename)s:%(lineno)d)"
                },
                "terminal": {
                    "class": "palaestrai.cli.terminal_formatter.TerminalFormatter",
                },
            },
            "filters": {
                "debug_filter": {
                    "()": "palaestrai.core.runtime_config.DebugLogFilter",
                },
            },
            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "level": "INFO",
                    "formatter": "simple",
                    "stream": "ext://sys.stdout",
                },
                "console_debug": {
                    "class": "logging.StreamHandler",
                    "level": "DEBUG",
                    "formatter": "debug",
                    "filters": ["debug_filter"],
                    "stream": "ext://sys.stdout",
                },
            },
            "loggers": {
                "palaestrai.agent": {"level": "ERROR"},
                "palaestrai.agent.brain": {"level": "ERROR"},
                "palaestrai.agent.muscle": {"level": "ERROR"},
                "palaestrai.agent.agent_conductor": {"level": "ERROR"},
                "palaestrai.core": {"level": "ERROR"},
                "palaestrai.experiment": {"level": "ERROR"},
                "palaestrai.store": {"level": "ERROR"},
                "palaestrai.environment": {"level": "ERROR"},
                "palaestrai.simulation": {"level": "ERROR"},
                "palaestrai.types": {"level": "ERROR"},
                "palaestrai.util": {"level": "ERROR"},
                "palaestrai.visualization": {"level": "ERROR"},
                "sqlalchemy.engine": {"level": "ERROR"},
            },
            "root": {
                "level": "ERROR",
                "handlers": ["console", "console_debug"],
            },
        },
        "profile": False,
    }

    # Holds the single shared instance once it has been created.
    _instance = None

    def __init__(self):
        self._config_file_path = None
        self.config_search_path = _RuntimeConfig.CONFIG_FILE_PATHS

        # The loaded configuration is what RuntimeConfig.load gave us. It
        # remains immutable after loading.
        self._loaded_configuration = {}

    def _get(self, key: str, default=None, exception=None) -> Any:
        """Retrieves a config key

        Retrieves any config key; if not set, it queries the config
        dictionary; if it isn't present there, it returns the given default
        value. It also sets the value in the current object as a side-effect.

        Parameters
        ----------
        key : str
            Name of the configuration entry.
        default : Any, optional
            Fallback used when ``key`` is not part of the loaded
            configuration. If ``None``, the entry from
            :py:attr:`DEFAULT_CONFIG` is used instead. Falsy values such as
            ``0`` or ``False`` are honored as explicit defaults.
        exception : Any, optional
            If truthy and ``key`` is missing from the loaded configuration,
            a ``KeyError`` carrying this value is raised *after* the
            fallback has been cached, so a subsequent call returns the
            cached fallback without raising.

        Returns
        -------
        Any
            The cached value for ``key``.

        Raises
        ------
        KeyError
            If ``key`` is missing from the loaded configuration and either
            ``exception`` is given, or no ``default`` is given and the key
            is not in :py:attr:`DEFAULT_CONFIG` either.
        """
        attr = f"_{key}"
        cache = self.__dict__
        if attr not in cache:
            try:
                cache[attr] = self._loaded_configuration[key]
            except KeyError:
                # Compare against None explicitly so that falsy defaults
                # (0, False, "") are not mistaken for "no default given".
                cache[attr] = (
                    default
                    if default is not None
                    else _RuntimeConfig.DEFAULT_CONFIG[key]
                )
                if exception:
                    raise KeyError(exception)
        return cache[attr]

    def reset(self):
        """Resets the runtime configuration to empty state

        Drops every ``_<key>`` attribute cached by :py:meth:`_get` for keys
        of the loaded configuration and of :py:attr:`DEFAULT_CONFIG`, then
        clears the loaded configuration and the remembered config file path.
        """
        known_keys = set(self._loaded_configuration) | set(
            _RuntimeConfig.DEFAULT_CONFIG
        )
        for key in known_keys:
            # Not every key has been accessed (and thus cached) yet.
            self.__dict__.pop(f"_{key}", None)
        self._loaded_configuration = {}
        self._config_file_path = None

    @property
    def logging(self) -> Dict:
        """Configuration of all subsystem loggers

        The logging configuration determines how palaestrAI outputs its
        state information. The built-in configuration doesn't give much
        output except for warnings and errors (which is akin to the Unix
        philosophy). However, the log level of individual modules can be
        increased to, e.g., ``INFO``.

        The configuration is based on the Python ``logging`` module. Thus,
        general information can be obtained from
        `Python's official documentation on logging
        <https://docs.python.org/3/library/logging.config.html#logging-config-dictschema>`_.

        Returns
        -------
        dict
            The logging configuration

        Raises
        ------
        KeyError
            On first access, if the loaded configuration has no ``logging``
            entry.

        Examples
        --------
        ``palaestrai runtime-config-show-default`` gives a complete, sane
        default logging config. To get a more colorful output, change it
        to::

            logging:
              version: 1
              root:
                handlers: [console]
                level: ERROR
              filters:
                debug_filter: {(): palaestrai.core.runtime_config.DebugLogFilter}
              formatters:
                debug: {format: '%(asctime)s %(name)s[%(process)d]: %(levelname)s - %(message)s (%(module)s.%(funcName)s in %(filename)s:%(lineno)d)'}
                simple: {format: '%(asctime)s %(name)s[%(process)d]: %(levelname)s - %(message)s'}
                terminal: {class: palaestrai.cli.terminal_formatter.TerminalFormatter}
              handlers:
                console:
                  class: logging.StreamHandler
                  formatter: terminal
                  level: DEBUG
                  stream: 'ext://sys.stdout'
                console_debug:
                  class: logging.StreamHandler
                  filters: [debug_filter]
                  formatter: debug
                  level: DEBUG
                  stream: ext://sys.stdout
              loggers:
                palaestrai.agent: {level: ERROR}
                palaestrai.agent.agent_conductor: {level: INFO}
                palaestrai.agent.brain: {level: INFO}
                palaestrai.agent.muscle: {level: INFO}
                palaestrai.core: {level: INFO}
                palaestrai.environment: {level: INFO}
                palaestrai.experiment: {level: INFO}
                palaestrai.simulation: {level: INFO}
                palaestrai.store: {level: ERROR}
                palaestrai.types: {level: ERROR}
                palaestrai.util: {level: ERROR}
                palaestrai.visualization: {level: ERROR}
                sqlalchemy.engine: {level: ERROR}
        """
        return self._get(
            "logging",
            exception="Sorry, no logging config in the config file",
        )

    @property
    def time_series_store_uri(self) -> str:
        """URI to the time series database for results

        This must be any standards-compliant string in the form of
        `influxdb+user:password@host-or-path:port/db`.
        For example, `elasticsearch+myuser:mypass@localhost/arl`.

        Returns
        -------
        str
            The connection string; default: ``influx+localhost:8086``
        """
        return self._get("time_series_store_uri")

    @property
    def store_uri(self) -> str:
        """URI to the store database for results

        This must be any standards-compliant string in the form of
        `transport://user:password@host-or-path:port/db`.
        For example, `postgresql://myuser:mypass@localhost/arl`.

        Returns
        -------
        str
            The connection string; default: ``sqlite:///palaestrai.db``

        Example
        -------
        ``store_uri: postgresql://palaestrai:Password@localhost:5432/arl``
        """
        return self._get("store_uri")

    @property
    def store_buffer_size(self) -> int:
        """Number of messages buffered before writing to the store

        palaestrAI buffers data before flushing it in bulk to the store.
        This number defines a factor; the number of messages being buffered
        is a multiple of the number of agents and number of environments,
        for technical reasons.

        Writing data in bulk to the database makes the results storage more
        efficient. However, caching too many messages before writing will
        also increase the local memory consumption. To find a balance, the
        number is user-configurable.

        The number of messages cached is equal to::

            number_of_workers * number_of_environments * store_buffer_size

        Returns
        -------
        int
            Multiplier for the storage buffer, default: 20

        Example
        -------
        ``store_buffer_size: 10``
        """
        return self._get("store_buffer_size")

    @property
    def data_path(self) -> Path:
        """File system path for data storage

        Although the results storage database is the main system for
        storing any result that comes out of a palaestrAI run, some data is
        also dumped to the local disk, either for convenience or as backup.
        A common example are brain dumps (i.e., trained agent models).

        Returns
        -------
        pathlib.Path
            The data path, default: ``./_outputs``

        Example
        -------
        ``data_path: /home/myself/palaestrai/results``
        """
        return Path(self._get("data_path"))

    @property
    def broker_uri(self) -> str:
        """Full URI to the Major Domo Protocol broker

        palaestrAI is based on message passing. All subsystems communicate
        with each other through an architecture that is based on the ZeroMQ
        Major Domo Protocol. This architecture uses a central broker to
        route messages between subsystems. This property retrieves the full
        connect URI of the broker. It can be explicitly set in the runtime
        configuration; if not, it is derived from
        :py:attr:`executor_bus_port` and :py:attr:`public_bind` properties.

        The following values are recognized by palaestrAI:

        ``broker_uri: ~`` (i.e., ``None``)
            Derives from :py:attr:`executor_bus_port` and
            :py:attr:`public_bind`. If ``public_bind: False`` (the default),
            then the broker URI will be ``tcp://127.0.0.1:4242``.
        ``broker_uri: tcp://``
            Instructs the major domo broker to pick a random free port.
            Uses :py:attr:`public_bind` to determine whether the broker
            binds to localhost or to all available interfaces.
            Equivalent to ``tcp://*:*``.
        ``broker_uri: tcp://*:4242``
            Explicitly instructs the major domo broker to bind to the given
            port 4242 on all available interfaces.
        ``broker_uri: ipc://``
            Lets the major domo broker pick a random file in a temporary
            directory to use the inter-process communication protocol
            instead of TCP. Ignores the settings in
            :py:attr:`executor_bus_port` and :py:attr:`public_bind`.
            Equivalent to ``ipc://*``.
        ``broker_uri: ipc:///some/file``
            Explicitly sets a file for IPC transport.

        If the ``broker_uri`` property is set, it overwrites the effects of
        setting :py:attr:`executor_bus_port` and :py:attr:`public_bind`.

        Note that :py:attr:`DEFAULT_CONFIG` contains ``broker_uri: ipc://*``
        and :py:meth:`load` always merges these defaults, so the derived TCP
        URI is only used when ``broker_uri`` is absent from the loaded
        configuration or set to an empty value such as ``~``.

        Returns
        -------
        str
            The broker URI.

        Examples
        --------
        With ``public_bind: False``, ``executor_bus_port: 4242`` and
        ``broker_uri: ~`` in the runtime config, the derived broker URI is
        ``tcp://127.0.0.1:4242``. This is equivalent to creating the
        following runtime config file entry::

            broker_uri: tcp://127.0.0.1:4242
        """
        address_part = "*" if self.public_bind else "127.0.0.1"
        default_uri = f"tcp://{address_part}:{self.executor_bus_port}"
        # A configured-but-empty value (None, "") also falls back to the
        # derived URI.
        uri = self._get("broker_uri", default=default_uri) or default_uri
        # Overwrite the cache so an empty configured value is not kept.
        self._broker_uri = uri
        return uri

    @broker_uri.setter
    def broker_uri(self, broker_uri: str):
        self.__dict__["_broker_uri"] = broker_uri

    @property
    def executor_bus_port(self) -> int:
        """Port of the executor's messaging bus

        palaestrai needs one bus to start it all, which is managed by the
        executor. All other buses and topics can be communicated over this
        initial bus.

        Returns
        -------
        int
            The bus port; default: 4242

        Example
        -------
        ``executor_bus_port: 4242``
        """
        return self._get("executor_bus_port")

    @property
    def public_bind(self) -> bool:
        """Indicates whether to bind to all public addresses or to localhost

        This configuration setting allows the Executor and all other message
        buses to bind to all public IP addresses if set to True. If not,
        the buses will bind to ``127.0.0.1`` only.

        Finer-grained control can be exerted by the :py:attr:`broker_uri`
        option, which overwrites whatever setting is made here.

        Returns
        -------
        bool
            If True, the major domo broker binds to all available IP
            addresses. The default is False.

        Example
        -------
        ``public_bind: False``
        """
        return self._get("public_bind")

    @property
    def profile(self) -> bool:
        """Whether to enable profiling or not.

        If enabled, executions of palaestrAI create profiling information
        in ``.prof`` files. This can be important for hunting performance
        bugs and is usually enabled by developers, but not by end users.

        Profiling is based on
        `Python's cProfile module
        <https://docs.python.org/3/library/profile.html>`_
        and the resulting files can be inspected with tools like
        `Snakeviz <https://jiffyclub.github.io/snakeviz/>`_.

        Returns
        -------
        bool
            If True, profiling is enabled and ``.prof`` files are
            generated; if False, nothing is additionally done.
            Default: False.
        """
        return self._get("profile")

    @property
    def major_domo_client_timeout(self) -> int:
        """Timeout used by the MajorDomoClient

        Returns
        -------
        int
            The timeout (documented as seconds; the built-in default is
            ``300_000`` -- TODO confirm the unit against the client code)
        """
        return self._get("major_domo_client_timeout")

    @property
    def major_domo_client_retries(self) -> int:
        """Number of connection attempts the major domo client will try

        Returns
        -------
        int
            The number of (re-) tries; default: 3
        """
        return self._get("major_domo_client_retries")

    @property
    def fork_method(self) -> str:
        """The method for forking child processes palaestrAI should use

        palaestrAI creates a number of subprocesses when it runs.
        Python offers several ways to create new processes.
        The safest is to use the ``spawn`` method, which is the default.
        However, other methods like ``fork`` and ``forkserver`` offer higher
        speed, but are not available on every platform.
        The recommended way is to leave it at ``spawn`` unless you are
        running on a Linux system, e.g., in a docker container. Then,
        ``forkserver`` can offer you a performance boost.
        For more details, see the official Python documentation on
        `start methods
        <https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods>`_.

        Returns
        -------
        str
            Either ``spawn``, ``fork``, or ``forkserver``. If the value set
            in the loadable runtime config is not any of these strings, the
            default method of ``spawn`` is returned. Default: ``spawn``.
        """
        method = self._get("fork_method")
        if method not in ("fork", "forkserver", "spawn"):
            method = "spawn"
        return method

    @property
    def logger_port(self) -> int:
        """Destination port the internal log server should use

        All spawned submodules of palaestrAI communicate their log messages
        back to the main process. This log message receiver binds to the
        given port.

        Returns
        -------
        int
            The port of the internal log server, default: 4243

        Example
        -------
        ``logger_port: 4243``
        """
        return self._get("logger_port")

    def load(
        self, stream_or_dict: Union[dict, TextIO, str, Path, None] = None
    ):
        """Loads the configuration from an external source.

        The runtime configuration is initialized from the default
        configuration in ::`_RuntimeConfig.DEFAULT_CONFIG`. This method then
        iterates through the list in ::`_RuntimeConfig.CONFIG_FILE_PATHS`,
        subsequently updating the existing configuration with new values
        found. Finally, the given ::`stream_or_dict` parameter is used if
        present, ultimately taking preference over all other values.

        That means that each config file can contain only a portion of the
        overall configuration; it gets updated subsequently.

        Calling this method without an argument (or with an empty one) on an
        already loaded configuration is a no-op; use :py:meth:`reset` first.
        Values that were already read through a property are cached on the
        instance and are not refreshed by a later ``load``; call
        :py:meth:`reset` to drop them.

        Parameters
        ----------
        stream_or_dict : Union[dict, TextIO, str, Path, None]
            Loads the runtime configuration directly from a dictionary or as
            YAML-encoded stream. A ``str`` is interpreted as a file path.
            If no stream is given, the default files in
            :py:attr:`_RuntimeConfig.CONFIG_FILE_PATHS` will be tried as
            described. A stream that is a ``TextIOWrapper`` is closed after
            reading. A path that cannot be opened, or content that does not
            deserialize to a dict, is logged as a warning and ignored.

        Examples
        --------
        ``RuntimeConfig().load()``
            Will initialize the runtime configuration from the default
            configuration, and then try all available configuration files.
            The configuration is updated in a cascading manner.
        ``RuntimeConfig.load({"store_uri": "sqlite:///test.db"})``
            Loads the complete configuration as above, but additionally
            overwrites the ``store_uri`` parameter, setting it to
            ``sqlite:///test.db``.
        """
        from copy import deepcopy

        if not isinstance(self._loaded_configuration, dict):
            self._loaded_configuration = {}
        if not stream_or_dict and self._loaded_configuration:
            # Don't load a default config if we already have something; use
            # reset() instead.
            return

        yml = ruamel.yaml.YAML(typ="safe")
        has_seen_nondefault_config = False

        # Deep copy so that nested structures (e.g., the logging dict) are
        # never shared with the class-level DEFAULT_CONFIG.
        self._loaded_configuration.update(
            deepcopy(_RuntimeConfig.DEFAULT_CONFIG)
        )

        for file in _RuntimeConfig.CONFIG_FILE_PATHS:
            LOG.debug("Trying to open configuration file: %s", file)
            try:
                with open(file, "r") as fp:
                    deserialized = yml.load(fp)
            except IOError:
                # Missing or unreadable files are simply skipped.
                continue
            if not isinstance(deserialized, dict):
                LOG.warning(
                    "The contents of %s could not be deserialized "
                    "to dict, skipping it.",
                    file,
                )
                continue
            self._loaded_configuration.update(deserialized)
            self._config_file_path = file
            has_seen_nondefault_config = True

        if isinstance(stream_or_dict, dict):
            self._loaded_configuration.update(stream_or_dict)
            self._config_file_path = "(dict)"
            return

        if isinstance(stream_or_dict, str):
            stream_or_dict = Path(stream_or_dict)
        if isinstance(stream_or_dict, Path):
            try:
                stream_or_dict = open(stream_or_dict, "r")
            except OSError:
                LOG.warning(
                    "Failed to load runtime configuration from file at %s, "
                    "ignoring.",
                    stream_or_dict,
                )
                # Really ignore it: otherwise the Path would be handed to
                # the YAML loader below, which would try to open it again.
                stream_or_dict = None

        if stream_or_dict is not None:
            try:
                deserialized = yml.load(stream_or_dict)  # Can raise
                if not isinstance(deserialized, dict):
                    raise TypeError
                self._loaded_configuration.update(deserialized)
                try:
                    self._config_file_path = stream_or_dict.name
                except AttributeError:
                    # Streams without a name, e.g., in-memory buffers.
                    self._config_file_path = str(stream_or_dict)
                has_seen_nondefault_config = True
            except TypeError:
                LOG.warning(
                    "Failed to load runtime configuration from stream "
                    'at "%s", ignoring.',
                    repr(stream_or_dict),
                )
            finally:
                if isinstance(stream_or_dict, TextIOWrapper):
                    stream_or_dict.close()

        if not has_seen_nondefault_config:
            LOG.info(
                "No runtime configuration given, loaded built-in defaults."
            )
            self._config_file_path = "(DEFAULT)"

    def to_dict(self) -> Dict:
        """Returns the effective value of every key in DEFAULT_CONFIG

        Returns
        -------
        dict
            Mapping of each default configuration key to the value
            :py:meth:`_get` yields for it.
        """
        return {key: self._get(key) for key in _RuntimeConfig.DEFAULT_CONFIG}

    def __str__(self):
        return f"<RuntimeConfig id=0x{id(self):x}> at {self._config_file_path}"

    def __repr__(self):
        return str(self.to_dict())
def RuntimeConfig():
    """RuntimeConfig singleton instance

    Provides API access to palaestrAI's runtime configuration. At any time,
    the configuration is fed from one singleton instance.

    The instance is created and loaded on first call; every later call
    returns that same object. If loading raises ``FileNotFoundError``, the
    instance falls back to a deep copy of the built-in default configuration.

    Returns
    -------
    _RuntimeConfig
        The shared runtime configuration object.
    """
    existing = _RuntimeConfig._instance
    if existing is not None:
        return existing

    created = _RuntimeConfig()
    # Publish the instance before loading, so it stays registered even
    # when load() fails.
    _RuntimeConfig._instance = created
    try:
        created.load()
    except FileNotFoundError:
        from copy import deepcopy

        created._loaded_configuration = deepcopy(
            _RuntimeConfig.DEFAULT_CONFIG
        )
    return created