from __future__ import annotations
from typing import Union, TextIO, Tuple, List
import sys
import asyncio
import logging
from pathlib import Path
from itertools import chain
from datetime import datetime
import ruamel.yaml
from palaestrai.core import RuntimeConfig
from palaestrai.experiment import ExperimentRun, Executor, ExecutorState
# A single experiment run definition as accepted by ``run``/``execute``:
# a ready ExperimentRun object, an open text stream, or a path given either
# as ``str`` or as :class:`pathlib.Path`.
ExperimentRunInputTypes = Union[ExperimentRun, TextIO, str, Path]

# Logger shared by the runner entry points of this module.
LOG: logging.Logger = logging.getLogger(name="palaestrai.runner")
def _expand_definition(definition):
    """Return the YAML files below a directory path, or ``[definition]``.

    A :class:`pathlib.Path` that points to a directory is searched
    recursively for ``*.y*ml``. Only regular files are kept, because a
    directory that happens to be named ``foo.yaml`` would otherwise be
    handed to :meth:`ExperimentRun.load`. The matches are sorted so that
    runs are scheduled in a reproducible order instead of the file system's
    enumeration order. Any other definition is passed through unchanged.
    """
    if isinstance(definition, Path) and definition.is_dir():
        return sorted(
            path for path in definition.rglob("*.y*ml") if path.is_file()
        )
    return [definition]


async def run(
    experiment_run_definition: Union[
        ExperimentRunInputTypes, List[ExperimentRunInputTypes]
    ],
    runtime_config: Union[str, TextIO, dict, None] = None,
    parallel_runs: int = 1,
) -> Tuple[List[str], ExecutorState]:
    """Provides a single-line command to start an experiment

    This is the asynchronous variant of :func:`.execute`. Use it if you
    have already an event loop running (e.g., in a Jupyter Notebook).

    Parameters
    ----------
    experiment_run_definition
        A single definition, or a list/tuple of definitions. Each definition
        is an :class:`ExperimentRun` object, a text stream, or a path
        (``str`` or :class:`pathlib.Path`). Strings are converted to paths.
        A path pointing to a directory is searched recursively for
        ``*.y*ml`` files, which are scheduled in sorted path order.
    runtime_config
        Optional runtime configuration passed to ``RuntimeConfig().load``.
        Falsy values (``None``, empty dict, empty string) skip the
        explicit load.
    parallel_runs : int, default: 1
        Number of experiment runs that can be executed in parallel.

    Returns
    -------
    typing.Tuple[List[str], ExecutorState]
        The UIDs of all scheduled experiment runs, in scheduling order, and
        the final state of the executor.
    """
    if runtime_config:
        RuntimeConfig().load(runtime_config)
    # There is an implicit loading of the default config. The object returned
    # by RuntimeConfig() has at least the default loaded, and tries to load
    # from the search path. So there is no reason to have an explicit load()
    # here.

    # Normalize to a list; a tuple is never a single definition, so it is
    # treated like a list of definitions.
    if isinstance(experiment_run_definition, (list, tuple)):
        definitions = list(experiment_run_definition)
    else:
        definitions = [experiment_run_definition]

    definitions = [Path(d) if isinstance(d, str) else d for d in definitions]
    definitions = list(chain.from_iterable(map(_expand_definition, definitions)))

    experiment_runs = [
        d if isinstance(d, ExperimentRun) else ExperimentRun.load(d)
        for d in definitions
    ]
    executor = Executor(parallel_runs=parallel_runs)
    executor.schedule(experiment_runs)
    executor_final_state = await executor.execute()
    return [e.uid for e in experiment_runs], executor_final_state
def execute(
    experiment_run_definition: Union[
        ExperimentRunInputTypes, List[ExperimentRunInputTypes]
    ],
    runtime_config: Union[str, TextIO, dict, None] = None,
    parallel_runs: int = 1,
) -> Tuple[List[str], ExecutorState]:
    """Provides a single-line command to start an experiment

    This function is a high-level wrapper that takes a number of experiment
    run definitions and, optionally, a runtime config, executes the runs,
    and returns the results.

    Note
    ----
    This is a sync method. If you already have an event loop running, please
    use this function's sibling, :func:`.run`

    If ``uvloop`` is importable it is used as the event loop. On Python
    < 3.11 this is done via ``uvloop.install()``, which changes the
    process-wide event loop policy, so the change persists after this call.

    Parameters
    ----------
    experiment_run_definition: A single definition or a list of them, each
        being
        1. Already set ExperimentRun object
        2. Any text stream
        3. The path to a file (directories are searched for ``*.y*ml``)
        The configuration from which the experiment is loaded.
    runtime_config: 1. Any text stream
        2. dict
        3. None
        The Runtime configuration applicable for the run.
        Note that even when no additional source is provided, runtime will load
        a minimal configuration from built-in defaults.
    parallel_runs : int, default: 1
        Number of experiment runs that can be executed in parallel

    Returns
    -------
    typing.Tuple[Sequence[str], ExecutorState]
        A tuple containing:
        1. The list of all experiment run IDs
        2. The final state of the executor

    Raises
    ------
    RuntimeError
        If called while an event loop is already running in this thread.
    """
    # get_running_loop() only ever returns a *running* loop and raises
    # RuntimeError otherwise, so reaching ``else`` means we are inside one.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No loop - this is good.
    else:
        raise RuntimeError(
            "An event loop is already running. "
            "Please use `await palaestrai.run(...)` instead."
        )

    # Only the import belongs in the ``try``: wrapping the whole experiment
    # would turn a ModuleNotFoundError raised *by the experiment* (e.g. a
    # missing agent module) into a "uvloop not available" fallback that
    # executes the experiment a second time.
    try:
        import uvloop
    except ImportError:
        LOG.debug("uvloop not available.")
        return asyncio.run(
            run(experiment_run_definition, runtime_config, parallel_runs)
        )

    LOG.debug("Using uvloop.")
    if sys.version_info >= (3, 11):
        with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
            return runner.run(
                run(experiment_run_definition, runtime_config, parallel_runs)
            )
    uvloop.install()
    return asyncio.run(
        run(experiment_run_definition, runtime_config, parallel_runs)
    )