def pprint_message(msg):
    """Pretty-prints a message to a string.

    Every frame is rendered on its own line as ``[NNN] payload``, where
    ``NNN`` is the zero-padded frame length. Frames that decode as ASCII
    are shown as text; all other frames are shown as ``0x`` followed by
    their hexadecimal representation.

    :param msg: A message of the palaestrai zmq core protocol
        (an iterable of byte frames)
    :return: The pretty-printed message
    """

    def str_or_hex(p):
        # Prefer readable text; fall back to hex for non-ASCII frames.
        try:
            return p.decode("ascii")
        except UnicodeDecodeError:
            return r"0x%s" % binascii.hexlify(p).decode("ascii")

    # Iterate the message directly; no intermediate copy is needed.
    return "\n".join("[%03d] %s" % (len(p), str_or_hex(p)) for p in msg)
def dump(msg_or_socket):
    """Receives all message parts from socket, printing each frame neatly.

    :param msg_or_socket: Either a ``zmq.Socket``, from which one multipart
        message is received, or an already received message (an iterable
        of byte frames).
    :return: None; the frames are written to standard output, one per
        line as ``[NNN] payload`` after a separator line of dashes.
    """
    if isinstance(msg_or_socket, zmq.Socket):
        # it's a socket, call on current message
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket

    print("----------------------------------------")
    for part in msg:
        # Show ASCII frames as text and everything else as hex.
        try:
            rendered = part.decode("ascii")
        except UnicodeDecodeError:
            rendered = r"0x%s" % (binascii.hexlify(part).decode("ascii"))
        print("[%03d]" % len(part), rendered)
def set_id(zsocket):
    """Set simple random printable identity on socket.

    The identity has the form ``xxxx-xxxx``, where each group consists of
    exactly four lowercase hexadecimal digits.

    :param zsocket: The socket whose ``zmq.IDENTITY`` option is set.
    """
    # randint() includes both endpoints, so the upper bound must be 0xFFFF
    # to keep each group at four hex digits.
    identity = "%04x-%04x" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)
def zpipe(ctx):
    """Build inproc pipe for talking to threads.

    Mimics the pipe used in czmq zthread_fork.

    :param ctx: The context used to create both sockets.
    :return: A pair of PAIR sockets connected via inproc; the first one is
        bound to the endpoint, the second one is connected to it.
    """
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    # hexlify() returns bytes; decode it so the endpoint name does not
    # contain the b'...' repr of a bytes object.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode("ascii")
    a.bind(iface)
    b.connect(iface)
    return a, b
class _ContextGuard:
    """Guards ZMQ contexts: Creates exactly one per process

    This guard class makes sure that only ever one ZMQ Context is created
    per process. It automagically creates a new one when there's a need to
    (e.g., after fork()), but returns the current one otherwise.

    The "current process" is identified by a string built from the process
    id and the host name; whenever that string differs from the stored
    one, a fresh pair of contexts (blocking and asyncio) is created.
    """

    def __init__(self):
        self._id = None  # identifier of the process owning the contexts
        self._context = None  # blocking zmq.Context, created lazily
        self._asyncio_context = None  # zmq.asyncio.Context, created lazily

    @staticmethod
    def _generate_id():
        """Return the ``<pid>@<hostname>`` identifier of this process."""
        return "%s@%s" % (os.getpid(), socket.gethostname())

    def _update_contexts(self):
        """Create new contexts if none exist yet or the process changed."""
        fresh_id = self._generate_id()
        if not self._context or self._id != fresh_id:
            # Build both contexts before storing anything, so a failure
            # while creating the second one cannot leave the guard with a
            # matching id but a missing asyncio context.
            context = zmq.Context()
            asyncio_context = zmq.asyncio.Context()
            self._id = fresh_id
            self._context = context
            self._asyncio_context = asyncio_context

    def context(self):
        """Return the blocking ZMQ context of the current process."""
        self._update_contexts()
        return self._context

    def asyncio_context(self):
        """Return the asyncio ZMQ context of the current process."""
        self._update_contexts()
        return self._asyncio_context


ContextGuard = _ContextGuard()