Pub/Sub
From Wikipedia:
Publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers. Instead, published messages are characterized into classes, without knowledge of what, if any, subscribers there may be. Similarly, subscribers express interest in one or more classes, and only receive messages that are of interest, without knowledge of what, if any, publishers there are.
The classes mentioned in this description can also be referred to as topics or filters.
NetMQ comes with support for Pub/Sub by way of two socket types:
PublisherSocket
SubscriberSocket
Topics
ZeroMQ/NetMQ uses multipart messages to convey topic information. Topics are expressed as an array of bytes, though you may use a string and a suitable System.Text.Encoding.
A publisher must include the topic in the message's first frame, prior to the message payload. For example, to publish a status message to subscribers of the status topic:
// send a message on the 'status' topic
pub.SendMoreFrame("status").SendFrame("All is well");
Subscribers specify which topics they are interested in via the Subscribe method of SubscriberSocket:
// subscribe to the 'status' topic
sub.Subscribe("status");
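Since topics are ultimately byte arrays, the same publish and subscribe calls can be made with an explicitly encoded topic. The following is a minimal sketch rather than a definitive implementation. It assumes the NetMQ package is referenced, uses a made-up inproc endpoint name, and pauses briefly on the assumption that the subscription needs a moment to reach the publisher. The timeout on the receive is there so the sketch cannot block forever if the message is dropped.

```csharp
using System;
using System.Text;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

class ByteTopicSketch
{
    static void Main()
    {
        // Encode the topic explicitly so publisher and subscriber agree on the bytes.
        byte[] topic = Encoding.UTF8.GetBytes("status");

        using (var pub = new PublisherSocket())
        using (var sub = new SubscriberSocket())
        {
            // "inproc://byte-topic-sketch" is a hypothetical endpoint name for this sketch.
            pub.Bind("inproc://byte-topic-sketch");
            sub.Connect("inproc://byte-topic-sketch");
            sub.Subscribe(topic);

            // Assumption: a short pause lets the subscription reach the publisher.
            Thread.Sleep(100);

            // First frame is the topic, second frame is the payload.
            pub.SendMoreFrame(topic).SendFrame("All is well");

            // Wait up to one second instead of blocking indefinitely.
            if (sub.TryReceiveFrameBytes(TimeSpan.FromSeconds(1), out byte[] receivedTopic))
            {
                string payload = sub.ReceiveFrameString();
                Console.WriteLine("{0}: {1}", Encoding.UTF8.GetString(receivedTopic), payload);
            }
            else
            {
                Console.WriteLine("No message received within the timeout.");
            }
        }
    }
}
```

Using one encoding on both sides matters because matching is done on the raw bytes, not on the string.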
Topic hierarchies
A message's topic is compared against subscribers' subscription topics using a prefix check.
That is, a subscriber who subscribed to topic would receive messages with topics:
topic
topic/subtopic
topical
However it would not receive messages with topics:
topi
TOPIC (remember, it's a byte-wise comparison)
A consequence of this prefix matching behaviour is that you can receive all published messages by subscribing with an empty topic string:
sub.Subscribe(""); // subscribe to all topics
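The prefix rule can be modelled in a few lines of plain C#. This is only an illustrative sketch of the byte-wise comparison described above, not NetMQ's internal implementation. The class and method names are hypothetical, and UTF-8 is assumed for converting the strings to bytes.

```csharp
using System;
using System.Text;

static class TopicPrefixSketch
{
    // Returns true when 'subscription' is a byte-wise prefix of 'topic'.
    public static bool Matches(byte[] topic, byte[] subscription)
    {
        if (subscription.Length > topic.Length)
            return false;

        for (int i = 0; i < subscription.Length; i++)
        {
            if (topic[i] != subscription[i])
                return false;
        }
        return true;
    }

    // Convenience overload; UTF-8 is an assumption made for this sketch.
    public static bool Matches(string topic, string subscription)
    {
        return Matches(Encoding.UTF8.GetBytes(topic), Encoding.UTF8.GetBytes(subscription));
    }

    static void Main()
    {
        // The five topics from the lists above, checked against the subscription "topic".
        foreach (var t in new[] { "topic", "topic/subtopic", "topical", "topi", "TOPIC" })
            Console.WriteLine("{0,-15} {1}", t, Matches(t, "topic"));
    }
}
```

In this model the first three topics match and the last two do not. An empty subscription has length zero, so the loop never runs and every topic matches, which is why subscribing with "" receives everything.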
An Example
Time for an example. This example is very simple, and follows these rules:
- There is one publisher process, which randomly publishes a message to either TopicA or TopicB every 500ms.
- There may be many subscribers. The topic name is passed as a command line argument.
Publisher
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random(50);

            using (var pubSocket = new PublisherSocket())
            {
                Console.WriteLine("Publisher socket binding...");
                pubSocket.Options.SendHighWatermark = 1000;
                pubSocket.Bind("tcp://*:12345");

                for (var i = 0; i < 100; i++)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicA msg-" + i;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicB msg-" + i;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
                    }

                    Thread.Sleep(500);
                }
            }
        }
    }
}
Subscriber
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace SubscriberA
{
    class Program
    {
        public static IList<string> allowableCommandLineArgs
            = new [] { "TopicA", "TopicB", "All" };

        static void Main(string[] args)
        {
            if (args.Length != 1 || !allowableCommandLineArgs.Contains(args[0]))
            {
                Console.WriteLine("Expected one argument, either " +
                                  "'TopicA', 'TopicB' or 'All'");
                Environment.Exit(-1);
            }

            // An empty topic subscribes to all messages
            string topic = args[0] == "All" ? "" : args[0];
            Console.WriteLine("Subscriber started for Topic : {0}", topic);

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.Options.ReceiveHighWatermark = 1000;
                subSocket.Connect("tcp://localhost:12345");
                subSocket.Subscribe(topic);
                Console.WriteLine("Subscriber socket connecting...");

                while (true)
                {
                    // First frame is the topic, second frame is the payload
                    string messageTopicReceived = subSocket.ReceiveFrameString();
                    string messageReceived = subSocket.ReceiveFrameString();
                    Console.WriteLine(messageReceived);
                }
            }
        }
    }
}
To run this, these three BAT files may be useful, though you will need to change them to suit your code location should you choose to copy this example code into a new set of projects.
RunPubSub.bat
start RunPublisher.bat
start RunSubscriber "TopicA"
start RunSubscriber "TopicB"
start RunSubscriber "All"
RunPublisher.bat
cd Publisher\bin\Debug
Publisher.exe
RunSubscriber.bat
set "topic=%~1"
cd Subscriber\bin\Debug
Subscriber.exe %topic%
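When run, you should see one publisher window logging each message as it is sent, and three subscriber windows, each printing only the messages that match its subscription (the All subscriber prints every message).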
Other Considerations
High water mark
The SendHighWatermark/ReceiveHighWatermark options set the high water mark for the specified socket. The high water mark is a hard limit on the maximum number of outstanding messages NetMQ shall queue in memory for any single peer that the specified socket is communicating with.
If this limit has been reached, the socket shall enter an exceptional state and, depending on the socket type, NetMQ shall take appropriate action such as blocking or dropping sent messages.
The default SendHighWatermark/ReceiveHighWatermark value is 1000. A value of zero means "no limit".
You would set these two options using the xxxxSocket.Options property as follows:
pubSocket.Options.SendHighWatermark = 1000;
pubSocket.Options.ReceiveHighWatermark = 1000;
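As a rough sketch of how these options fit into socket setup, the snippet below sets them before Bind and Connect, in the same order as the example publisher and subscriber above. Whether a change made after binding or connecting affects existing connections is not covered here, so setting the options first is treated as the safe choice. The limit of 100 is an arbitrary illustrative value, and the port is the one used in the earlier example.

```csharp
using NetMQ;
using NetMQ.Sockets;

class HighWatermarkSketch
{
    static void Main()
    {
        using (var pubSocket = new PublisherSocket())
        using (var subSocket = new SubscriberSocket())
        {
            // Queue at most 100 outgoing messages per subscriber (illustrative value).
            pubSocket.Options.SendHighWatermark = 100;

            // Zero means "no limit" on the subscriber's incoming queue.
            subSocket.Options.ReceiveHighWatermark = 0;

            // Options are set before Bind/Connect, mirroring the example above.
            pubSocket.Bind("tcp://*:12345");
            subSocket.Connect("tcp://localhost:12345");
            subSocket.Subscribe("");
        }
    }
}
```

An unlimited queue trades message loss for memory growth when a peer cannot keep up, so a value of zero is best used deliberately.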
Slow subscribers
This is covered in the ZeroMQ guide.
Late joining subscribers
This is covered in the ZeroMQ guide.