Beacon

NetMQBeacon implements a peer-to-peer discovery service for local networks.

A beacon can broadcast and/or capture service announcements using UDP messages on the local area network. You can define the format of your outgoing beacons, and set a filter that validates incoming beacons. Beacons are sent and received asynchronously in the background.

We can use NetMQBeacon to discover and connect to other NetMQ/CZMQ services on the network automatically, without central configuration. Please note that to use NetMQBeacon your infrastructure must support broadcast. Most cloud providers do not support broadcast.

This implementation uses IPv4 UDP broadcasts, and is a port of zbeacon from czmq.

Example: Implementing a Bus

The following is a simple bus implementation that uses NetMQBeacon. It allows a set of nodes to discover one another, configured only via a shared port number.

  • Each bus node binds a subscriber socket and connects to other nodes with a publisher socket.
  • Each node will use NetMQBeacon to announce its existence and to discover other nodes. We will also use NetMQActor to implement our node.

The Bus class

:::csharp
/// <summary>
/// A simple message bus node. Each node binds a subscriber socket to a random
/// port, announces that port with a <see cref="NetMQBeacon"/>, and connects its
/// publisher socket to every other node it hears a beacon from. The node runs
/// inside a <see cref="NetMQActor"/>; callers talk to it only through that actor.
/// </summary>
public class Bus
{
    // Actor protocol: command frame that precedes a message to be published
    public const string PublishCommand = "P";

    // A node whose last beacon is older than this is considered dead
    private static readonly TimeSpan DeadNodeTimeout = TimeSpan.FromSeconds(10);

    // Identifies a peer by host name and subscriber port; used as the
    // dictionary key to check whether we already know about a node
    private sealed class NodeKey : IEquatable<NodeKey>
    {
        public NodeKey(string name, int port)
        {
            Name = name;
            Port = port;
        }

        public string Name { get; private set; }
        public int Port { get; private set; }

        public bool Equals(NodeKey other)
        {
            if (ReferenceEquals(null, other)) return false;
            return string.Equals(Name, other.Name) && Port == other.Port;
        }

        public override bool Equals(object obj)
        {
            return Equals(obj as NodeKey);
        }

        public override int GetHashCode()
        {
            unchecked
            {
                return ((Name != null ? Name.GetHashCode() : 0) * 397) ^ Port;
            }
        }
    }

    private readonly NetMQContext m_context;
    private readonly int m_broadcastPort;

    private NetMQActor m_actor;

    private PublisherSocket m_publisher;
    private SubscriberSocket m_subscriber;
    private NetMQBeacon m_beacon;
    private Poller m_poller;
    private PairSocket m_shim;

    // known nodes mapped to the (UTC) time their last beacon was received
    private readonly Dictionary<NodeKey, DateTime> m_nodes;

    private Bus(NetMQContext context, int broadcastPort)
    {
        m_nodes = new Dictionary<NodeKey, DateTime>();
        m_context = context;
        m_broadcastPort = broadcastPort;
        m_actor = NetMQActor.Create(context, RunActor);
    }

    /// <summary>
    /// Creates a new message bus actor. All communication with the bus is
    /// through the returned <see cref="NetMQActor"/>.
    /// </summary>
    /// <param name="context">The context used to create the bus sockets.</param>
    /// <param name="broadcastPort">The UDP port the beacon is configured on.</param>
    /// <returns>The actor that fronts the new bus node.</returns>
    public static NetMQActor Create(NetMQContext context, int broadcastPort)
    {
        Bus node = new Bus(context, broadcastPort);
        return node.m_actor;
    }

    /// <summary>
    /// The actor's shim handler: sets up the sockets, beacon, poller and timer,
    /// signals readiness to the actor, and then polls until cancelled.
    /// </summary>
    private void RunActor(PairSocket shim)
    {
        // save the shim to the class to use later
        m_shim = shim;

        // create the subscriber, publisher and beacon
        using (m_subscriber = m_context.CreateSubscriberSocket())
        using (m_publisher = m_context.CreatePublisherSocket())
        using (m_beacon = new NetMQBeacon(m_context))
        {
            // listen to actor commands
            m_shim.ReceiveReady += OnShimReady;

            // subscribe to all messages
            m_subscriber.Subscribe("");

            // we bind to a random port, we will later publish this port
            // using the beacon
            int randomPort = m_subscriber.BindRandomPort("tcp://*");

            // listen to incoming messages from other publishers
            m_subscriber.ReceiveReady += OnSubscriberReady;

            // configure the beacon to listen on the broadcast port
            m_beacon.Configure(m_broadcastPort);

            // publish the random port to all other nodes, once a second
            m_beacon.Publish(randomPort.ToString(), TimeSpan.FromSeconds(1));

            // subscribe to all beacons on the port
            m_beacon.Subscribe("");

            // listen to incoming beacons
            m_beacon.ReceiveReady += OnBeaconReady;

            // create and configure the poller with all sockets
            m_poller = new Poller(m_shim, m_subscriber, m_beacon);

            // create a timer to clear dead nodes
            NetMQTimer timer = new NetMQTimer(TimeSpan.FromSeconds(1));
            timer.Elapsed += ClearDeadNodes;
            m_poller.AddTimer(timer);

            // signal the actor that we finished with configuration and
            // are ready to work
            m_shim.SignalOK();

            // poll until cancelled
            m_poller.PollTillCancelled();
        }
    }

