Recently, I’ve begun exploring the depths of ZeroMQ and its Python bindings, pyzmq. This post is not an introduction to ZeroMQ, but for a basic rundown, the “0MQ in A Hundred Words” blurb from the project site suffices:
ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pubsub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.
For more detail I highly recommend reading 0MQ, The Guide.
As I was going over the various Python code examples on GitHub, I became interested in the taskvent / tasksink / taskwork examples. I recognized the pattern as one I often use for distributed processing. In the past, I typically would have implemented such a workflow with the Python multiprocessing library, using its Queue class to communicate between processes. Recently I’ve implemented several data processing pipelines at work using the same technique, but with ZeroMQ channels for communication, and I’ve been extremely pleased with both the performance and the ease of use. So, I decided to write a short blog post with a simple example for others working on the same sorts of problems.
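For comparison, here is a minimal sketch of the multiprocessing.Queue approach mentioned above, applied to the same squaring job. It is an illustration rather than code from this post: the function names and the None sentinel used to stop workers are choices made for the sketch.

```python
from multiprocessing import Process, Queue


def queue_worker(wrk_num, work_queue, results_queue):
    """Square numbers from work_queue until a None sentinel arrives."""
    while True:
        num = work_queue.get()
        if num is None:
            break
        results_queue.put((wrk_num, num * num))


def run_with_queues(numbers, n_workers=4):
    """Fan numbers out to worker processes and collect (worker, square) pairs."""
    numbers = list(numbers)
    work_queue, results_queue = Queue(), Queue()
    workers = [
        Process(target=queue_worker, args=(i, work_queue, results_queue))
        for i in range(n_workers)
    ]
    for proc in workers:
        proc.start()
    for num in numbers:
        work_queue.put(num)
    for _ in workers:
        work_queue.put(None)  # one stop sentinel per worker
    # Drain the results before joining so no worker blocks on a full pipe.
    results = [results_queue.get() for _ in numbers]
    for proc in workers:
        proc.join()
    return results
```

Called from under an `if __name__ == "__main__":` guard, `run_with_queues(range(10))` returns one (worker, square) pair per number, in whatever order the workers finish. The shape matches the ZeroMQ version; one practical difference is that these Queue objects only connect processes on a single machine.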
For the example, I’ve implemented a small distributed system that calculates the squares of a series of numbers, based on those Python examples. The pieces are as follows:
ventilator
- The ventilator sends messages containing the numbers to be squared.
- Uses a ZMQ_PUSH socket to send messages to workers.
workers
- The workers receive messages from the ventilator, do the work, and send the results down the pipe.
- Uses a ZMQ_PUSH socket to send answers to the results manager.
- Uses a ZMQ_SUB socket to receive the FINISHED message from the results manager.
results manager
- The results manager receives all answers from all workers, prints them, and sends a message to the workers to shut down when all tasks are complete.
- Uses a ZMQ_PULL socket to receive answers from the workers.
- Uses a ZMQ_PUB socket to send the FINISHED message to the workers.
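The PUSH/PULL part of the list above behaves roughly like a card dealer: a PUSH socket hands each outgoing message to the next connected peer in turn. The toy model below (plain Python, no ZeroMQ) shows only that round-robin distribution. It deliberately ignores what real sockets add, such as high-water marks and peers that connect late; a worker that connects before the others can receive a disproportionate share, which is one reason the ventilator sleeps before it starts sending.

```python
from itertools import cycle


def push_round_robin(messages, peers):
    """Toy model of PUSH fan-out: deal messages to peers in turn.

    Not ZeroMQ code; it only illustrates the round-robin distribution a
    PUSH socket applies across its connected PULL peers.
    """
    inboxes = {peer: [] for peer in peers}
    for message, peer in zip(messages, cycle(peers)):
        inboxes[peer].append(message)
    return inboxes


work = [{"num": num} for num in range(7)]
inboxes = push_round_robin(work, ["worker-0", "worker-1", "worker-2"])
for peer, received in inboxes.items():
    print(peer, [msg["num"] for msg in received])
# worker-0 [0, 3, 6]
# worker-1 [1, 4]
# worker-2 [2, 5]
```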
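[Diagram of the basic flow: the ventilator pushes work to the workers, the workers push answers to the results manager, and the results manager publishes FINISHED back to the workers.]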
So, without further ado, let’s look at the code!
First, we import zmq, time, and the multiprocessing Process class:
import time
import zmq
from multiprocessing import Process
ventilator
def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()
    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    # Give everything a second to spin up and connect
    time.sleep(1)
    # Send the numbers 0 through 9,999 as work messages
    for num in range(10000):
        work_message = {'num': num}
        ventilator_send.send_json(work_message)
    # Give the queued messages time to be delivered before exiting
    time.sleep(1)
worker
def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()
    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")
    # Set up a channel to send the result of the work to the results manager
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")
    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    # Subscribe to everything (empty prefix); subscriptions are bytes
    control_receiver.setsockopt(zmq.SUBSCRIBE, b"")
    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)
    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())
        # If the message came from the work_receiver channel, square the number
        # and send the answer to the results manager
        if socks.get(work_receiver) == zmq.POLLIN:
            work_message = work_receiver.recv_json()
            product = work_message['num'] * work_message['num']
            answer_message = {'worker': wrk_num, 'result': product}
            results_sender.send_json(answer_message)
        # If the message came over the control channel, shut down the worker
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == b"FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break
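If zmq.Poller is unfamiliar, the same multiplexing idea can be seen with nothing but the standard library. The sketch below is an analogue, not pyzmq code: the selectors module watches two datagram socket pairs standing in for the PULL (work) and SUB (control) channels, because datagrams, like ZeroMQ messages, arrive whole. One deliberate difference from the worker above: when both channels are readable, it handles the work channel first and only honors FINISHED once no work is waiting. The AF_UNIX socket pairs assume a Unix-like system.

```python
import json
import selectors
import socket


def run_worker(work_sock, control_sock, wrk_num):
    """Square numbers from work_sock until b"FINISHED" arrives on control_sock.

    Stdlib analogue of the zmq.Poller loop: the two sockets stand in for
    the PULL (work) and SUB (control) channels. Answers are returned
    instead of being pushed to a results manager.
    """
    sel = selectors.DefaultSelector()
    sel.register(work_sock, selectors.EVENT_READ, "work")
    sel.register(control_sock, selectors.EVENT_READ, "control")
    answers = []
    try:
        while True:
            # Block until at least one channel has a message waiting.
            ready = {key.data for key, _ in sel.select()}
            if "work" in ready:
                # Handle pending work first; the control channel waits.
                work_message = json.loads(work_sock.recv(4096))
                product = work_message["num"] * work_message["num"]
                answers.append({"worker": wrk_num, "result": product})
            elif "control" in ready:
                if control_sock.recv(4096) == b"FINISHED":
                    break
    finally:
        sel.close()
    return answers


# Datagram socket pairs keep each message whole, much like ZeroMQ messages.
work_in, work_out = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
ctrl_in, ctrl_out = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
for num in range(5):
    work_out.send(json.dumps({"num": num}).encode("utf-8"))
ctrl_out.send(b"FINISHED")
print(run_worker(work_in, ctrl_in, wrk_num=0))
```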
results manager
def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()
    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")
    # Collect exactly one answer per number the ventilator sends
    for task_nbr in range(10000):
        result_message = results_receiver.recv_json()
        print("Worker %i answered: %i" % (result_message['worker'], result_message['result']))
    # Signal to all workers that we are finished
    control_sender.send(b"FINISHED")
    # Give the FINISHED message time to reach the workers before exiting
    time.sleep(5)
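The FINISHED broadcast relies on PUB/SUB filtering: a SUB socket only delivers messages that begin with one of its subscribed prefixes, and a SUB socket with no subscriptions receives nothing. That is why the worker subscribes to the empty prefix, which matches every message. The toy model below (plain Python, not pyzmq) captures just that rule; real PUB sockets also silently drop messages for subscribers that have not finished connecting, which this model ignores.

```python
def sub_accepts(subscriptions, message):
    """Toy model of a SUB socket filter: deliver a message if any
    subscribed prefix matches its start. No subscriptions, no delivery."""
    return any(message.startswith(prefix) for prefix in subscriptions)


def publish(subscribers, message):
    """Toy model of PUB fan-out: names of the subscribers that get a copy."""
    return [name for name, subs in subscribers.items() if sub_accepts(subs, message)]


subscribers = {
    "worker-0": [b""],        # like setsockopt(zmq.SUBSCRIBE, b""): everything
    "worker-1": [b"FIN"],     # prefix match, so FINISHED still gets through
    "worker-2": [],           # never subscribed: receives nothing
    "worker-3": [b"STATUS"],  # subscribed to a different prefix
}
print(publish(subscribers, b"FINISHED"))  # ['worker-0', 'worker-1']
```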
And away we go…
if __name__ == "__main__":
    # Create a pool of workers to distribute work to
    for wrk_num in range(10):
        Process(target=worker, args=(wrk_num,)).start()
    # Keep the Process handles under their own names: rebinding result_manager
    # or ventilator to a Process object breaks pickling of the target on Windows
    # Fire up our result manager...
    result_manager_proc = Process(target=result_manager, args=())
    result_manager_proc.start()
    # Start the ventilator!
    ventilator_proc = Process(target=ventilator, args=())
    ventilator_proc.start()
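The pickling failure reported in the comments comes from how multiprocessing launches children on Windows (and wherever the spawn start method is used): the target function is pickled by reference, as its module and name, rather than by value. If that module-level name has since been rebound to a Process object, the lookup no longer finds the function and pickling fails. The sketch below reproduces the effect with pickle alone; the module name shadow_demo is invented for the demo.

```python
import pickle
import sys
import types

# Build a throwaway module ("shadow_demo" is a made-up name) so the demo
# behaves the same however this snippet is run.
demo = types.ModuleType("shadow_demo")
sys.modules["shadow_demo"] = demo
exec("def result_manager():\n    return 'working'\n", demo.__dict__)

target = demo.result_manager  # the function handed to Process(target=...)

# Pickling stores only "shadow_demo.result_manager", and that name still
# refers to the same function, so the round trip works.
restored = pickle.loads(pickle.dumps(target))

# Rebind the name, as `result_manager = Process(...)` did in __main__.
demo.result_manager = object()
try:
    pickle.dumps(target)
    shadowed_ok = True
except pickle.PicklingError as exc:
    shadowed_ok = False
    print("PicklingError:", exc)
```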
The code from this example is available at: https://github.com/taotetek/blog_examples
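You may have noticed that I am using the TCP transport for communication between my processes. This is what I personally find most exciting about ZeroMQ: if you use ZeroMQ for communication between the processes of a multiprocess system, it is close to trivial to scale the code out to multiple servers. Because each endpoint is just an address, moving workers onto other machines mostly means binding the ventilator and results manager to an interface those machines can reach, and pointing the workers’ connect() calls at that host instead of 127.0.0.1.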
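As a sketch of what scaling out might involve, the helper below builds the endpoint strings from environment variables instead of hard-coding 127.0.0.1. The variable names VENTILATOR_HOST and RESULTS_HOST are hypothetical; the example in this post reads no environment at all. Binding sides use tcp://*:PORT, which listens on every interface, while connecting sides name the host that did the binding.

```python
import os

# Hypothetical variable names; the original example reads no environment.
VENT_HOST_VAR = "VENTILATOR_HOST"
RESULTS_HOST_VAR = "RESULTS_HOST"


def endpoints(env=None):
    """Return bind/connect addresses for the work, results and control channels.

    Binding sides listen on every interface (tcp://*:PORT). Connecting
    sides use the host named in the environment, falling back to
    127.0.0.1 so a single-machine run matches the original example.
    """
    env = os.environ if env is None else env
    vent_host = env.get(VENT_HOST_VAR, "127.0.0.1")
    results_host = env.get(RESULTS_HOST_VAR, "127.0.0.1")
    return {
        # ventilator binds, workers connect
        "work_bind": "tcp://*:5557",
        "work_connect": "tcp://%s:5557" % vent_host,
        # results manager binds both of these, workers connect
        "results_bind": "tcp://*:5558",
        "results_connect": "tcp://%s:5558" % results_host,
        "control_bind": "tcp://*:5559",
        "control_connect": "tcp://%s:5559" % results_host,
    }


addrs = endpoints({"VENTILATOR_HOST": "10.0.0.5", "RESULTS_HOST": "10.0.0.6"})
print(addrs["work_connect"], addrs["control_connect"])
# tcp://10.0.0.5:5557 tcp://10.0.0.6:5559
```

A worker on another machine would then call `work_receiver.connect(addrs["work_connect"])` and so on, while the ventilator and results manager bind to the `*_bind` addresses.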
This only begins to scratch the surface of ZeroMQ. ZeroMQ supports multiple transports and many more topologies, and the library is available for many languages. It is a fantastic tool for both distributed computing and polyglot programming.



Comments
FYI, the worker thread has lost its indentation and the break call results in an error.
The indentation in the Github file is correct, https://github.com/taotetek/blog_examples/blob/master/python_multiprocessing_with_zeromq/workqueue_example.py
Thanks for the heads up, I’ll fix the example on the site once I get a few spare minutes!
Hi Brian,
Running the code with Python 2.7 (32- and 64-bit), gevent 0.13.3, greenlet 0.3.1 and pyzmq 2.0.10.1 (although that’s not important) results in the following error:
Traceback (most recent call last):
  File "C:\Users\adam\workspace\python\ComPy\src\main.py", line 125, in <module>
    result_manager.start()
  File "C:\Program Files\Python27\lib\multiprocessing\process.py", line 104, in start
    self._popen = Popen(self)
  File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 244, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 167, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Program Files\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Program Files\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle : it's not the same object as __main__.result_manager
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 347, in main
    self = load(from_parent)
  File "C:\Program Files\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Program Files\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Program Files\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
It seems that the main function is getting confused by the multiple use of “result_manager” and “ventilator” as both a function and an object.
For example:
# Fire up our result manager…
result_manager = Process(target=result_manager, args=())
result_manager.start()
# Start the ventilator!
ventilator = Process(target=ventilator, args=())
ventilator.start()
The resolution is to give them different names. I’ve changed the functions to have _func after them. This resolves the issues.
Thanks for the heads up, I’ll fix the example once I get a few spare minutes!
I’ve been playing with similar ideas – one issue that I have is that the ventilator and result manager have to have a shared knowledge of how much work has to be done (10000 messages in this case), which is dissatisfying – ideally, I’d like the ventilator to signal when it’s through sending data, but I don’t see how to coordinate that properly …
Cheers,
Tim
I haven’t had time to whip up an example – but you can solve this by setting up a direct communication channel between the ventilator and the results manager. The ventilator can then tell the results manager how much work it should expect in total, etc.
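A sketch of the approach described in that reply, under stated assumptions: the ventilator announces a total on an extra channel of its own (for instance a hypothetical PUSH socket connected to a PULL socket on the results manager), and the results manager polls that channel alongside the results channel. Nothing orders the announcement relative to the first answers, so the loop below stops only once it both knows the total and has counted that many answers. The sockets are replaced by a list of (channel, message) events so the stopping logic can be shown on its own.

```python
def collect_results(events):
    """Gather results until the expected total is known and reached.

    `events` is a stand-in for polling two sockets: an iterable of
    (channel, message) pairs in arrival order, where the hypothetical
    "control" channel carries {"total": N} from the ventilator and the
    "results" channel carries {"worker": ..., "result": ...} answers.
    """
    expected = None
    results = []
    for channel, message in events:
        if channel == "control":
            expected = message["total"]
        elif channel == "results":
            results.append(message["result"])
        # Answers may arrive before the total does, so check both conditions.
        if expected is not None and len(results) >= expected:
            break
    return results


events = [
    ("results", {"worker": 0, "result": 0}),
    ("results", {"worker": 1, "result": 1}),
    ("control", {"total": 3}),
    ("results", {"worker": 0, "result": 4}),
]
print(collect_results(events))  # [0, 1, 4]
```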