from __future__ import annotations
import asyncio
import asyncio.exceptions
import collections
import dataclasses
import enum
import logging
import multiprocessing
import os
import signal
import uuid
from collections import deque
from datetime import datetime
from typing import Dict, List, Optional, Union, Sequence, TYPE_CHECKING, Deque
import aiomultiprocess
import setproctitle
import zmq.error
from palaestrai.version import __version__
from palaestrai.core.protocol import (
ExperimentRunShutdownRequest,
ExperimentRunShutdownResponse,
ExperimentRunStartRequest,
ExperimentRunStartResponse,
)
from palaestrai.util import LogServer, spawn_wrapper
from palaestrai.core import MajorDomoBroker, MajorDomoClient, RuntimeConfig
from palaestrai.types import ExperimentRunInstanceStatus
if TYPE_CHECKING:
import palaestrai.experiment
from palaestrai.store.receiver import StoreReceiver
LOG = logging.getLogger(__name__)
[docs]
class ExecutorState(enum.Enum):
    """Lifecycle states of the :class:`Executor`.

    The numeric values are ordered on purpose: ``Executor.shutdown``
    compares ``.value`` against ``RUNNING.value`` and treats every state
    with a larger value as "already shutting down or interrupted".
    """

    # State assigned in ``Executor.__init__``.
    PRISTINE = 0
    # Set by ``Executor.execute`` after logging, store, and broker are up.
    INITIALIZED = 1
    # The main loop in ``Executor.execute`` runs only while in this state.
    RUNNING = 2
    # Orderly shutdown was requested or the executor ran out of work.
    SHUTDOWN = 3
    # Set by ``Executor._shutdown`` once a SHUTDOWN has been completed.
    EXITED = 4
    # The following three states record which signal interrupted the
    # executor (see ``Executor._handle_signal_interrupt``).
    SIGINT = 5
    SIGABRT = 6
    SIGTERM = 7
@dataclasses.dataclass
class _RunGovernorPCB:
    """Simple Process Control Block for RunGovernor control.

    One instance is created per launched run governor process and stored
    in ``Executor._run_governors``, keyed by ``run_governor_id``.
    """

    # UUID4 string generated when the run governor process is launched.
    run_governor_id: str
    # Wall-clock time (``datetime.now()``) at which the process was started.
    started_at: datetime
    # Handle of the subprocess that hosts the run governor.
    run_governor_process: aiomultiprocess.Process
    # The experiment run that was deployed to this run governor.
    experiment_run: palaestrai.experiment.ExperimentRun
    # Filled in only after the run governor acknowledged the start request
    # successfully; ``None`` until then.
    experiment_run_id: Optional[str] = None
    # Set once we have successfully persisted the ``RUNNING`` status for this
    # run's experiment run instance. The instance row is created
    # asynchronously by the StoreReceiver, so the write may have to be
    # retried across a few main-loop iterations until the row exists.
    instance_status_running_marked: bool = False
[docs]
class ExperimentRunStartError(RuntimeError):
    """Signals that an experiment run could not be started.

    Parameters
    ----------
    experiment_run_id :
        Identifier of the experiment run that failed to start.
    run_governor_id :
        Identifier of the run governor that was supposed to execute it.
    message :
        Human-readable reason; also used as the exception's message.
    """

    def __init__(self, experiment_run_id, run_governor_id, message):
        super().__init__(message)
        # Keep the full context on the exception so handlers can clean up
        # the right run governor.
        self.message = message
        self.run_governor_id = run_governor_id
        self.experiment_run_id = experiment_run_id
[docs]
class DeadMajorDomoBrokerError(RuntimeError):
    """Raised when the MajorDomoBroker process is found dead.

    It is raised by ``Executor._check_major_domo_broker_state`` and caught
    in ``Executor.execute``, which then switches to the SHUTDOWN state.
    """

    def __init__(self):
        # Deliberately takes no arguments and carries no message.
        super().__init__()
[docs]
class InterruptSignal(RuntimeError):
    """Argument-less exception type for signalling an interrupt.

    NOTE(review): nothing in the visible part of this module raises or
    catches it -- confirm whether other modules still use it.
    """

    def __init__(self):
        # Deliberately takes no arguments and carries no message.
        super().__init__()
