Pub/Pub
NetMQ gère la pattern Pub/Sub avec ces deux socket :
PublisherSocket
SubscriberSocket
Comme d'habitude, on les crées à partir du NetMQContext
avec la méthode .CreateXXXSocket()
.
Ce qui donne :
CreatePublisherSocket()
CreateSubscriberSocket()
Topics
NetMQ permet l'utilisation de topics (canaux), PublisherSocket
envoie la "frame" 1( messages documentation page) du message qui contiendra le topic et dans la frame 2 le message en lui même, ce qui donne :
Frame 1 | Frame 2 |
---|---|
TopicA | C'est un message dans le 'TopicA' |
Voici comment envoyer les deux frames (Vous pouvez aussi utiliser un NetMQMessage
et ajouter les frames une par une au message):
pubSocket.SendMore("TopicA").Send("C'est un message dans le 'TopicA'");
La SubscriberSocket
peut s'abonner à un certain topic pour recevoir les messages, ce qui est faisable en passant le nom du topic dans la méthode Subscribe()
de SubscriberSocket
.
Voici un exemple:
subSocket.Subscribe("TopicA");
Comment s'abonner à tous les messages ?
Il est possible de s'abonner à tous les messages en mettant une chaine de caractère vide à la méthode subscriberSocket.Subscribe()
.
Exemple
Cet exemple est très simple et suis ces rêgles :
- Il y a un publisher qui crée, soit des messages pour le topicA', soit des messages pour le 'topicB' (dépend d'un nombre aléatoire)
- Il y a un Subscriber générique (le nom du topic auquel il s'abonne est passé en paramètre dans la ligne de commande)
Voici le code:
Publisher
using System;
using System.Threading;
using NetMQ;
namespace Publisher
{
class Program
{
static void Main(string[] args)
{
Random rand = new Random(50);
using (var context = NetMQContext.Create())
{
using (var pubSocket = context.CreatePublisherSocket())
{
Console.WriteLine("Publisher socket binding...");
pubSocket.Options.SendHighWatermark = 1000;
pubSocket.Bind("tcp://localhost:12345");
for (var i = 0; i < 100; i++)
{
var randomizedTopic = rand.NextDouble();
if (randomizedTopic > 0.5)
{
var msg = "TopicA msg-" + i;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMore("TopicA").Send(msg);
}
else
{
var msg = "TopicB msg-" + i;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMore("TopicB").Send(msg);
}
Thread.Sleep(500);
}
}
}
}
}
}
Subscriber
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
namespace SubscriberA
{
class Program
{
public static List<string> allowableCommandLineArgs = new List<string>();
static Program()
{
allowableCommandLineArgs.Add("TopicA");
allowableCommandLineArgs.Add("TopicB");
allowableCommandLineArgs.Add("All");
}
static void PrintUsageAndExit()
{
Console.WriteLine("Subscriber is expected to be started with either 'TopicA', 'TopicB' or 'All'");
Console.ReadLine();
Environment.Exit(-1);
}
static void Main(string[] args)
{
if (args.Length != 1)
{
PrintUsageAndExit();
}
if (!allowableCommandLineArgs.Contains(args[0]))
{
PrintUsageAndExit();
}
string topic = args[0] == "All" ? "" : args[0];
Console.WriteLine("Subscriber started for Topic : {0}", topic);
using (var context = NetMQContext.Create())
{
using (var subSocket = context.CreateSubscriberSocket())
{
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect("tcp://localhost:12345");
subSocket.Subscribe(topic);
Console.WriteLine("Subscriber socket connecting...");
while (true)
{
string messageTopicReceived = subSocket.ReceiveString();
string messageReceived = subSocket.ReceiveString();
Console.WriteLine(messageReceived);
}
}
}
}
}
}
Pour lancer le programme, ces 3 fichiers BAT seront utiles.
RunPubSub.bat
start RunPublisher.bat
start RunSubscriber "TopicA"
start RunSubscriber "TopicB"
start RunSubscriber "All"
RunPublisher.bat
cd Publisher\bin\Debug
Publisher.exe
RunSubscriber.bat
set "topic=%~1"
cd Subscriber\bin\Debug
Subscriber.exe %topic%
Vous devriez voir ceci :
Autres Considérations
HighWaterMark
Les options SendHighWaterMark/ReceiveHighWaterMark
spécifie le 'high water mark' pour la socket. Le 'high water mark' est une limite spécifiant le nombre maximum de message pouvant être empilés entre deux socket.
Si cette limite est atteinte et suivant le type de socket, la socket rentre dans un état spéciale et soit elle bloque les messages, soit elle supprime les messages suivants.
La valeur SendHighWaterMark/ReceiveHighWaterMark
par defaut est 0 ce qui veux dire "sans limite".
Vous pouvez voir ces deux options dans xxxxSocket.Options
:
pubSocket.Options.SendHighWatermark = 1000;
pubSocket.Options.ReceiveHighWatermark = 1000;
Slow Subscribers
Cette partie est couverte dans le guide ZeroMQ
Late Joining Subscribers
Cette partie est couverte dans le guide ZeroMQ