Gérer plusieurs Sockets
Pourquoi vouloir gérer plusieurs sockets en même temps ? Il existe plusieurs raison à cela :
- Vous avez plusieurs socket dans un même process qui sont reliées les unes aux autres et vous devez savoir quand chacunes d'elles est prête à recevoir des données.
- Vous voulez avoir une requete ainsi qu'un publisher dans un même process.
Des fois vous allez avoir besoin de gérer plusieurs socket dans un même process. Et souvent vous voudrez utilisez les sockets que lorsqu'elles sont prêtes à être utilisées.
ZeroMq instaure ici le concept de Poller
pour savoir si une socket est prête.
NetMQ a une implémentation du Poller
, et il peut être utilisé pour :
- Gérer une socket, pour savoir si la lecture est prête.
- Gérer une liste de socket (
IEnumerable
) pour savoir si la lecture est prête. - Autoriser des
NetMQSocket
(s) a être ajouté dynamiquement et déterminer si les nouvelles requêtes sont prêtres à être lues. - Autorisé des
NetMQSocket
(s) à être supprimées dynamiquement. - Lever un evennement quand une socket est prête.
Les méthodes du Poller
Le Poller
possède plusieurs méthodes qui vont vous aider à gerer tout ca. Plus précisément les méthodes AddSocket(..)/RemoveSocket(..)
et Start()/Stop()
.
Vous devez utiliser la méthode AddSocket
pour ajouter dynamiquement une socket et vérifier son état de "prête pour la lecture", et ensuite appeller la méthode Poller.Start()
.A ce moment, le Poller
va appelle tous les evennements ReceiveReady
.
Poller Exemple
Maintenant que l'on sait à quoi sert le Poller
, voyons un petit exemple.
Dans le code suivant, une seule requête est ajouter au Poller
. Nous pouvons voir que l'evennement ReceiveReady
est gérer. Le Poller
va automatiquement appeler l'evennement quand la requete sera "prête".
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
using (NetMQContext contex = NetMQContext.Create())
{
using (var rep = contex.CreateResponseSocket())
{
rep.Bind("tcp://127.0.0.1:5002");
using (var req = contex.CreateRequestSocket())
using (Poller poller = new Poller())
{
req.Connect("tcp://127.0.0.1:5002");
//The ReceiveReady event is raised by the Poller
rep.ReceiveReady += (s, a) =>
{
bool more;
string messageIn = a.Socket.ReceiveString(out more);
Console.WriteLine("messageIn = {0}", messageIn);
a.Socket.Send("World");
};
poller.AddSocket(rep);
Task pollerTask = Task.Factory.StartNew(poller.Start);
req.Send("Hello");
bool more2;
string messageBack = req.ReceiveString(out more2);
Console.WriteLine("messageBack = {0}", messageBack);
poller.Stop();
Thread.Sleep(100);
pollerTask.Wait();
}
}
}
Console.ReadLine();
}
}
}
Vous devriez voir dans la console :
messageIn = Hello
messageBack = World
A partir de cet exemple, nous pouvons maintenant supprimer la ResponseSocket
du Poller
dès que nous voyons le premiers message, ce qui veux dire que nous ne devrions plus recevoir aucunes informations sur la ResponseSocket
.
Voici le nouveau code :
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
using (NetMQContext contex = NetMQContext.Create())
{
using (var rep = contex.CreateResponseSocket())
{
rep.Bind("tcp://127.0.0.1:5002");
using (var req = contex.CreateRequestSocket())
using (Poller poller = new Poller())
{
req.Connect("tcp://127.0.0.1:5002");
//The ReceiveReady event is raised by the Poller
rep.ReceiveReady += (s, a) =>
{
bool more;
string messageIn = a.Socket.ReceiveString(out more);
Console.WriteLine("messageIn = {0}", messageIn);
a.Socket.Send("World");
//REMOVAL
//This time we remove the Socket from the Poller, so it should not receive any more messages
poller.RemoveSocket(a.Socket);
};
poller.AddSocket(rep);
Task pollerTask = Task.Factory.StartNew(poller.Start);
req.Send("Hello");
bool more2;
string messageBack = req.ReceiveString(out more2);
Console.WriteLine("messageBack = {0}", messageBack);
//This should not do anything, as we removed the ResponseSocket
//the 1st time we sent a message to it
req.Send("Hello Again");
Console.WriteLine("Carrying on doing the rest");
poller.Stop();
Thread.Sleep(100);
pollerTask.Wait();
}
}
}
Console.ReadLine();
}
}
}
Ce qui donne :
messageIn = Hello
messageBack = World
Carrying on doing the rest
Nous n'avons pas le message "Hello Again" que nous avons envoyés car la ResponseSocket
a été supprimé du Poller
avant.
Timer(s)
Une autre fonctionnalité du Poller
est de pouvoir gérer des Timer
en appellant les méthodes AddTimer(..) / RemoveTimer(..)
.
Voici un exemple qui montre comment on ajoute un NetMQTimer
qui attend 5 secondes. Le NetMQTimer
est ajouter au Poller
, qui va s'occuper de déclencher le NetMQTimer.Elapsed
.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
using (Poller poller = new Poller())
{
NetMQTimer timer = new NetMQTimer(TimeSpan.FromSeconds(5));
timer.Elapsed += (s, a) =>
{
Console.WriteLine("Timer done");
}; ;
poller.AddTimer(timer);
Task pollerTask = Task.Factory.StartNew(poller.Start);
//give the poller enough time to run the timer (set at 5 seconds)
Thread.Sleep(10000);
}
Console.ReadLine();
}
}
}
Ce qui donne :
Timer done
Pour aller plus loin
Pour plus d'utilisation du Poller
vous pouvez aller sur Poller tests