Summer of Sockets part 3: Pub Sub With ZeroMQ

So far we have looked at PUSH/PULL and REQ/REP. One of the more interesting socket combinations in ZeroMQ is PUB/SUB. PUB/SUB has a multitude of real-world applications in distributed systems, ranging from remote work queues and push notifications for real-time web applications to inter-application communication. Like the other socket types found in ZeroMQ, both the PUB and SUB socket types can either bind or connect to an endpoint; which one you choose is really a matter of your application's use case. In general, the part of the application that will have the greatest uptime should bind to a port, and all others should connect.

There are two primary differences between the PUB/SUB sockets and those previously mentioned. First, the publisher does not hold on to messages when there are no subscribers; they are simply dropped. It is very much a fire-and-forget kind of socket. Second, the publisher will send each message to all connected subscribers that have subscribed to a matching message (more on that later). The code for this would look something like this.

// pub_a.js
var zmq = require('zmq')
  , socket = zmq.socket( 'pub' )
  , endpoint = 'tcp://0.0.0.0:5101'
  , count = 0;

// binding is async
socket.bind(endpoint, function( err ){
    // send a periodic message
    setInterval( function(){
        console.log( 'sending %s', ++count )
        socket.send('hello world:pub_a.js:'+process.pid + ':' + (count))
    }, 300 )
})

// sub_a.js
var zmq = require('zmq')
  , socket = zmq.socket( 'sub' )
  , endpoint = 'tcp://0.0.0.0:5101';

// Add the event handler before we connect
socket.on('message', function( msg ){
    msg = msg.toString().split(':');
    var message = msg[0];
    var file    = msg[1];
    var pid     = msg[2];
    var count   = msg[3];
    console.log("message `%s` no. %s from %s[%s]", message, count, file, pid);

});
// subscribe to all messages
socket.subscribe('');

// connect
socket.connect( endpoint );

Very straightforward. The subscriber sockets connect, so you can start as many of them as you like. Either the publisher or the subscribers can be started, stopped, or restarted at any time and in any order; ZMQ takes care of connection negotiation for us, unlike the built-in net server and connection in Node.js. ZMQ sockets are very fault tolerant and resilient. Another point of interest is that a single subscriber socket can, in fact, connect to multiple endpoints by calling connect multiple times, allowing it to receive discrete messages from multiple publishers.

// pub_b.js
var zmq = require('zmq')
  , socket = zmq.socket( 'pub' )
  , endpoint = 'tcp://0.0.0.0:5102'
  , count = 0;

// bind to the endpoint
socket.bind(endpoint, function( err ){
    setInterval( function(){
        console.log( 'sending %s', ++count )
        socket.send('goodbye world:pub_b.js:'+process.pid + ':' + (count))
    }, 300 )
})

// sub_b.js
var zmq = require('zmq')
  , socket = zmq.socket( 'sub' )
  , endpointA = 'tcp://0.0.0.0:5101'
  , endpointB = 'tcp://0.0.0.0:5102';

// add the message event handler
socket.on('message', function( msg ){
    msg = msg.toString().split(':');
    var message = msg[0];
    var file    = msg[1];
    var pid     = msg[2];
    var count   = msg[3];
    console.log("message `%s` no. %s from %s[%s]", message, count, file, pid);
});

// subscribe to all messages
socket.subscribe('');

// connect to both endpoints
socket.connect( endpointA );
socket.connect( endpointB );

Now you can start up both publishers, pub_a.js and pub_b.js, and the single subscriber, sub_b.js, and the subscriber will start getting messages from both publishers. This opens the door to very interesting application architectures and topologies where subscribers can connect to multiple data streams and make decisions based on incoming messages.
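To make the idea of "making decisions based on incoming messages" a little more concrete, here is a minimal sketch that routes a message to a handler based on which publisher file sent it. It assumes the colon-delimited payload used in the examples above; `parseMessage` and `createRouter` are hypothetical helper names, not part of the zmq module, and the sketch runs without any sockets.

```javascript
// Hypothetical helper: split the colon-delimited payload used in the examples
// ( message:file:pid:count ) into named fields.
function parseMessage( msg ){
    var parts = msg.toString().split(':');
    return {
        message: parts[0]
      , file: parts[1]
      , pid: Number( parts[2] )
      , count: Number( parts[3] )
    };
}

// Hypothetical router: looks up one handler per publisher file name
// and returns null when no handler is registered for the sender.
function createRouter( handlers ){
    return function( msg ){
        var parsed = parseMessage( msg );
        if( !Object.prototype.hasOwnProperty.call( handlers, parsed.file ) ){
            return null;
        }
        return handlers[ parsed.file ]( parsed );
    };
}

var route = createRouter({
    'pub_a.js': function( m ){ return 'greeting #' + m.count; }
  , 'pub_b.js': function( m ){ return 'farewell #' + m.count; }
});

console.log( route( Buffer.from('hello world:pub_a.js:4242:7') ) );   // greeting #7
console.log( route( Buffer.from('goodbye world:pub_b.js:4243:3') ) ); // farewell #3
```

In sub_b.js, a function like `route` could be passed straight to `socket.on('message', ...)` in place of the logging handler.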

In the above example, although we can connect to multiple endpoints, the subscriber has to know all of the publisher endpoints that it wants to get messages from. Moreover, publishers have to bind, meaning there can only be one per endpoint. Both of these points make adding new publishers a bit of a pain: we are forced to modify N subscribers to keep up.

Fortunately, ZMQ has a simple solution in the XPUB/XSUB socket types. These socket types allow subscriptions and messages to be forwarded between two ends of an application. It is a little confusing at first, but you can think of it as a way to make PUB/SUB hubs. A simple device is made that creates an XPUB and an XSUB socket, each of which binds to its own endpoint.

device [dee']vice -n, noun

  1. A piece of software that connects one or more sockets together to deliver messages
  2. An application

All these sockets do is send traffic from one to the other. Then N PUB sockets can connect to the XSUB endpoint and N SUB sockets can connect to the XPUB endpoint. This makes adding new and different publishers and subscribers significantly easier. The only piece of this puzzle that needs to be durable is the dumb forwarding device.

This is what makes applications built on top of ZeroMQ so flexible: the cost of changing the application topology, adding new applications, or increasing the number of pieces is low. More importantly, the cost of building these applications is low because ZeroMQ eliminates the complexities of managing the low-level networking layers.

// pubsubhub.js
var zmq = require('zmq')
  , xpub = zmq.socket('xpub')
  , xsub = zmq.socket('xsub')
  , xpendpoint = 'tcp://0.0.0.0:5101'
  , xsendpoint = 'tcp://0.0.0.0:5102';

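// messages arriving from publishers on the xsub side are forwarded to subscribers
xsub.on('message', xpub.send.bind( xpub ));

// subscription requests arriving on the xpub side are forwarded to publishers
xpub.on('message', xsub.send.bind( xsub ));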

// sit on some endpoints
xsub.bindSync( xsendpoint );
xpub.bindSync( xpendpoint );

The publisher code stays pretty much the same as it did before, except that it now connects to the hub instead of binding.

// pub_c.js
var zmq = require('zmq')
  , socket = zmq.socket( 'pub' )
  , endpoint = 'tcp://0.0.0.0:5102'
  , count = 0;

// connect instead of bind
socket.connect( endpoint );
setInterval( function(){
    console.log( 'sending %s', ++count )
    socket.send('goodbye world:pub_c.js:'+process.pid + ':' + (count))
}, 150 )

There is a slight difference in how you subscribe to messages from an XPUB socket type. Instead of calling the subscribe method on the socket, you set a subscription option on the socket.

var zmq = require('zmq')
  , socket = zmq.socket( 'sub' )
  , endpoint = 'tcp://localhost:5101'

socket.on('message', function( msg ){
    msg = msg.toString().split(':');
    console.log(msg);
});
socket.connect( endpoint );

// subscribe to all of the messages
socket.setsockopt( zmq.ZMQ_SUBSCRIBE, new Buffer('') );

Both the publishers and subscribers can safely come and go without disrupting the flow of data. The forwarding device can also come and go. Of course, messages will be lost while it is down, and your application should account for that if it is important to you. However, the hub being down doesn't bring down anything else.
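One possible way to account for lost messages is to lean on the counter that the example publishers already put in every message. The sketch below is only an illustration under that assumption: `createGapTracker` is a hypothetical helper, not something ZeroMQ provides, and it treats a counter that goes backwards as a publisher restart rather than a loss.

```javascript
// Hypothetical tracker: remembers the last count seen for each publisher pid
// and reports how many messages were skipped since the previous one.
function createGapTracker(){
    var last = Object.create( null );

    return function track( pid, count ){
        var prev = last[ pid ];
        last[ pid ] = count;

        // First message from this publisher, or its counter started over
        // ( most likely a restart ): nothing to compare against.
        if( prev === undefined || count <= prev ){
            return 0;
        }
        return count - prev - 1;
    };
}

var track = createGapTracker();
console.log( track( '4242', 1 ) ); // 0
console.log( track( '4242', 2 ) ); // 0
console.log( track( '4242', 5 ) ); // 2, messages 3 and 4 never arrived
```

A subscriber could call `track( pid, Number( count ) )` inside its message handler and log or react whenever the result is greater than zero. A subscriber that joins late simply starts counting from the first message it sees.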

Right now our subscribers are pretty generic, as they all listen for everything. But changing the subscription is easy enough and would allow you to create specialized types of subscribers. ZMQ subscription filtering is a case-sensitive prefix match on the first bytes of the message. Instead of subscribing to "", you could subscribe to "hello" and only get the 'hello world' messages. The central hub setup we have here is similar to more traditional centralized, broker-based messaging systems like RabbitMQ, where all of the messages go to a single point and are redistributed to connected clients.
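The matching on the first part of the message can be modeled in plain JavaScript to get a feel for which messages a given set of subscriptions would let through. This is only a sketch of the rule, not how ZeroMQ implements it internally, and `matchesSubscription` is a hypothetical name. It assumes the comparison is byte for byte, so it is case sensitive.

```javascript
// Hypothetical model of SUB filtering: a message is delivered when at least
// one subscription is a byte-for-byte prefix of it.
function matchesSubscription( message, filters ){
    var body = Buffer.isBuffer( message ) ? message : Buffer.from( String( message ) );

    return filters.some(function( filter ){
        var prefix = Buffer.from( filter );
        return prefix.length <= body.length &&
            body.slice( 0, prefix.length ).equals( prefix );
    });
}

var msg = 'hello world:pub_a.js:4242:1';

console.log( matchesSubscription( msg, [ 'hello' ] ) ); // true
console.log( matchesSubscription( msg, [ 'Hello' ] ) ); // false, the match is case sensitive
console.log( matchesSubscription( msg, [ '' ] ) );      // true, the empty prefix matches everything
console.log( matchesSubscription( msg, [] ) );          // false, no subscriptions means no messages
```

Because the filter only looks at the start of the message, putting the most useful discriminator first in the payload keeps subscriptions simple.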

Between the PUB/SUB and XPUB/XSUB socket types, you can easily create distributed messaging systems that are fault tolerant, easy to scale, and significantly easier to write than managing low-level networking code or setting up large monolithic applications to deal with message distribution. As always, the source code for these examples and all of the examples for the Summer of Sockets can be found on my Bitbucket.
