Source code for palaestrai.entrypoints.palaestrai_runner

from __future__ import annotations
from typing import Union, TextIO, Tuple, List

import sys
import asyncio
import logging
from pathlib import Path
from itertools import chain
from datetime import datetime

import ruamel.yaml

from palaestrai.core import RuntimeConfig
from palaestrai.experiment import ExperimentRun, Executor, ExecutorState

ExperimentRunInputTypes = Union[ExperimentRun, TextIO, str, Path]

LOG = logging.getLogger("palaestrai.runner")


[docs] async def run( experiment_run_definition: Union[ ExperimentRunInputTypes, List[ExperimentRunInputTypes] ], runtime_config: Union[str, TextIO, dict, None] = None, parallel_runs: int = 1, ) -> Tuple[List[str], ExecutorState]: """Provides a single-line command to start an experiment This is the asynchronous variant of :func:`.execute`. Use it if you have already an event loop running (e.g., in a Jupyter Notebook). """ if runtime_config: RuntimeConfig().load(runtime_config) # There is an implicit loading of the default config. The object returned # by RuntimeConfig() has at least the default loaded, and tries to load # from the search path. So there is no reason to have an explicit load() # here. if not isinstance(experiment_run_definition, List): experiment_run_definition = [experiment_run_definition] experiment_run_definition = [ Path(i) if isinstance(i, str) else i for i in experiment_run_definition ] experiment_run_definition = list( chain.from_iterable( i.rglob("*.y*ml") if isinstance(i, Path) and i.is_dir() else [i] for i in experiment_run_definition ) ) experiment_runs = [ ExperimentRun.load(i) if not isinstance(i, ExperimentRun) else i for i in experiment_run_definition ] executor = Executor(parallel_runs=parallel_runs) executor.schedule(experiment_runs) executor_final_state = await executor.execute() return [e.uid for e in experiment_runs], executor_final_state
[docs] def execute( experiment_run_definition: Union[ ExperimentRunInputTypes, List[ExperimentRunInputTypes] ], runtime_config: Union[str, TextIO, dict, None] = None, parallel_runs: int = 1, ) -> Tuple[List[str], ExecutorState]: """Provides a single-line command to start an experiment This function is a high-level wrapper that takes a number of experiment run defintions and optionally a runtime config, executes the runs, and returns the results. Note ---- This is a sync method. If you already have an event loop running, please use this function's sibling, :func:`.run` Parameters ---------- experiment_run_definition: 1. Already set ExperimentRun object 2. Any text stream 3. The path to a file The configuration from which the experiment is loaded. runtime_config: 1. Any text stream 2. dict 3. None The Runtime configuration applicable for the run. Note that even when no additional source is provided, runtime will load a minimal configuration from build-in defaults. parallel_runs : int, default: 1 Number of experiment runs that can be executed in parallel Returns ------- typing.Tuple[Sequence[str], ExecutorState] A tuple containing: 1. The list of all experiment run IDs 2. The final state of the executor """ loop = None try: loop = asyncio.get_running_loop() except RuntimeError: pass # No loop - this is good. if loop is not None and loop.is_running(): raise RuntimeError( "An event loop is already running. " "Please use `await palaestrai.run(...)` instead." ) try: import uvloop LOG.debug("Using uvloop.") if sys.version_info >= (3, 11): with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: return runner.run( run( experiment_run_definition, runtime_config, parallel_runs, ) ) else: uvloop.install() return asyncio.run( run(experiment_run_definition, runtime_config, parallel_runs) ) except ModuleNotFoundError: LOG.debug("uvloop not available.") return asyncio.run( run(experiment_run_definition, runtime_config, parallel_runs) )