    /// <summary>
    /// Handles a command sent by the actor's owner over the shim.
    /// </summary>
    private void OnShimReady(object sender, NetMQSocketEventArgs e)
    {
        // new actor command
        string command = m_shim.ReceiveString();

        // check if we received the end shim command
        if (command == NetMQActor.EndShimMessage)
        {
            // cancel the poller; PollTillCancelled then returns and RunActor
            // leaves its using blocks, disposing the sockets and the beacon
            m_poller.Cancel();
        }
        else if (command == PublishCommand)
        {
            // it is a publish command
            // we just forward everything to the publisher until end of message
            NetMQMessage message = m_shim.ReceiveMessage();
            m_publisher.SendMessage(message);
        }
    }

    /// <summary>
    /// Forwards a message received from the bus to the actor's owner.
    /// </summary>
    private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
    {
        // we got a new message from the bus
        // let's forward everything to the shim
        NetMQMessage message = m_subscriber.ReceiveMessage();
        m_shim.SendMessage(message);
    }

    /// <summary>
    /// Handles an incoming beacon: records the time it was seen and, for a
    /// node we have not seen before, connects the publisher to it.
    /// </summary>
    private void OnBeaconReady(object sender, NetMQBeaconEventArgs e)
    {
        // always consume the beacon, even if we end up ignoring it
        string nodeName;
        string portText = m_beacon.ReceiveString(out nodeName);

        // We subscribe to every beacon on the broadcast port, so the payload
        // may come from an unrelated sender. Ignore anything that is not a
        // valid TCP port instead of throwing on the poller thread.
        int port;
        if (!int.TryParse(portText, out port) || port <= 0 || port > 65535)
        {
            return;
        }

        if (string.IsNullOrEmpty(nodeName))
        {
            return;
        }

        // remove the port from the peer name
        nodeName = nodeName.Replace(":" + m_broadcastPort, "");

        NodeKey node = new NodeKey(nodeName, port);

        // UTC so that local clock adjustments (e.g. DST) do not distort timeouts
        bool isKnown = m_nodes.ContainsKey(node);
        m_nodes[node] = DateTime.UtcNow;

        if (!isKnown)
        {
            // we have a new node, let's connect our publisher to its subscriber
            m_publisher.Connect(string.Format("tcp://{0}:{1}", nodeName, port));
        }
    }

    /// <summary>
    /// Timer callback: forgets, and disconnects the publisher from, every node
    /// whose last beacon is older than <see cref="DeadNodeTimeout"/>.
    /// </summary>
    private void ClearDeadNodes(object sender, NetMQTimerEventArgs e)
    {
        // read the clock once so every node is judged against the same instant
        DateTime now = DateTime.UtcNow;

        // materialize the dead nodes first; we cannot remove from the
        // dictionary while enumerating it
        var deadNodes = m_nodes.
            Where(n => now > n.Value + DeadNodeTimeout).
            Select(n => n.Key).ToArray();

        // remove all the dead nodes from the nodes list and disconnect
        // from the publisher
        foreach (var node in deadNodes)
        {
            m_nodes.Remove(node);
            m_publisher.Disconnect(
                string.Format("tcp://{0}:{1}", node.Name, node.Port));
        }
    }
}

Node implementation

A node on the bus might resemble:

:::csharp
// The actor is disposed before the context (inner using first), so its
// sockets are closed before the context itself is torn down.
using (NetMQContext context = NetMQContext.Create())
using (var actor = Bus.Create(context, 9999)) // bus on broadcast port 9999
{
    // beacons publish every second, so wait a little longer than that to
    // let all the other nodes connect to our new node
    Thread.Sleep(1100);

    // publish a hello message
    // note we can use NetMQSocket send and receive extension methods
    actor.SendMore(Bus.PublishCommand).Send("Hello?");

    // receive messages from other nodes on the bus
    while (true)
    {
        string message = actor.ReceiveString();

        // print every message: either another node saying hello,
        // or (probably) a welcome reply
        Console.WriteLine(message);

        if (message == "Hello?")
        {
            // another node is saying hello, send back a welcome message
            actor.SendMore(Bus.PublishCommand).Send("Welcome!");
        }
    }
}

Further reading