Gérer plusieurs Sockets

Pourquoi vouloir gérer plusieurs sockets en même temps ? Il existe plusieurs raison à cela :

  • Vous avez plusieurs socket dans un même process qui sont reliées les unes aux autres et vous devez savoir quand chacunes d'elles est prête à recevoir des données.
  • Vous voulez avoir une requete ainsi qu'un publisher dans un même process.

Des fois vous allez avoir besoin de gérer plusieurs socket dans un même process. Et souvent vous voudrez utilisez les sockets que lorsqu'elles sont prêtes à être utilisées.

ZeroMq instaure ici le concept de Poller pour savoir si une socket est prête.

NetMQ a une implémentation du Poller, et il peut être utilisé pour :

  • Gérer une socket, pour savoir si la lecture est prête.
  • Gérer une liste de socket (IEnumerable) pour savoir si la lecture est prête.
  • Autoriser des NetMQSocket(s) a être ajouté dynamiquement et déterminer si les nouvelles requêtes sont prêtres à être lues.
  • Autorisé des NetMQSocket(s) à être supprimées dynamiquement.
  • Lever un evennement quand une socket est prête.

Les méthodes du Poller

Le Poller possède plusieurs méthodes qui vont vous aider à gerer tout ca. Plus précisément les méthodes AddSocket(..)/RemoveSocket(..) et Start()/Stop().

Vous devez utiliser la méthode AddSocket pour ajouter dynamiquement une socket et vérifier son état de "prête pour la lecture", et ensuite appeller la méthode Poller.Start().A ce moment, le Poller va appelle tous les evennements ReceiveReady.

Poller Exemple

Maintenant que l'on sait à quoi sert le Poller, voyons un petit exemple.

Dans le code suivant, une seule requête est ajouter au Poller. Nous pouvons voir que l'evennement ReceiveReady est gérer. Le Poller va automatiquement appeler l'evennement quand la requete sera "prête".

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using NetMQ;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            using (NetMQContext contex = NetMQContext.Create())
            {
                using (var rep = contex.CreateResponseSocket())
                {
                    rep.Bind("tcp://127.0.0.1:5002");

                    using (var req = contex.CreateRequestSocket())
                    using (Poller poller = new Poller())
                    {
                        req.Connect("tcp://127.0.0.1:5002");

                        //The ReceiveReady event is raised by the Poller
                        rep.ReceiveReady += (s, a) =>
                        {
                            bool more;
                            string messageIn = a.Socket.ReceiveString(out more);
                            Console.WriteLine("messageIn = {0}", messageIn);
                            a.Socket.Send("World");
                        };

                        poller.AddSocket(rep);

                        Task pollerTask = Task.Factory.StartNew(poller.Start);
                        req.Send("Hello");

                        bool more2;
                        string messageBack = req.ReceiveString(out more2);
                        Console.WriteLine("messageBack = {0}", messageBack);

                        poller.Stop();

                        Thread.Sleep(100);
                        pollerTask.Wait();
                    }
                }
            }
            Console.ReadLine();
        }
    }
}

Vous devriez voir dans la console :

messageIn = Hello
messageBack = World

A partir de cet exemple, nous pouvons maintenant supprimer la ResponseSocket du Poller dès que nous voyons le premiers message, ce qui veux dire que nous ne devrions plus recevoir aucunes informations sur la ResponseSocket.

Voici le nouveau code :

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using NetMQ;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {

            using (NetMQContext contex = NetMQContext.Create())
            {
                using (var rep = contex.CreateResponseSocket())
                {
                    rep.Bind("tcp://127.0.0.1:5002");

                    using (var req = contex.CreateRequestSocket())
                    using (Poller poller = new Poller())
                    {
                        req.Connect("tcp://127.0.0.1:5002");

                        //The ReceiveReady event is raised by the Poller
                        rep.ReceiveReady += (s, a) =>
                        {
                            bool more;
                            string messageIn = a.Socket.ReceiveString(out more);
                            Console.WriteLine("messageIn = {0}", messageIn);
                            a.Socket.Send("World");


                            //REMOVAL
                            //This time we remove the Socket from the Poller, so it should not receive any more messages
                            poller.RemoveSocket(a.Socket);
                        };

                        poller.AddSocket(rep);



                        Task pollerTask = Task.Factory.StartNew(poller.Start);

                        req.Send("Hello");

                        bool more2;
                        string messageBack = req.ReceiveString(out more2);
                        Console.WriteLine("messageBack = {0}", messageBack);



                        //This should not do anything, as we removed the ResponseSocket
                        //the 1st time we sent a message to it
                        req.Send("Hello Again");


                        Console.WriteLine("Carrying on doing the rest");

                        poller.Stop();

                        Thread.Sleep(100);
                        pollerTask.Wait();
                    }
                }
            }
            Console.ReadLine();
        }
    }
}

Ce qui donne :

messageIn = Hello
messageBack = World
Carrying on doing the rest

Nous n'avons pas le message "Hello Again" que nous avons envoyés car la ResponseSocket a été supprimé du Poller avant.

Timer(s)

Une autre fonctionnalité du Poller est de pouvoir gérer des Timer en appellant les méthodes AddTimer(..) / RemoveTimer(..).

Voici un exemple qui montre comment on ajoute un NetMQTimer qui attend 5 secondes. Le NetMQTimer est ajouter au Poller, qui va s'occuper de déclencher le NetMQTimer.Elapsed.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using NetMQ;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {

            using (Poller poller = new Poller())
            {
                NetMQTimer timer = new NetMQTimer(TimeSpan.FromSeconds(5));
                timer.Elapsed += (s, a) =>
                        {
                            Console.WriteLine("Timer done");
                        }; ;
                poller.AddTimer(timer);


                Task pollerTask = Task.Factory.StartNew(poller.Start);


                //give the poller enough time to run the timer (set at 5 seconds)
                Thread.Sleep(10000);

            }

            Console.ReadLine();
        }
    }
}

Ce qui donne :

Timer done

Pour aller plus loin

Pour plus d'utilisation du Poller vous pouvez aller sur Poller tests