async def _execute_run_governor(uid: str):
    """Executes the ::`RunGovernor` main loop, catching errors

    This is a wrapper function that creates a ::`RunGovernor` and awaits
    ::`RunGovernor.run`. Before doing so, it moves the current process into
    a process group of its own, so that everything spawned underneath the
    run governor can be signalled as one group. If the run governor raises,
    the error is logged and the whole process group (this process included)
    is sent ``SIGKILL``.

    This method belongs to the ::`Executor` logically, but is not part of the
    class in order to avoid serialization/deserialization of the whole Executor
    each time a new ::`RunGovernor` process is spawned.

    Parameters
    ----------
    uid : str
        UID of the new ::`RunGovernor`

    Return
    ------
    Nothing.

    Raises
    ------
    Exception
        Whatever the run governor raised, should this process survive the
        ``SIGKILL`` it sends to its own process group.
    """
    pid = os.getpid()
    # Become leader of a fresh process group; its group id equals ``pid``.
    os.setpgrp()
    try:
        # Imported lazily, inside the subprocess.
        from .run_governor import RunGovernor

        run_gov = RunGovernor(
            uid=uid,
        )
        await run_gov.run()  # type: ignore
    except Exception as e:
        LOG.critical("Execution of RunGovernor(uid=%s) failed: %s.", uid, e)
        # The uid argument used to be missing here, so the message was
        # logged with a literal, unfilled "%s".
        LOG.debug(
            "Execution of RunGovernor(uid=%s) failed:", uid, exc_info=e
        )
        try:
            # Take down every process of our group so that no orphaned
            # children linger after a failed run.
            os.killpg(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        # Bare raise keeps the original traceback intact.
        raise
[docs]
class Executor:
    """The executor is the entrypoint for every run execution.

    The role of the executor is to receive new experiment runs and
    distribute them to existing :class:`RunGovernor` instances. If
    palaestrAI is used in local run mode, the executor will initialize
    a :class:`RunGovernor`.
    Furthermore, the Executor can stop running experiment runs.

    Parameters
    ----------
    parallel_runs : int, default: 1
        Number of experiment runs the executor executes in
        parallel.
    is_service : bool, default: False
        Controls the main-loop termination behaviour. When ``False`` (the
        default), the executor behaves as it traditionally has: as soon as
        there are no more scheduled runs and no active run governors, it
        transitions to ``SHUTDOWN`` and ``execute()`` returns. When ``True``,
        the executor is run as a long-lived *service* (as used by
        ``palaestrai serve``): it stays in the ``RUNNING`` state even when
        idle, yielding control via ``asyncio.sleep(0.2)`` so that newly
        scheduled runs (added through :meth:`schedule` at runtime) are picked
        up. It only shuts down on an explicit :meth:`shutdown` call or on
        receipt of SIGINT/SIGABRT/SIGTERM.
    command_channel : optional, default: None
        One end of a ``multiprocessing.Pipe`` over which a parent process
        can send ``(verb, payload)`` commands. The main loop polls it
        without blocking on every iteration. ``None`` disables the
        channel.

    Notes
    -----
    At some point, when the core protocol has progressed far enough,
    we will be able to run several experiments at once from an
    executor. But, until we're sure we can, the public API accepts
    only one experiment.
    """

    def __init__(
        self,
        parallel_runs: int = 1,
        is_service: bool = False,
        command_channel=None,
    ):
        # (At some point, when the core protocol has progressed far enough,
        # we will be able to run several experiments at once from an executor.
        # But, until we're sure we can, the public API accepts only one
        # experiment.)
        self._state: ExecutorState = ExecutorState.PRISTINE
        self.is_service: bool = is_service
        # Optional parent<->executor command channel (one end of a
        # ``multiprocessing.Pipe``). When running as a service, the API
        # parent process sends ``("SCHEDULE", experiment_run)`` commands over
        # this connection; the executor drains them into ``_runs_scheduled``
        # in its main loop. It is purely a liveness/optimization signal: the
        # authoritative state lives in the database, so a dropped command at
        # most delays a run, it never corrupts data. Defaults to ``None`` to
        # keep the traditional, non-service behaviour fully backwards
        # compatible.
        self._command_channel = command_channel
        # Storage backend: the receiver object, the process hosting it, and
        # the queue handed to both the receiver and the broker. All three
        # stay ``None`` until ``_init_store`` runs (or if the store is
        # disabled).
        self._store: Optional[StoreReceiver] = None
        self._store_process: Optional[aiomultiprocess.Process] = None
        self._messsage_storage_queue: Optional[multiprocessing.Queue] = None
        # Communication: client, broker object, and broker process are
        # created in ``_init_communication``.
        self._client: Union[None, MajorDomoClient] = None
        self._broker: Union[None, MajorDomoBroker] = None
        self._broker_process: Union[None, aiomultiprocess.Process] = None
        # Pipe pair: index 1 is handed to the broker, index 0 is read by the
        # executor to learn the broker's URI.
        self._broker_ctrl = multiprocessing.Pipe()
        self._num_parallel_runs = parallel_runs
        # Active run governors, keyed by run governor id.
        self._run_governors: Dict[str, _RunGovernorPCB] = {}
        # Runs waiting to be started by the main loop.
        self._runs_scheduled: Deque[palaestrai.experiment.ExperimentRun] = (
            deque()
        )
        self._log_server = LogServer("127.0.0.1", RuntimeConfig().logger_port)
        # Set default start method:
        try:
            multiprocessing.set_start_method(RuntimeConfig().fork_method)
            aiomultiprocess.set_start_method(RuntimeConfig().fork_method)
        except RuntimeError as e:
            # When running nested or in tests, the context has already been
            # set, so we cannot do this anymore.
            # This is okay and not an error, we just need to roll with it.
            LOG.debug("Cannot set start method: %s", e)
def _handle_signal_interrupt(self, signum):
    """Handle interrupting signals by notifying of the state change.

    The first SIGINT/SIGABRT/SIGTERM only records the matching
    :class:`ExecutorState`, which makes the main loop leave ``RUNNING`` and
    shut down in an orderly fashion. Any further signal received while
    already in one of the ``SIG*`` states is taken as "exit now": all run
    governor process groups, the broker, and the store receiver are killed.

    Parameters
    ----------
    signum :
        The signal that was received. Signals other than SIGINT, SIGABRT,
        and SIGTERM are ignored unless the executor has already been
        interrupted.
    """
    LOG.info(
        "palaestrAI executor has received signal %s, shutting down", signum
    )
    interrupted_states = {
        ExecutorState.SIGINT,
        ExecutorState.SIGABRT,
        ExecutorState.SIGTERM,
    }
    if self._state not in interrupted_states:
        if signum not in {signal.SIGABRT, signal.SIGINT, signal.SIGTERM}:
            return
        old_state = self._state
        self._state = {
            signal.SIGINT.value: ExecutorState.SIGINT,
            signal.SIGABRT.value: ExecutorState.SIGABRT,
            signal.SIGTERM.value: ExecutorState.SIGTERM,
        }[signum]
        LOG.info(
            "%s changed state from %s to %s.", self, old_state, self._state
        )
        return

    # We already received a signal to interrupt and are probably
    # waiting for another process, but the user wants to exit in any
    # case. Well, let's do it, then.
    LOG.info("Killing all stale run governor processes...")
    for pcb in self._run_governors.values():
        try:
            os.killpg(pcb.run_governor_process.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # Might've already ended in between - fine.

    # The event loop only keeps weak references to tasks, so we hold on to
    # the join tasks ourselves until they are done.
    pending_joins = getattr(self, "_pending_join_tasks", None)
    if pending_joins is None:
        pending_joins = self._pending_join_tasks = set()

    def kill_and_reap(process, label):
        """Kill ``process`` and report if it cannot be joined in time."""
        if process is None:
            # The signal arrived before this process was ever created.
            return
        LOG.info("%s kills %s process (PID: %s)", self, label, process.pid)
        process.kill()
        join_task = asyncio.get_running_loop().create_task(process.join(5))
        pending_joins.add(join_task)

        def report(task):
            pending_joins.discard(task)
            if task.cancelled():
                return
            error = task.exception()
            if isinstance(
                error, (asyncio.TimeoutError, multiprocessing.TimeoutError)
            ):
                # A timeout can only surface once the task has run, which
                # is why it is checked here and not around create_task().
                LOG.critical(
                    "%s could not kill %s (pid: %s); "
                    "see if you have to kill it yourself. "
                    "Here is my spear: -->",
                    self,
                    label,
                    process.pid,
                )
            elif error is not None:
                LOG.debug(
                    "%s failed to join %s (pid: %s): %s",
                    self,
                    label,
                    process.pid,
                    error,
                )

        join_task.add_done_callback(report)

    kill_and_reap(self._broker_process, "MajorDomoBroker")
    kill_and_reap(self._store_process, "StoreReceiver")
def _init_signal_handler(self):
    """Registers the interrupt handler with the running event loop.

    SIGINT, SIGABRT, and SIGTERM are all routed to
    ``_handle_signal_interrupt``, which receives the signal as its only
    argument. Must be called while an event loop is running.
    """
    signals = {signal.SIGINT, signal.SIGABRT, signal.SIGTERM}
    LOG.debug(
        "Executor(id=0x%x) registering signal handlers for signals %s.",
        id(self),
        signals,
    )
    event_loop = asyncio.get_running_loop()
    callback = self._handle_signal_interrupt
    for sig in signals:
        event_loop.add_signal_handler(sig, callback, sig)
async def _init_logging(self):
    """Starts the log server and configures log filters

    After the log server is up, the port it actually listens on is written
    back to the runtime configuration. Every root handler whose name
    contains ``"debug"`` gets a filter that lets only DEBUG records pass.
    In service mode, a single :class:`SQLiteLogHandler` is additionally
    attached to the root logger (unless one is already present).
    """
    await self._log_server.start()
    RuntimeConfig().logger_port = self._log_server.listen_port

    def filter_record_above_debug(record: logging.LogRecord):
        # Despite the name this is a predicate: True keeps the record, and
        # only DEBUG records are kept.
        return record.levelname == "DEBUG"

    for handler in logging.root.handlers:
        # Handlers that were never given a name report ``None``; testing
        # ``"debug" in None`` would raise a TypeError.
        if handler.name is not None and "debug" in handler.name:
            handler.addFilter(filter_record_above_debug)
    # Service mode (``palaestrai serve``) automatically persists log
    # records to a separate SQLite database. Because every subprocess
    # forwards its records to this LogServer, which re-injects them into
    # this process' loggers, a single handler on the root logger here
    # captures the whole process tree. The handler itself only stores
    # records that carry an ``experiment_run_instance_uid`` (stamped by
    # the RunGovernor's MetadataLogFilter); everything else is dropped by
    # the handler and only reaches stdout via the console handlers. This
    # is NOT enabled for the normal ``experiment-start`` path.
    if self.is_service:
        from palaestrai.util.sqlite_log_handler import SQLiteLogHandler

        if not any(
            isinstance(h, SQLiteLogHandler) for h in logging.root.handlers
        ):
            logging.root.addHandler(
                SQLiteLogHandler(
                    db_uri=RuntimeConfig().log_store_uri,
                    level="INFO",
                )
            )
def _init_store(self):
    """Creates the storage backend and starts it in its own process.

    If the store reports itself as disabled, the freshly created queue is
    closed again and ``_store``/``_messsage_storage_queue`` are reset to
    ``None``; no process is started in that case.
    """
    LOG.info("Starting storage backend...")
    # We load & init the storage backend in any case, even if it is
    # disabled. If we don't, we create a queue that eats messages but
    # is never able to release them.
    from palaestrai.store.receiver import StoreReceiver

    self._messsage_storage_queue = multiprocessing.Queue()
    self._store = StoreReceiver(message_queue=self._messsage_storage_queue)
    if not self._store._is_enabled:
        LOG.info("Storage backend disabled by runtime configuration.")
        if self._messsage_storage_queue is not None:
            self._messsage_storage_queue.close()
        # With the queue set to None, the broker is later created without
        # a store queue (see ``_init_communication``).
        self._messsage_storage_queue = None
        self._store = None
        return
    # The receiver's ``run`` is handed to ``spawn_wrapper`` together with a
    # snapshot of the current runtime configuration.
    self._store_process = aiomultiprocess.Process(
        target=spawn_wrapper,
        args=(
            "StoreReceiver",
            RuntimeConfig().to_dict(),
            self._store.run,
        ),
        name="Executor-StoreReceiver",
    )
    self._store_process.start()
def _init_communication(self):
    """Initialization of all core components

    Starts the MajorDomoBroker in a daemon subprocess, waits for the URI it
    reports over the control pipe, stores that URI in the runtime
    configuration, and finally creates the MajorDomoClient for it.
    """
    LOG.info("Starting Major Domo Broker...")
    # ``uri=None``: the broker is not given an address; the one it ends up
    # with is read from the control pipe below.
    self._broker = MajorDomoBroker(
        uri=None,
        ctrl=self._broker_ctrl[1],
        store_queue=self._messsage_storage_queue,
    )
    self._broker_process = aiomultiprocess.Process(
        daemon=True,
        target=spawn_wrapper,
        args=(
            "MajorDomoBroker",
            RuntimeConfig().to_dict(),
            self._broker.mediate,
        ),
        name="Executor-MajorDomoBroker",
    )
    self._broker_process.start()
    # Blocking receive: execution pauses here until something arrives on
    # our end of the control pipe (presumably the broker's bound URI).
    broker_uri = self._broker_ctrl[0].recv()
    RuntimeConfig().broker_uri = broker_uri
    LOG.info(
        "Major Domo Broker is bound to %s.",
        RuntimeConfig().broker_uri,
    )
    self._client = MajorDomoClient(RuntimeConfig().broker_uri)
@staticmethod
def _set_instance_status(instance_uid: str, status) -> bool:
    """Persist the lifecycle status of an experiment run instance.

    This writes the given
    :class:`~palaestrai.types.ExperimentRunInstanceStatus` to the
    ``experiment_run_instances`` row identified by ``instance_uid``. It is
    the authoritative status writer used while running as a service: the
    REST API only ever *reads* this column.

    The instance row itself is created asynchronously by the
    :class:`~palaestrai.store.receiver.StoreReceiver` (in a separate
    process) once it sees the ``ExperimentRunStartRequest``. There is
    therefore an inherent race when transitioning to ``RUNNING``: the row
    may not exist yet. This method tolerates that gracefully -- it returns
    ``False`` if the row is not (yet) present or if the store is disabled,
    and ``True`` on a successful update. It never raises, so a status
    bookkeeping problem can never take down the executor's main loop.

    Parameters
    ----------
    instance_uid : str
        UID of the :class:`ExperimentRunInstance` to update.
    status : palaestrai.types.ExperimentRunInstanceStatus
        The new status to persist.

    Returns
    -------
    bool
        ``True`` if a row was found and updated, ``False`` otherwise.
    """
    # No store URI configured: nothing to write to.
    if not RuntimeConfig().store_uri:
        return False
    try:
        # Imported lazily so that import problems of the store package are
        # also caught by the blanket handler below.
        from palaestrai.store import Session
        from palaestrai.store import database_model as dbm

        session = Session()
        try:
            instance = (
                session.query(dbm.ExperimentRunInstance)
                .filter(dbm.ExperimentRunInstance.uid == instance_uid)
                .one_or_none()
            )
            if instance is None:
                # Row not there (yet); callers may retry later.
                return False
            instance.status = status
            session.commit()
            # This is a static method, so there is no instance to identify:
            # the id logged here is that of the Executor *class*.
            LOG.debug(
                "Executor(id=0x%x) set status of "
                "ExperimentRunInstance(uid=%s) to %s.",
                id(Executor),
                instance_uid,
                status,
            )
            return True
        finally:
            # Runs on every path above, including the early returns.
            session.close()
    except Exception as e:  # noqa: BLE001 -- never break the main loop
        LOG.warning(
            "Could not update status of ExperimentRunInstance(uid=%s) "
            "to %s: %s. Continuing regardless.",
            instance_uid,
            status,
            e,
        )
        return False
@staticmethod
def _join_process(process: aiomultiprocess.Process):
    """Repeatedly joins ``process`` until it is no longer alive.

    Timeouts raised by ``join`` are ignored and the join is retried.

    NOTE(review): every other call site in this module awaits
    ``process.join(...)``; here the return value is discarded without
    ``await``. If ``join`` is a coroutine function for the annotated type,
    this loop never actually waits and spins until the process exits --
    confirm, and consider making this method ``async``. No caller is
    visible in this module.
    """
    while process.is_alive():
        try:
            process.join(0.3)
        except (asyncio.TimeoutError, multiprocessing.TimeoutError):
            pass
[docs]
def experiment_runs(self) -> List[ExperimentRunRuntimeInformation]:
    """Lists all experiment runs this executor currently knows about.

    Returns
    -------
    list
        One runtime information object per run: first the runs still
        waiting in the scheduling queue (carrying only the run itself),
        then the runs with a run governor (additionally carrying start
        time and, once started, the experiment run id).
    """
    # NOTE(review): ``ExperimentRunRuntimeInformation`` is neither defined
    # nor imported in the visible part of this module -- confirm it is in
    # scope at runtime.
    waiting = [
        ExperimentRunRuntimeInformation(experiment_run=queued_run)
        for queued_run in self._runs_scheduled
    ]
    active = [
        ExperimentRunRuntimeInformation(
            experiment_run=pcb.experiment_run,
            started_at=pcb.started_at,
            experiment_run_id=pcb.experiment_run_id,
        )
        for pcb in self._run_governors.values()
    ]
    return waiting + active
async def _launch_run_governor(
    self, experiment_run: palaestrai.experiment.ExperimentRun
) -> _RunGovernorPCB:
    """Launches a new :py:class:`RunGovernor` process

    This method creates a new `aiomultiprocess.Process` and launches it,
    returning the appropriate PCB object. The process is not registered
    in ``self._run_governors`` here; that is up to the caller.

    Parameters
    ----------
    experiment_run : ExperimentRun
        The experiment run the new run governor is meant to execute. It is
        only recorded in the PCB; it is not sent to the process here.

    Returns
    -------
    _RunGovernorPCB
        Control block describing the freshly started process.
    """
    run_gov_uid = str(uuid.uuid4())
    run_gov_process = aiomultiprocess.Process(
        target=spawn_wrapper,
        args=(
            # The closing bracket was missing from this title string.
            f"palaestrAI[RunGovernor-{run_gov_uid[-6:]}]",
            RuntimeConfig().to_dict(),
            _execute_run_governor,
            [run_gov_uid],
        ),
    )
    run_gov_process.start()
    pcb = _RunGovernorPCB(
        experiment_run=experiment_run,
        started_at=datetime.now(),
        run_governor_id=run_gov_uid,
        run_governor_process=run_gov_process,
    )
    # The placeholder used to have no argument, so the uid never made it
    # into the log.
    LOG.debug("Launched new RunGovernor %s", run_gov_uid)
    return pcb
async def _deploy_experiment_run(
    self, experiment_run: palaestrai.experiment.ExperimentRun
):
    """Launches a :py:class:`RunGovernor` for the given experiment run.

    A new run governor process is started for the run and its process
    control block is registered in the ``self._run_governors`` dictionary
    under the run governor's id. The run itself is not started yet; that
    happens in :py:meth:`_start_experiment_run`.

    Parameters
    ----------
    experiment_run: ExperimentRun
        The experiment that should be executed.

    Returns
    -------
    str
        An opaque :py:class:`RunGovernor` identification string.
    """
    control_block = await self._launch_run_governor(experiment_run)
    governor_id = control_block.run_governor_id
    self._run_governors[governor_id] = control_block
    return governor_id
async def _start_experiment_run(self, run_governor_id):
    """Asks a previously deployed run governor to start its run.

    Sends an :class:`ExperimentRunStartRequest` to the run governor and
    evaluates the answer. On success, the experiment run id is recorded in
    the run governor's process control block.

    Parameters
    ----------
    run_governor_id: str
        ID of the :class:`RunGovernor` that shall commence the
        experiment run.

    Returns
    -------
    The id of the experiment run that was started.

    Raises
    ------
    ExperimentRunStartError
        If the run governor answered, but reported a failure.
    TypeError
        If the answer is not an :class:`ExperimentRunStartResponse`.
    """
    pcb = self._run_governors[run_governor_id]
    experiment_run_id = pcb.experiment_run.uid
    request = ExperimentRunStartRequest(
        sender_executor_id=str(id(self)),
        receiver_run_governor_id=run_governor_id,
        experiment_run_id=experiment_run_id,
        experiment_run=pcb.experiment_run,
    )
    LOG.info(
        'Starting experiment run "%s". Our business is life itself',
        experiment_run_id,
    )
    reply = await self._client.send(run_governor_id, request)

    # Anything but the expected response type is a protocol violation.
    if not isinstance(reply, ExperimentRunStartResponse):
        LOG.error(
            "Executor expected ExperimentRunStartResponse, but got "
            "'%s' instead",
            reply,
        )
        raise TypeError

    # The run governor answered, but could not start the run.
    if reply.successful is not True:
        LOG.error("Could not start experiment run: %s", reply.error)
        raise ExperimentRunStartError(
            experiment_run_id=experiment_run_id,
            run_governor_id=run_governor_id,
            message=reply.error,
        )

    pcb.experiment_run_id = experiment_run_id
    LOG.debug(
        "Executor(id=0x%x) received ExperimentRunStart"
        "Response from RunGovernor(uid=%s) for "
        "ExperimentRun(run_id=%s).",
        id(self),
        run_governor_id,
        experiment_run_id,
    )
    LOG.debug(
        "Executor(id=0x%x) started experiment run (experiment_run_id=%s), "
        "got ExperimentStartResponse(successful=%s).",
        id(self),
        experiment_run_id,
        reply.successful,
    )
    return experiment_run_id
[docs]
async def cancel(self, experiment_run_id):
    """Shuts an experiment run down prematurely.

    This method sends a :py:class:`ExperimentRunShutdownRequest` to
    the :py:class:`RunGovernor` responsible for executing the
    associated experiment run. This allows for a graceful, yet
    premature shutdown of a running experiment run. Normally,
    experiment runs terminate when their termination condition is
    met, so this method provides a way for an external user to
    interfere with a running experiment.

    If no run governor is known for the given run, an error is logged and
    the method returns without sending anything. A reply of an unexpected
    type is logged at debug level and otherwise ignored.

    Parameters
    ----------
    experiment_run_id: str
        UID of the experiment run to shut down.

    Raises
    ------
    RuntimeError
        If the run governor replied that the shutdown was not successful.
    """
    # Look up the id of the first run governor that executes this run.
    # Using a default of None keeps an unknown run from blowing up with a
    # TypeError before the error below can be logged.
    run_governor_id = next(
        (
            uid
            for uid, pcb in self._run_governors.items()
            if pcb.experiment_run_id == experiment_run_id
        ),
        None,
    )
    if run_governor_id is None:
        LOG.error(
            "Executor (id=0x%x) cannot terminate ExperimentRun("
            "run_id=%s): Cannot find RunGovernor",
            id(self),
            experiment_run_id,
        )
        return
    LOG.debug(
        "Executor (id=0x%x) sending ExperimentRunShutdownRequest for "
        "ExperimentRun(run_id=%s).",
        id(self),
        experiment_run_id,
    )
    msg = ExperimentRunShutdownRequest(
        sender_executor_id=str(id(self)),
        receiver_run_governor_id=run_governor_id,
        experiment_run_id=experiment_run_id,
    )
    response = await self._client.send(run_governor_id, msg)
    LOG.debug("Executor(id=0x%x) received %s", id(self), response)
    if not isinstance(response, ExperimentRunShutdownResponse):
        return
    if not response.successful:
        # The run governor is deliberately not removed from
        # ``self._run_governors`` here: callers may be iterating over that
        # dictionary while cancelling.
        raise RuntimeError(response.error)
[docs]
def schedule(
    self,
    experiment_run: Union[
        palaestrai.experiment.ExperimentRun,
        Sequence[palaestrai.experiment.ExperimentRun],
    ],
):
    """Puts one or several experiment runs into the waiting queue.

    Nothing is started here; the main loop (see ::`execute`) takes runs
    out of the queue and launches them.

    Parameters
    ----------
    experiment_run : Union[palaestrai.experiment.ExperimentRun,
                           Sequence[palaestrai.experiment.ExperimentRun]]
        A single :class:`palaestrai.experiment.ExperimentRun`, or a
        sequence of them. A sequence is unpacked, i.e., each of its
        elements is queued individually and in order.
    """
    queue = self._runs_scheduled
    if not isinstance(experiment_run, collections.abc.Sequence):
        # A single run object.
        queue.append(experiment_run)
        return
    queue.extend(experiment_run)
def _drain_command_channel(self):
    """Processes all commands currently waiting on the command channel.

    The channel is polled without blocking. Each command is expected to be
    a ``(verb, payload)`` pair:

    * ``("SCHEDULE", experiment_run)`` passes the payload to
      :meth:`schedule`;
    * ``("SHUTDOWN", <anything>)`` calls :meth:`shutdown`.

    Commands that cannot be unpacked into two items, as well as unknown
    verbs, are logged and skipped. If the channel breaks (``EOFError`` or
    ``OSError``), it is dropped and the executor carries on without it.
    This method never raises, so command-channel trouble can never take
    down the executor.
    """
    channel = self._command_channel
    if channel is None:
        return
    executor_id = id(self)
    try:
        while channel.poll():
            message = channel.recv()
            try:
                verb, payload = message
            except (TypeError, ValueError):
                LOG.warning(
                    "Executor(id=0x%x) received malformed command %r; "
                    "ignoring.",
                    executor_id,
                    message,
                )
                continue
            if verb == "SCHEDULE":
                LOG.info(
                    "Executor(id=0x%x) received SCHEDULE command for "
                    "ExperimentRun(uid=%s).",
                    executor_id,
                    getattr(payload, "uid", "?"),
                )
                self.schedule(payload)
                continue
            if verb == "SHUTDOWN":
                LOG.info(
                    "Executor(id=0x%x) received SHUTDOWN command.",
                    executor_id,
                )
                self.shutdown()
                continue
            LOG.warning(
                "Executor(id=0x%x) received unknown command verb "
                "%r; ignoring.",
                executor_id,
                verb,
            )
    except (EOFError, OSError) as channel_error:
        LOG.warning(
            "Executor(id=0x%x) lost its command channel: %s. "
            "Continuing without it.",
            executor_id,
            channel_error,
        )
        # Stop polling a dead channel on future iterations.
        self._command_channel = None
    except Exception as error:  # noqa: BLE001 -- never break the main loop
        LOG.warning(
            "Executor(id=0x%x) could not drain command channel: %s. "
            "Continuing regardless.",
            executor_id,
            error,
        )
[docs]
def shutdown(self):
    """Requests an orderly shutdown of the executor.

    This only flags the request: all runs still waiting in the queue are
    dropped and the state is set to ``SHUTDOWN``, which makes the main
    loop in :meth:`execute` stop and perform the actual cleanup. If the
    executor is already past the ``RUNNING`` state, a warning is logged
    and nothing else happens.
    """
    # States are ordered by value; everything after RUNNING means that a
    # shutdown (or an interrupt) is already in progress.
    already_stopping = self._state.value > ExecutorState.RUNNING.value
    if already_stopping:
        LOG.warning(
            "Executor(id=0x%x) is already shuttdown down.", id(self)
        )
        return
    LOG.info("palaestrAI executor received command to shut down.")
    self._runs_scheduled.clear()
    self._state = ExecutorState.SHUTDOWN
async def _monitor_state(self):
    """Returns as soon as the executor's state differs from the current one.

    The state is sampled every 0.2 seconds. Cancelling the task also ends
    the monitoring; in that case the coroutine finishes normally instead
    of propagating the cancellation.
    """
    initial_state = self._state
    while True:
        if self._state.value != initial_state.value:
            break
        try:
            await asyncio.sleep(0.2)
        except asyncio.CancelledError:
            break
    LOG.debug(
        "Executor(id=0x%x) state changed from %s to %s",
        id(self),
        initial_state,
        self._state,
    )
async def _check_major_domo_broker_state(self):
    """Verifies that the MajorDomoBroker process is still alive.

    Nothing happens while the executor is not in the ``RUNNING`` state or
    while the broker process is alive. Otherwise, the broker gets a grace
    period: the first three consecutive failed checks wait 1, 2, and 3
    seconds, respectively. The fourth consecutive failed check gives up.

    Raises
    ------
    DeadMajorDomoBrokerError
        If the broker process was found dead on more than three
        consecutive checks.
    """
    if (
        self._state != ExecutorState.RUNNING
        or self._broker_process.is_alive()
    ):
        # All good (or not our business right now): forget earlier strikes.
        self._n_broker_checks = 0
        return
    # The counter has to survive between calls; as a local variable it was
    # reset on every call, so the give-up branch could never be reached.
    # ``getattr`` covers the very first call, before the attribute exists.
    n_broker_checks = getattr(self, "_n_broker_checks", 0)
    if n_broker_checks >= 3:
        LOG.fatal(
            "Executor(id=%s) lost the MajorDomoBroker, "
            "committing seppuku.",
            id(self),
        )
        raise DeadMajorDomoBrokerError()
    wait_time = 1 + n_broker_checks
    LOG.debug(
        "MajorDomoBroker is not yet online, waiting for %s seconds",
        wait_time,
    )
    self._n_broker_checks = n_broker_checks + 1
    await asyncio.sleep(wait_time)
async def _try_start_next_scheduled_experiment(self):
    """Starts the next scheduled experiment run, monitoring the start

    At most one run is started per call, and only if fewer run governors
    than ``parallel_runs`` are registered and a run is waiting. The method
    launches a new ::`RunGovernor`, sends it the start request, and waits
    until either the start is acknowledged or the run governor process
    exits. If the start fails, the process exits first, or this coroutine
    is cancelled while waiting, the run's instance status is set to
    ``ERROR`` and the run governor process is terminated (and killed if it
    does not exit within 3 seconds).

    The method does not keep track of the overall experimentation
    process; that is done by the main loop in ::`execute`.
    """
    LOG.debug(
        "Executor(id=0x%x) checks whether to start new experiments.",
        id(self),
    )
    if (
        len(self._run_governors) >= self._num_parallel_runs
        or len(self._runs_scheduled) == 0
    ):
        return
    # NOTE(review): pop() takes from the right end, i.e., the most recently
    # scheduled run is started first -- confirm that this is intended.
    experiment = self._runs_scheduled.pop()
    # Tracked locally so that the error handler does not depend on the
    # exception carrying it: a CancelledError has no ``run_governor_id``.
    run_governor_id = None
    startup_tasks = ()
    try:
        run_governor_id = await self._deploy_experiment_run(experiment)
        start_experiment_run_task = asyncio.create_task(
            self._start_experiment_run(run_governor_id)
        )
        process_monitor_task = asyncio.create_task(
            self._run_governors[
                run_governor_id
            ].run_governor_process.join()
        )
        startup_tasks = (start_experiment_run_task, process_monitor_task)
        tasks_done, _ = await asyncio.wait(
            set(startup_tasks),
            return_when=asyncio.FIRST_COMPLETED,
        )
        if (
            start_experiment_run_task in tasks_done
            and start_experiment_run_task.exception()
        ):
            raise start_experiment_run_task.exception()
        if process_monitor_task in tasks_done:
            # The process is gone, so nobody will execute the run.
            raise ExperimentRunStartError(
                self._run_governors[run_governor_id].experiment_run_id,
                run_governor_id,
                "No ExperimentStartResponse received",
            )
    except (ExperimentRunStartError, asyncio.CancelledError) as e:
        LOG.fatal(
            "Executor(id=%s) "
            "could not launch ExperimentRun(experiment_id=%s): "
            "%s; killing associated RunGovernor(uid=%s).",
            id(self),
            experiment.uid,
            e,
            run_governor_id,
        )
        # The run never made it past start-up: mark the corresponding
        # experiment run instance as failed so the REST API reports
        # ``ERROR`` rather than leaving it stuck at ``SCHEDULED``.
        self._set_instance_status(
            experiment.instance_uid,
            ExperimentRunInstanceStatus.ERROR,
        )
        pcb = self._run_governors.get(run_governor_id)
        if pcb is None:
            # We were interrupted before a process was even registered;
            # there is nothing to clean up.
            return
        pcb.run_governor_process.terminate()
        try:
            await pcb.run_governor_process.join(3)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            if pcb.run_governor_process.is_alive():
                pcb.run_governor_process.kill()
                try:
                    await pcb.run_governor_process.join(3)
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    # Nobody awaits this coroutine's result, so raising
                    # here would only go unnoticed. The main loop drops
                    # the PCB once the process is really gone.
                    LOG.warning(
                        "RunGovernor(uid=%s) did not exit after being "
                        "killed.",
                        run_governor_id,
                    )
    finally:
        # Do not leave helper tasks behind: whichever of the two is still
        # pending has served its purpose by now.
        for task in startup_tasks:
            if not task.done():
                task.cancel()
[docs]
async def execute(self):
    """Runs the executor's main loop until shutdown.

    This method sets up signal handling, logging, the storage backend, and
    the broker, and then enters the main loop. The main loop picks up
    commands from the command channel, starts scheduled experiment runs,
    and keeps track of the run governor processes. This method returns
    only if:

    1. all experiment runs have terminated and nothing is scheduled anymore
       (not in service mode, where the executor keeps idling instead);
    2. :meth:`shutdown` was called or the broker process died;
    3. the user has terminated the process (e.g., by hitting ^C).

    Returns
    -------
    ExecutorState
        The state the executor is in after cleanup: "EXITED" if the main
        loop ended in the "SHUTDOWN" state (``_shutdown`` converts one
        into the other), or one of the SIG* states if a signal was
        received.
    """
    self._init_signal_handler()
    setproctitle.setproctitle("palaestrAI[Executor]")
    await self._init_logging()
    LOG.info("This is palaestrAI, version %s", __version__)
    # The store comes first: the broker is created with the store's queue.
    self._init_store()
    self._init_communication()
    self._state = ExecutorState.INITIALIZED
    LOG.debug("Executor(id=0x%x) starting main execution loop", id(self))
    self._state = ExecutorState.RUNNING
    # Completes as soon as the state leaves RUNNING, which lets the wait
    # below return early instead of blocking on a slow experiment start.
    state_change_monitor_task = asyncio.create_task(self._monitor_state())
    while self._state == ExecutorState.RUNNING:
        # Pick up any runs the parent process asked us to schedule.
        self._drain_command_channel()
        try:
            await self._check_major_domo_broker_state()
        except DeadMajorDomoBrokerError:
            self._state = ExecutorState.SHUTDOWN
            continue
        start_experiment_task = asyncio.create_task(
            self._try_start_next_scheduled_experiment()
        )
        tasks_done, tasks_pending = await asyncio.wait(
            {state_change_monitor_task, start_experiment_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        # NOTE(review): the result of start_experiment_task is never
        # inspected here, so an exception escaping it is not handled in
        # this loop.
        if (
            state_change_monitor_task in tasks_done
            and start_experiment_task in tasks_pending
        ):
            # The state changed while a run was still being started.
            start_experiment_task.cancel()
        LOG.debug(
            "Executor(id=0x%x) checks for finished RunGovernors",
            id(self),
        )
        for pcb in self._run_governors.values():
            # An alive run governor whose RUNNING status we have not yet
            # persisted: try (again) to do so. The instance row is created
            # asynchronously by the StoreReceiver, so the very first
            # attempts right after start may find no row yet; we simply
            # retry on subsequent loop iterations until it succeeds.
            if (
                not pcb.instance_status_running_marked
                and pcb.run_governor_process.is_alive()
            ):
                if self._set_instance_status(
                    pcb.experiment_run.instance_uid,
                    ExperimentRunInstanceStatus.RUNNING,
                ):
                    pcb.instance_status_running_marked = True
            try:
                await pcb.run_governor_process.join(0.1)
            except (asyncio.TimeoutError, TimeoutError):
                pass  # This is actually expected in most cases.
            if pcb.run_governor_process.is_alive():
                continue
            # The run governor has exited. Record the terminal status of
            # the associated experiment run instance: a clean exit (code 0)
            # means the run FINISHED; any other exit code is an ERROR.
            if pcb.run_governor_process.exitcode != 0:
                LOG.error("RunGovernor process %s died.", pcb)
                self._set_instance_status(
                    pcb.experiment_run.instance_uid,
                    ExperimentRunInstanceStatus.ERROR,
                )
            else:
                self._set_instance_status(
                    pcb.experiment_run.instance_uid,
                    ExperimentRunInstanceStatus.FINISHED,
                )
        # Forget all run governors whose process has ended.
        self._run_governors = {
            uid: pcb
            for uid, pcb in self._run_governors.items()
            if pcb.run_governor_process.is_alive()
        }
        LOG.debug(
            "Executor(id=0x%x) has %d active run governors and %d "
            "experiment runs scheduled.",
            id(self),
            len(self._run_governors),
            len(self._runs_scheduled),
        )
        if (
            self._state == ExecutorState.RUNNING
            and len(self._run_governors) + len(self._runs_scheduled) == 0
        ):
            if self.is_service:
                # Service mode: stay online even when idle. Yield control
                # so that runs scheduled at runtime (e.g., via the REST
                # API) get picked up, and so we don't busy-spin. We only
                # leave RUNNING on an explicit shutdown() or a signal.
                await asyncio.sleep(0.2)
            else:
                self._state = ExecutorState.SHUTDOWN
    state_change_monitor_task.cancel()
    await self._shutdown()
    return self._state
async def _shutdown_all_run_governors(self):
    """Shuts all ::`RunGovernor` instances down, cleaning up

    This method terminates all running ::`RunGovernor` instances. Run
    governors that have acknowledged their experiment run get a shutdown
    request first (with 5 seconds to answer). Afterwards, the process
    group of every run governor that was still alive is sent SIGTERM,
    given 3 seconds to exit, and finally sent SIGKILL in any case.

    NOTE(review): only a timeout of the shutdown request is handled here;
    any other exception raised by ``cancel`` (it raises ``RuntimeError`` on
    an unsuccessful response) propagates and aborts the loop over the
    remaining run governors -- confirm that this is acceptable.
    """
    for run_gov_uid in self._run_governors:
        pcb = self._run_governors[run_gov_uid]
        if not pcb.run_governor_process.is_alive():
            continue
        LOG.info(
            "Signalling RunGovernor for experiment run '%s' to shut down.",
            pcb.experiment_run_id,
        )
        # Without an experiment run id the run was never acknowledged, so
        # there is nothing we could ask the run governor to cancel.
        if pcb.experiment_run_id:
            try:
                await asyncio.wait_for(
                    self.cancel(pcb.experiment_run_id), timeout=5
                )
            except asyncio.TimeoutError:
                LOG.debug(
                    "Executor(id=%s) has encountered a "
                    "RunGovernor(uid=%s, run_id=%s) "
                    "that seems to be still active, trying to abort. "
                    "Did you hit ^C? Oh, you bad boy...",
                    id(self),
                    pcb.run_governor_id,
                    pcb.experiment_run_id,
                )
        LOG.info(
            "Sending SIGTERM to process group %d...",
            pcb.run_governor_process.pid,
        )
        # The run governor process's pid is used as the process group id
        # for both signals below.
        pgpid = pcb.run_governor_process.pid
        try:
            os.killpg(pgpid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # Already down - ok, don't race here.
        try:
            await pcb.run_governor_process.join(3)
        except asyncio.TimeoutError:
            pass  # Okay, we send SIGKILL anyways...
        try:
            # Zombies will be shot:
            os.killpg(pgpid, signal.SIGKILL)
            await pcb.run_governor_process.join(3)
        except (asyncio.TimeoutError, ProcessLookupError):
            pass  # Just a shot in the dark.
async def _shutdown(self):
    """Conducts an orderly shutdown.

    The teardown order is: run governors, client, broker process, store
    process (if one was started) together with its queue, and finally the
    log server. A ``SHUTDOWN`` state is turned into ``EXITED``; the SIG*
    states are left untouched so that callers can see what interrupted the
    executor.
    """
    LOG.info("palaestrAI executor initiating shutdown procedure.")
    await self._shutdown_all_run_governors()
    try:
        await self._client.destroy()
    except zmq.error.ZMQError:
        # This can happen on ^C. It's actually not that bad, so we just
        # do a debug log entry here.
        LOG.debug(
            "%s could not send destroy message via MajorDomoClient to "
            "MajorDomoBroker; ignoring that anyways and dragging "
            "on.",
            self,
        )
    self._broker_process.terminate()
    # Keep joining in 1 second slices until the broker is really gone.
    while self._broker_process.is_alive():
        try:
            await self._broker_process.join(1)
        except asyncio.TimeoutError:
            await asyncio.sleep(0.05)
    LOG.info("Major domo broker has shut down.")
    if self._store_process is not None:
        # The store process is not terminated here; we only wait for it to
        # exit (presumably it stops on its own once the broker is gone --
        # TODO confirm).
        while self._store_process.is_alive():
            try:
                await self._store_process.join(1)
            except asyncio.TimeoutError:
                LOG.info("Waiting for the storage backend to shut down...")
                await asyncio.sleep(0.05)
        if self._messsage_storage_queue is not None:
            self._messsage_storage_queue.close()
        LOG.info("Storage backend has shut down.")
    if self._state == ExecutorState.SHUTDOWN:
        self._state = ExecutorState.EXITED
    # Stopped last, after all subprocesses have been dealt with.
    await self._log_server.stop()
def __str__(self):
    """Returns a short label that identifies this executor by its ``id()``."""
    return f"Executor(id={id(self)})"