Pub/Pub

NetMQ gère la pattern Pub/Sub avec ces deux socket :

  • PublisherSocket
  • SubscriberSocket

Comme d'habitude, on les crées à partir du NetMQContext avec la méthode .CreateXXXSocket(). Ce qui donne :

  • CreatePublisherSocket()
  • CreateSubscriberSocket()

Topics

NetMQ permet l'utilisation de topics (canaux), PublisherSocket envoie la "frame" 1( messages documentation page) du message qui contiendra le topic et dans la frame 2 le message en lui même, ce qui donne :

Frame 1 Frame 2
TopicA C'est un message dans le 'TopicA'



Voici comment envoyer les deux frames (Vous pouvez aussi utiliser un NetMQMessage et ajouter les frames une par une au message):

pubSocket.SendMore("TopicA").Send("C'est un message dans le 'TopicA'");



La SubscriberSocket peut s'abonner à un certain topic pour recevoir les messages, ce qui est faisable en passant le nom du topic dans la méthode Subscribe() de SubscriberSocket.

Voici un exemple:

subSocket.Subscribe("TopicA");



Comment s'abonner à tous les messages ?

Il est possible de s'abonner à tous les messages en mettant une chaine de caractère vide à la méthode subscriberSocket.Subscribe().

Exemple

Cet exemple est très simple et suis ces rêgles :

  • Il y a un publisher qui crée, soit des messages pour le topicA', soit des messages pour le 'topicB' (dépend d'un nombre aléatoire)
  • Il y a un Subscriber générique (le nom du topic auquel il s'abonne est passé en paramètre dans la ligne de commande)

Voici le code:

Publisher

using System;
using System.Threading;
using NetMQ;

namespace Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random(50);

            using (var context = NetMQContext.Create())
            {
                using (var pubSocket = context.CreatePublisherSocket())
                {
                    Console.WriteLine("Publisher socket binding...");
                    pubSocket.Options.SendHighWatermark = 1000;
                    pubSocket.Bind("tcp://localhost:12345");

                    for (var i = 0; i < 100; i++)
                    {
                        var randomizedTopic = rand.NextDouble();
                        if (randomizedTopic > 0.5)
                        {
                            var msg = "TopicA msg-" + i;
                            Console.WriteLine("Sending message : {0}", msg);
                            pubSocket.SendMore("TopicA").Send(msg);
                        }
                        else
                        {
                            var msg = "TopicB msg-" + i;
                            Console.WriteLine("Sending message : {0}", msg);
                            pubSocket.SendMore("TopicB").Send(msg);
                        }

                        Thread.Sleep(500);
                    }
                }
            }
        }
    }
}

Subscriber

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;

namespace SubscriberA
{
    class Program
    {

        public static List<string> allowableCommandLineArgs = new List<string>();

        static Program()
        {
            allowableCommandLineArgs.Add("TopicA");
            allowableCommandLineArgs.Add("TopicB");
            allowableCommandLineArgs.Add("All");
        }

        static void PrintUsageAndExit()
        {
            Console.WriteLine("Subscriber is expected to be started with either 'TopicA', 'TopicB' or 'All'");
            Console.ReadLine();
            Environment.Exit(-1);
        }

        static void Main(string[] args)
        {

            if (args.Length != 1)
            {
                PrintUsageAndExit();
            }

            if (!allowableCommandLineArgs.Contains(args[0]))
            {
                PrintUsageAndExit();
            }

            string topic = args[0] == "All" ? "" : args[0];
            Console.WriteLine("Subscriber started for Topic : {0}", topic);

            using (var context = NetMQContext.Create())
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect("tcp://localhost:12345");
                    subSocket.Subscribe(topic);
                    Console.WriteLine("Subscriber socket connecting...");
                    while (true)
                    {
                        string messageTopicReceived = subSocket.ReceiveString();
                        string messageReceived = subSocket.ReceiveString();
                        Console.WriteLine(messageReceived);
                    }
                }
            }            
        }
    }
}

Pour lancer le programme, ces 3 fichiers BAT seront utiles.

RunPubSub.bat
start RunPublisher.bat


start RunSubscriber "TopicA"
start RunSubscriber "TopicB"
start RunSubscriber "All"

RunPublisher.bat

cd Publisher\bin\Debug
Publisher.exe

RunSubscriber.bat

set "topic=%~1"
cd Subscriber\bin\Debug
Subscriber.exe %topic%

Vous devriez voir ceci :



Autres Considérations

HighWaterMark

Les options SendHighWaterMark/ReceiveHighWaterMark spécifie le 'high water mark' pour la socket. Le 'high water mark' est une limite spécifiant le nombre maximum de message pouvant être empilés entre deux socket.

Si cette limite est atteinte et suivant le type de socket, la socket rentre dans un état spéciale et soit elle bloque les messages, soit elle supprime les messages suivants.

La valeur SendHighWaterMark/ReceiveHighWaterMark par defaut est 0 ce qui veux dire "sans limite".

Vous pouvez voir ces deux options dans xxxxSocket.Options :

  • pubSocket.Options.SendHighWatermark = 1000;
  • pubSocket.Options.ReceiveHighWatermark = 1000;

Slow Subscribers

Cette partie est couverte dans le guide ZeroMQ

Late Joining Subscribers

Cette partie est couverte dans le guide ZeroMQ