Source code for palaestrai.core.major_domo_worker

"""Majordomo Protocol Worker API, Python version
Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.
Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging

import time
import zmq
import zmq.asyncio

from . import MDP
from .serialisation import deserialize, serialize
from .zhelpers import pprint_message, ContextGuard

LOG = logging.getLogger(__name__)


[docs] class MajorDomoWorker: """A task receiver with heartbeats The Major Domo Worker connects to a broker and listens to requests to designated service. Thiese it picks up, returns them to the calling loop, and also sends replies. The worker has a separate heartbeat to find out whether the connection to the broker has become stale. This is the Major Domo Protocol Worker API, Python version. Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7. """ HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable DEFAULT_HEARTBEAT_DELAY = 2500 # Heartbeat delay, msecs reconnect = 2500 # Reconnect delay, msecs timeout = 2500 # poller timeout def __init__(self, broker_uri: str, service): self._socket = None self._broker_uri = broker_uri if isinstance(service, str): self._service = bytes(service, "ascii") elif isinstance(service, bytes): self._service = service else: self._service = bytes(str(service), "ascii") self._reply_to = None self._expect_reply = False self._poller = zmq.asyncio.Poller() self._liveness = 0 self._heartbeat_at = 0 self.heartbeat_delay = MajorDomoWorker.DEFAULT_HEARTBEAT_DELAY async def _connect(self): """Connect or reconnect to broker""" if self._socket: self._poller.unregister(self._socket) self._socket.close() self._socket = zmq.asyncio.Socket( context=ContextGuard.asyncio_context(), socket_type=zmq.DEALER ) self._socket.linger = 0 self._socket.connect(self._broker_uri) self._poller.register(self._socket, zmq.POLLIN) LOG.debug( "MajorDomoWorker(id=0x%x, service=%s, broker_uri=%s) " "connecting to MajorDomoBroker", id(self), self._service, self._broker_uri, ) # Register service with broker await self._send(MDP.W_READY, self._service, []) # If liveness hits zero, queue is considered disconnected self._liveness = self.HEARTBEAT_LIVENESS self._heartbeat_at = time.time() + 1e-3 * self.heartbeat_delay async def _send(self, command, option=None, msg=None): """Send message to broker. If no msg is provided, creates one internally """ if not self._socket: await self._connect() if msg is None: msg = [] elif not isinstance(msg, list): msg = [msg] if option: msg = [option] + msg LOG.debug( "MajorDomoWorker(id=0x%x, service=%s, uri=%s) " "sending to broker: %s", id(self), self._service, self._broker_uri, msg, ) msg = [b"", MDP.W_WORKER, command] + msg await self._socket.send_multipart(msg)
[docs] async def transceive(self, reply=None, skip_recv=False): """Send and receive main method of the major domo worker This method does it both: First, it sends a reply to the last message that was recived --- the major domo worker keeps track of that ---, then it waits for the next message. This is the normal operation. There can be exceptions, which are only sensible for the first or the last message. One can skip sending a reply, which is sensible for the very first message (`reply=None`), or one can skip the receiving part (`skip_recv=True`), which is sensible when the parent object of the worker shuts down and sends its last ACK. :param reply: The reply to send to the last message received. :param skip_recv: If `True`, the worker will only send and skip the receiving part. :return: A message body in serialized form """ LOG.debug( "MajorDomoWorker(id=0x%x, broker_uri=%s, service=%s) " "in recv(reply=%s, skip_recv=%s), starting to poll", id(self), self._broker_uri, self._service, reply, skip_recv, ) if reply is None and self._expect_reply: LOG.error( "MajorDomoWorker(id=%s, broker_uri=%s, service=%s) " "is expected to send a reply, but has none on recv; cowardly " "refusing to continue", id(self), self._broker_uri, self._service, ) return if reply: assert not isinstance(reply, bytes) reply = serialize(reply) if not isinstance(reply, list): reply = [reply] reply = [self._reply_to, b""] + reply await self._send(MDP.W_REPLY, msg=reply) self._expect_reply = True # We may recv only once without reply. if skip_recv: return while True: # Poll socket for a reply, with timeout try: LOG.debug( "MajorDomoWorker(id=0x%x, service=%s, broker_uri=%s) " "waiting for messages", id(self), self._service, self._broker_uri, ) items = await self._poller.poll(self.timeout) except KeyboardInterrupt: break # Interrupted if items: msg = await self._socket.recv_multipart() LOG.debug( "MajorDomoWorker(id=0x%x, service=%s, broker_uri=%s) " "received message from broker: %s", id(self), self._service, self._broker_uri, pprint_message(msg), ) self._liveness = self.HEARTBEAT_LIVENESS # Don't try to handle errors, just assert noisily if len(msg) < 3: LOG.error( "MajorDomoWorker(id=0x%x, service=%s, " "broker_uri=%s) " "of length = %d, should be >= 3; ignoring", id(self), self._service, self._broker_uri, len(msg), ) continue empty = msg.pop(0) if empty != b"": LOG.error( "MajorDomoWorker(id=0x%x, service=%s, " "broker_uri=%s) received message that " "violates the MDP: empty != ''", id(self), self._service, self._broker_uri, ) continue header = msg.pop(0) if header != MDP.W_WORKER: LOG.error( "MajorDomoWorker(id=0x%x, service=%s, " "broker_uri=%s) received header '%s', but expected" " '%s'; ignoring message", id(self), self._service, self._broker_uri, header, MDP.W_WORKER, ) continue command = msg.pop(0) if command == MDP.W_REQUEST: # We should pop and save as many addresses as there are # up to a null part, but for now, just save one... self._reply_to = msg.pop(0) # pop empty empty = msg.pop(0) assert empty == b"" return deserialize(msg) # We have a request to process elif command == MDP.W_HEARTBEAT: # Do nothing for heartbeats pass elif command == MDP.W_DISCONNECT: await self._connect() else: LOG.error( "MajorDomoWorker(id=0x%x, service=%s, " "broker_uri=%s) " "received invalid input message: %s", id(self), self._service, self._broker_uri, pprint_message(msg), ) else: self._liveness -= 1 if self._liveness <= 0: if self._socket: # Reconnect: sleep a bit LOG.warning( "MajorDomoWorker(id=0x%x, service=%s, uri=%s) " "disconnected from broker, reconnecting", id(self), self._service, self._broker_uri, ) try: time.sleep(1e-3 * self.reconnect) except KeyboardInterrupt: break await self._connect() # Send HEARTBEAT if it's time if time.time() > self._heartbeat_at: await self._send(MDP.W_HEARTBEAT) self._heartbeat_at = time.time() + 1e-3 * self.heartbeat_delay LOG.warning("Worker received interrupt, committing seppuku") return None
[docs] def disconnect(self): """Disconnects the worker from the broker.""" self._send(MDP.W_DISCONNECT)