Push / Pull

NetMq implémente les PushSocket et PullSocket. Voyons comment les utiliser

Normalement ces deux socket vont de pair. Une PushSocket va envoyer des données à une PullSocket, tandis qu'une PullSocket s'attend à recevoir des données d'une ou plusieurs PushSocket. Jusqu'ici pas de problèmes!

Vous pouvez utiliser cette configuration pour faire une architecture permettant par exemple de distribuer du travail, un peu comme la patterne divide and conquer.

L'idée est d'avoir une partie qui génère du travail et qui le distribue à 1-n "travailleurs". Chaque "travailleur" fait son boulot et renvoi ses résultats au process (peut aussi etre un thread) où les résultats sont traités.

Dans le guide ZeroMQ, il y a un exemple qui montre comment le ditributeur de tache dit aux travailleurs d'attendre pendant une certaine période.

Nous allons essayés de créer un exemple un peu plus élaboré, le distributeur de tache va envoyer le temps que chaque travailleur doit attendre (simulation d'un travail a faire).

En réalité , ce travail pourrait être n'importe quelle tache, du moment que ce sont des taches qui peuvent se découper en sous-tâches bien séparées, peut importe le nombre de "travailleurs".

Voici un diagramme de ce que nous essayons de faire :





et le code....

Ventilator

using System;
using NetMQ;


namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://localhost:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");

                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");

                        Console.WriteLine("Press enter when worker are ready");
                        Console.ReadLine();

                        //the first message it "0" and signals start of batch
                        //see the Sink.csproj Program.cs file for where this is used
                        Console.WriteLine("Sending start of batch to Sink");
                        sink.Send("0");



                        Console.WriteLine("Sending tasks to workers");

                        //initialise random number generator
                        Random rand = new Random(0);

                        //expected costs in Ms
                        int totalMs = 0;

                        //send 100 tasks (workload for tasks, is just some random sleep time that
                        //the workers can perform, in real life each work would do more than sleep
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            //Random workload from 1 to 100 msec
                            int workload = rand.Next(0, 100);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

Worker

using System;
using System.Threading;
using NetMQ;


namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // collects workload for socket from Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");


            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");

                    //socket to send messages on
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");

                        //process tasks forever
                        while (true)
                        {
                            //workload from the vetilator is a simple delay
                            //to simulate some work being done, see
                            //Ventilator.csproj Proram.cs for the workload sent
                            //In real life some more meaningful work would be done
                            string workload = receiver.ReceiveString();

                            //simulate some work being done
                            Thread.Sleep(int.Parse(workload));

                            //send results to sink, sink just needs to know worker
                            //is done, message content is not important, just the precence of
                            //a message means worker is done. 
                            //See Sink.csproj Proram.cs 
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }

                }
            }

        }
    }
}

Sink

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;


namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {

            // Task Sink
            // Bindd PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");

                    //wait for start of batch (see Ventilator.csproj Program.cs)
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");

                    //Start our clock now
                    Stopwatch watch = new Stopwatch();
                    watch.Start();

                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    //Calculate and report duration of batch
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}

Pour lancer le programme, ces 3 fichiers Bat vous serons utiles.



Run1Worker.bat

cd Ventilator/bin/Debug
start Ventilator.exe
cd../../..
cd Sink/bin/Debug
start Sink.exe
cd../../..
cd Worker/bin/Debug
start Worker.exe

Vous devrier obtenir en résultat pour 1 travailleur :

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 5695 msec



Run2Workers.bat

cd Ventilator/bin/Debug
start Ventilator.exe
cd../../..
cd Sink/bin/Debug
start Sink.exe
cd../../..
cd Worker/bin/Debug
start Worker.exe
start Worker.exe

Vous devrier obtenir en résultat pour 2 travailleurs :

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 2959 msec



Run4Workers.bat

cd Ventilator/bin/Debug
start Ventilator.exe
cd../../..
cd Sink/bin/Debug
start Sink.exe
cd../../..
cd Worker/bin/Debug
start Worker.exe
start Worker.exe
start Worker.exe
start Worker.exe

Vous devrier obtenir en résultat pour 4 travailleurs :

====== SINK ======
Seen start of batch
:………:………:………:………:………:………:………:………
:………:………
Total elapsed time 1492 msec



On voit bien ici que plus on augmente le nombre de travailleurs, plus le temps d'execution des taches diminue.

Il y a cependant quelques points d'attentions sur cette paterne.

  • Le Ventilator utilise une NetMQ PushSocket pour distribuer le travail aux Workers, c'est du load balencing
  • Le Ventilator et le Sink sont les parties statiques de l'architecture, alors que les Workers sont dynamiques.
  • Nous devons synchroniser le debut du batch (Quand les Workers sont prêts), sinon le premier Worker quyi se connectera aura plus de messages que les autres, ce qui n'est pas vraiment du load balencing dans ce cas.
  • Le Sink utilise une NetMQ PullSocket pour traiter les résultats des Workers