Source code for palaestrai.core.zhelpers

# encoding: utf-8
"""
Helper module for example applications. Mimics ZeroMQ Guide's zhelpers.h.
"""

from __future__ import print_function

import binascii
import os
import socket
from random import randint

import zmq
import zmq.asyncio


def socket_set_hwm(socket, hwm=-1):
    """Set the high-water mark on a socket (libzmq 2/3/4 compatible).

    The value is assigned to ``sndhwm`` first and ``rcvhwm`` second. If
    either assignment raises ``AttributeError``, the single ``hwm``
    attribute is assigned instead.

    :param socket: The socket object whose high-water mark is set
    :param hwm: The high-water mark value to apply; defaults to ``-1``
    """
    try:
        # Same order as a chained assignment: send side first, then receive.
        for option in ("sndhwm", "rcvhwm"):
            setattr(socket, option, hwm)
    except AttributeError:
        socket.hwm = hwm
def pprint_message(msg):
    """Pretty-print a message to a string, one line per frame.

    Every frame is rendered as ``[NNN] payload``, where ``NNN`` is the
    frame's length zero-padded to three digits. The payload is the frame
    decoded as ASCII; frames that do not decode as ASCII are shown as
    ``0x`` followed by their hexadecimal representation.

    :param msg: A message of the palaestrai zmq core protocol
    :return: The pretty-printed message, frames separated by newlines
    """
    rendered = []
    for frame in msg:
        size = len(frame)
        try:
            payload = frame.decode("ascii")
        except UnicodeDecodeError:
            # Not ASCII text: fall back to a hex dump of the frame.
            payload = r"0x%s" % binascii.hexlify(frame).decode("ascii")
        rendered.append("[%03d] %s" % (size, payload))
    return "\n".join(rendered)
def dump(msg_or_socket):
    """Print all parts of a message to stdout, one frame per line.

    A separator line of dashes is printed first. Each frame is then printed
    as ``[NNN] payload``, where ``NNN`` is the frame length zero-padded to
    three digits. The payload is the frame decoded as ASCII; frames that do
    not decode as ASCII are printed as ``0x`` followed by their hex dump.

    :param msg_or_socket: Either a ``zmq.Socket``, from which one multipart
        message is received via ``recv_multipart()``, or an already received
        message (an iterable of frames supporting ``len()`` and
        ``decode("ascii")``).
    """
    if isinstance(msg_or_socket, zmq.Socket):
        # It's a socket: fetch the message to print from it.
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket
    print("----------------------------------------")
    for part in msg:
        print("[%03d]" % len(part), end=" ")
        try:
            print(part.decode("ascii"))
        except UnicodeDecodeError:
            print(r"0x%s" % (binascii.hexlify(part).decode("ascii")))
def set_id(zsocket):
    """Set a simple random printable identity on a socket.

    The identity has the form ``xxxx-xxxx``: two groups of exactly four
    lowercase hexadecimal digits, each drawn at random.

    :param zsocket: The socket to configure; its ``setsockopt_string``
        method is called with ``zmq.IDENTITY`` and the generated identity.
    """
    # randint() includes BOTH endpoints, so the upper bound has to be 0xFFFF:
    # 0x10000 would render as five hex digits and break the xxxx-xxxx shape.
    identity = "%04x-%04x" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)
def zpipe(ctx):
    """Build an inproc pipe for talking to threads.

    Mimics the pipe used in czmq's zthread_fork. Both sockets get a linger
    of 0 and a high-water mark of 1, and are joined through an ``inproc://``
    endpoint whose name is 8 random bytes rendered as hexadecimal.

    :param ctx: The ZMQ context used to create both sockets
    :return: A pair ``(a, b)`` of PAIR sockets connected via inproc;
        ``a`` is the bound end, ``b`` the connected end
    """
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    # hexlify() returns bytes; decode it so the endpoint reads
    # "inproc://<hex>" instead of "inproc://b'<hex>'" on Python 3.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode("ascii")
    a.bind(iface)
    b.connect(iface)
    return a, b
class _ContextGuard:
    """Guards ZMQ contexts so that each process works with its own pair.

    The guard remembers which process created the current contexts by means
    of an id built from the process id and the host name. As long as that id
    is unchanged, the stored contexts are handed out again. Once it differs
    (e.g., after fork()) or no context exists yet, a fresh ``zmq.Context``
    and a fresh ``zmq.asyncio.Context`` are created and stored.
    """

    def __init__(self):
        # Nothing is created up front; contexts are built on first request.
        self._context = None
        self._asyncio_context = None
        self._id = None

    @staticmethod
    def _generate_id():
        """Return the ``<pid>@<hostname>`` id of the running process."""
        pid = os.getpid()
        host = socket.gethostname()
        return "%s@%s" % (pid, host)

    def _update_contexts(self):
        """Create new contexts unless the stored ones belong to this process."""
        owner = self._generate_id()
        if self._context and self._id and self._id == owner:
            # Stored contexts were created under the current id: keep them.
            return
        self._id = owner
        self._context = zmq.Context()
        self._asyncio_context = zmq.asyncio.Context()

    def context(self):
        """Return the ``zmq.Context`` for the current process."""
        self._update_contexts()
        return self._context

    def asyncio_context(self):
        """Return the ``zmq.asyncio.Context`` for the current process."""
        self._update_contexts()
        return self._asyncio_context


ContextGuard = _ContextGuard()