Source code for palaestrai.core.major_domo_client

""""Majordomo Protocol Client API, Python version.
Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.
Author: Min RK <benjaminrk@gmail.com>
Based on Java example by Arkadiusz Orzechowski
"""

import logging

import zmq
import zmq.asyncio

from . import MDP
from .runtime_config import RuntimeConfig
from .serialisation import deserialize, serialize
from .zhelpers import ContextGuard

LOG = logging.getLogger(__name__)


[docs] class MajorDomoClient: """Client object for distributing tasks to workers and receiving results The major domo protocol client is the initiator of tasks. It sends messages that are distributed according to a service name to worksers. The main method of this class, :py:func:`MajorDomoClient.send`, sends such a request and waits for the reply of the corresponding worker. Major domo clients strictly adhere to the request-response pattern: A client always awaits for a reply when sending a request. Majordomo Protocol Client API, Python version. Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7. """ def __init__(self, broker_uri: str): self._socket = None self._broker_uri = broker_uri self._poller = zmq.asyncio.Poller() self.timeout = RuntimeConfig().major_domo_client_timeout self.retries = RuntimeConfig().major_domo_client_retries
[docs] def reconnect_to_broker(self): """Connect or reconnect to broker Attempts to connect to the broker. This method can be called multiple times: For initial connection, or for reconnection. """ if self._socket: self._poller.unregister(self._socket) self._socket.close() self._socket = zmq.asyncio.Socket( context=ContextGuard.asyncio_context(), socket_type=zmq.REQ ) # self.ctx.socket(zmq.REQ) self._socket.linger = 0 self._socket.connect(self._broker_uri) self._poller.register(self._socket, zmq.POLLIN) LOG.debug( "MajorDomoClient(id=%s) connected to MajorDomoBroker(uri=%s)", id(self), self._broker_uri, )
[docs] async def destroy(self): if not self._socket: self.reconnect_to_broker() request = [MDP.C_CLIENT, MDP.W_DESTROY] LOG.debug( "MajorDomoClient(id=%s) sending destroy message " "to MajorDomoBroker(uri=%s)", id(self), self._broker_uri, ) await self._socket.send_multipart(request)
[docs] async def send(self, service, request): """Send request to broker and get reply by hook or crook. Takes ownership of request message and destroys it when sent. Returns the reply message or None if there was no reply. """ assert not isinstance(request, bytes) request = serialize(request) if not isinstance(service, bytes): service = bytes(str(service), "ascii") if not self._socket: self.reconnect_to_broker() if not isinstance(request, list): request = [request] request = [MDP.C_CLIENT, service] + request retries = self.retries while retries > 0: LOG.debug( "MajorDomoClient(id=%s) sending request to service '%s'", id(self), service, ) await self._socket.send_multipart(request) try: items = await self._poller.poll(self.timeout) except KeyboardInterrupt: return None except SystemExit: return None if items: msg = await self._socket.recv_multipart() LOG.debug("MajorDomoClient(id=%s) received reply", id(self)) if len(msg) < 3: LOG.error( "MajorDomoClient(id=0x%x) received message " "of length = %d, should be >= 3; ignoring", id(self), len(msg), ) break header = msg.pop(0) if header != MDP.C_CLIENT: LOG.error( "MajorDomoClient(id=0x%x) received message " "with header type '%s', but expected '%s'; ignoring", id(self), header, MDP.C_CLIENT, ) break reply_service = msg.pop(0) if service != reply_service: LOG.error( "MajorDomoClient(id=0x%x) received reply " "for service '%s', but waited for service '%s'; " "returning for the caller to enjoy the unwanted gift", id(self), service, reply_service, ) return deserialize(msg) else: if retries: LOG.warning( "MajorDomoClient(id=%s) " "received no response from service '%s' " "for message %s " "after %s of %s attempt(s) with timeout %s, " "reconnecting and retrying", id(self), service, str(request), self.retries - retries, self.retries, self.timeout, ) self.reconnect_to_broker() else: LOG.error( "MajorDomoClient(id=0x%x) suffering from permanent " "connection error: Tried service '%s' %s times, " "but got no response for request %s, " "abondoning this message", id(self), service, self.retries, str(request), ) break retries -= 1 return None