palaestrai.core package#

Submodules#

palaestrai.core.MDP module#

Majordomo Protocol definitions

palaestrai.core.MDP.commands = [None, b'READY', b'REQUEST', b'REPLY', b'HEARTBEAT', b'DISCONNECT', b'DESTROY']#

C_CLIENT = b’MDPC01’

# This is the version of MDP/Worker we implement W_WORKER = b’MDPW01’

# MDP/Server commands, as strings W_READY = b’’ W_REQUEST = b’’ W_REPLY = b’’ W_HEARTBEAT = b’’ W_DISCONNECT = b’’

commands = [None, b’READY’, b’REQUEST’, b’REPLY’, b’HEARTBEAT’, b’DISCONNECT’]

palaestrai.core.basic_state module#

class palaestrai.core.basic_state.BasicState(value)[source]#

Bases: Enum

Basis for submodule states

This enumeration is the basis for most submodules’ states. It covers their basic live cycle.

CANCELLED = 6#
ERROR = 7#
FINISHED = 5#
INITIALIZED = 2#
INITIALIZING = 1#
PRISTINE = 0#
RUNNING = 3#
SIGABRT = 9#
SIGINT = 8#
SIGTERM = 10#
STOPPING = 4#

palaestrai.core.major_domo_broker module#

class palaestrai.core.major_domo_broker.MajorDomoBroker(uri=None)[source]#

Bases: object

Distributes messages between clients and workers according to services

This Major Domo Protocol broker is a minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8.

HEARTBEAT_EXPIRY = 7500#
HEARTBEAT_INTERVAL = 2500#
HEARTBEAT_LIVENESS = 3#
INTERNAL_SERVICE_PREFIX = b'mmi.'#
bind(endpoint)[source]#

Bind broker to endpoint, can call this multiple times.

We use a single socket for both clients and workers.

async dispatch(service, msg)[source]#

Dispatch requests to waiting workers as possible

async mediate()[source]#

Mediation loop for message distribution

This method is an infinite loop that receives and distributes messages received from clients and workers.

async process_client(sender, msg)[source]#

Process a request coming from a client.

async purge_workers()[source]#

Look for & kill expired workers.

require_worker(address) Worker[source]#

Finds the worker (creates if necessary).

async send_heartbeats()[source]#

Send heartbeats to idle workers if neccessary.

This method checks for the current time elapsed being past the next designated checkpoint, and if yes, sends a heartbeat message to all workers.

async send_to_worker(worker, command, option, msg=None)[source]#

Send message to worker.

If message is provided, sends that message.

async service_internal(service, msg)[source]#

Handle internal service according to 8/MMI specification

async worker_waiting(worker)[source]#

This worker is now waiting for work.

class palaestrai.core.major_domo_broker.Service(name)[source]#

Bases: object

A single service managed by this broker

class palaestrai.core.major_domo_broker.Worker(identity, address, lifetime)[source]#

Bases: object

Represents an external Worker in the major domo protocol.

The major domo protocol knows clients (who send out requests) and workers, who process these requests. This internal data structure represents such an external worker and is used for bookkeeping.

palaestrai.core.major_domo_client module#

“Majordomo Protocol Client API, Python version. Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7. Author: Min RK <benjaminrk@gmail.com> Based on Java example by Arkadiusz Orzechowski

class palaestrai.core.major_domo_client.MajorDomoClient(broker_uri: str)[source]#

Bases: object

Client object for distributing tasks to workers and receiving results

The major domo protocol client is the initiator of tasks. It sends messages that are distributed according to a service name to worksers. The main method of this class, MajorDomoClient.send(), sends such a request and waits for the reply of the corresponding worker.

Major domo clients strictly adhere to the request-response pattern: A client always awaits for a reply when sending a request.

Majordomo Protocol Client API, Python version. Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.

async destroy()[source]#
reconnect_to_broker()[source]#

Connect or reconnect to broker

Attempts to connect to the broker. This method can be called multiple times: For initial connection, or for reconnection.

async send(service, request)[source]#

Send request to broker and get reply by hook or crook.

Takes ownership of request message and destroys it when sent. Returns the reply message or None if there was no reply.

palaestrai.core.major_domo_worker module#

Majordomo Protocol Worker API, Python version Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7. Author: Min RK <benjaminrk@gmail.com> Based on Java example by Arkadiusz Orzechowski

class palaestrai.core.major_domo_worker.MajorDomoWorker(broker_uri: str, service)[source]#

Bases: object

A task receiver with heartbeats

The Major Domo Worker connects to a broker and listens to requests to designated service. Thiese it picks up, returns them to the calling loop, and also sends replies. The worker has a separate heartbeat to find out whether the connection to the broker has become stale.

This is the Major Domo Protocol Worker API, Python version. Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.

DEFAULT_HEARTBEAT_DELAY = 2500#
HEARTBEAT_LIVENESS = 3#
disconnect()[source]#

Disconnects the worker from the broker.

reconnect = 2500#
timeout = 2500#
async transceive(reply=None, skip_recv=False)[source]#

Send and receive main method of the major domo worker

This method does it both: First, it sends a reply to the last message that was recived — the major domo worker keeps track of that —, then it waits for the next message. This is the normal operation.

There can be exceptions, which are only sensible for the first or the last message. One can skip sending a reply, which is sensible for the very first message (reply=None), or one can skip the receiving part (skip_recv=True), which is sensible when the parent object of the worker shuts down and sends its last ACK.

Parameters:
  • reply – The reply to send to the last message received.

  • skip_recv – If True, the worker will only send and skip the receiving part.

Returns:

A message body in serialized form

palaestrai.core.runtime_config module#

class palaestrai.core.runtime_config.DebugLogFilter[source]#

Bases: object

Only allows debug messages; works as a filter for the debug_handler

palaestrai.core.runtime_config.RuntimeConfig()[source]#

palaestrai.core.serialisation module#

palaestrai.core.serialisation.deserialize(response)[source]#
palaestrai.core.serialisation.serialize(request)[source]#

palaestrai.core.zhelpers module#

Helper module for example applications. Mimics ZeroMQ Guide’s zhelpers.h.

palaestrai.core.zhelpers.dump(msg_or_socket)[source]#

Receives all message parts from socket, printing each frame neatly

palaestrai.core.zhelpers.pprint_message(msg)[source]#

Pretty-prints a message to a string.

Parameters:

msg – A message of the palaestrai zmq core protocol

Returns:

The pretty-printed message

palaestrai.core.zhelpers.set_id(zsocket)[source]#

Set simple random printable identity on socket

palaestrai.core.zhelpers.socket_set_hwm(socket, hwm=-1)[source]#

libzmq 2/3/4 compatible sethwm

palaestrai.core.zhelpers.zpipe(ctx)[source]#

build inproc pipe for talking to threads mimic pipe used in czmq zthread_fork. Returns a pair of PAIRs connected via inproc

Module contents#