Pub/Sub

From Wikipedia:

發布/訂閱(Publish/subscribe 或pub/sub)是一種訊息規範,訊息的傳送者(發布者)不是計劃傳送其訊息給特定的接收者(訂閱者)。而是發布的訊息分為不同的類別,而不需要知道什麼樣的訂閱者訂閱。訂閱者對一個或多個類別表達興趣,於是只接收感興趣的訊息,而不需要知道什麼樣的發布者發布的訊息。這種發布者和訂閱者的解耦可以允許更好的可延伸性和更為動態的網路拓撲.

上述所謂的_類別_也可以當成是一個_"主題""過濾器"_。

NetMQ用兩種socket型別支援Pub/Sub模式:

  • PublisherSocket
  • SubscriberSocket

Topics

ZeroMQ/NetMQ使用多段訊息傳送主題資訊,可用byte陣列來表示主題,或是字串並加上適當的System.Text.Encoding

A publisher must include the topic in the message's' first frame, prior to the message payload. For example, to publish a status message to subscribers of the status topic:

// send a message on the 'status' topic
pub.SendMoreFrame("status").SendFrame("All is well");

訂閱者使用SubscriberSocketSubscribe函式指定他們有興趣的主題。

// subscribe to the 'status' topic
sub.Subscribe("status");

Topic heirarchies

一個訊息的主題會用prefix檢查和訂閱者的訂閱主題比較。

也就是說,訂閱主題的訂閱者會接收具有主題的訊息:

  • topic
  • topic/subtopic
  • topical

然而它不會接受這些主題:

  • topi
  • TOPIC (記住,它是以byte做為比較方式)

使用prefix比對行為的結果,可以讓你以空字串來訂閱所有發佈的訊息。

sub.Subscribe(""); // subscribe to all topics

An Example

到了介紹範例的時間了,這範例很簡單,並遵守下列規則:

  • 有一個發佈者的process,會以500ms的時間隨機發佈主題為TopicA或'TopicB`的訊息。
  • 可能會有很多訂閱者,欲訂閱的主題名稱會以命令列參數代入程式中。

Publisher

using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random(50);

            using (var pubSocket = new PublisherSocket())
            {
                Console.WriteLine("Publisher socket binding...");
                pubSocket.Options.SendHighWatermark = 1000;
                pubSocket.Bind("tcp://localhost:12345");

                for (var i = 0; i < 100; i++)
                {
                    var randomizedTopic = rand.NextDouble();
                    if (randomizedTopic > 0.5)
                    {
                        var msg = "TopicA msg-" + i;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
                    }
                    else
                    {
                        var msg = "TopicB msg-" + i;
                        Console.WriteLine("Sending message : {0}", msg);
                        pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
                    }

                    Thread.Sleep(500);
                }
            }
        }
    }
}

Subscriber

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace SubscriberA
{
    class Program
    {
        public static IList<string> allowableCommandLineArgs
            = new [] { "TopicA", "TopicB", "All" };

        static void Main(string[] args)
        {
            if (args.Length != 1 || !allowableCommandLineArgs.Contains(args[0]))
            {
                Console.WriteLine("Expected one argument, either " +
                                  "'TopicA', 'TopicB' or 'All'");
                Environment.Exit(-1);
            }

            string topic = args[0] == "All" ? "" : args[0];
            Console.WriteLine("Subscriber started for Topic : {0}", topic);

            using (var subSocket = new SubscriberSocket())
            {
                subSocket.Options.ReceiveHighWatermark = 1000;
                subSocket.Connect("tcp://localhost:12345");
                subSocket.Subscribe(topic);
                Console.WriteLine("Subscriber socket connecting...");
                while (true)
                {
                    string messageTopicReceived = subSocket.ReceiveFrameString();
                    string messageReceived = subSocket.ReceiveFrameString();
                    Console.WriteLine(messageReceived);
                }
            }
        }
    }
}

在這邊提供三個批次檔,讓你方便執行,不過要稍微修改一下路徑等一適合你的環境。

RunPubSub.bat

start RunPublisher.bat
start RunSubscriber "TopicA"
start RunSubscriber "TopicB"
start RunSubscriber "All"

RunPublisher.bat

cd Publisher\bin\Debug
Publisher.exe

RunSubscriber.bat

set "topic=%~1"
cd Subscriber\bin\Debug
Subscriber.exe %topic%

執行時輸出如下:

Other Considerations

High water mark

SendHighWaterMark / ReceiveHighWaterMark選項可設定指定socket的high water mark。High water mark是對未完成訊息的最大數量的限制,NetMQ會將正在與指定的socket通訊的任何端點的訊息排入佇列中。

如果到達此限制,socket會進入異常狀態,並且根據socket類型,NetMQ應採取適當的措施,如阻止或丟棄發送的訊息。

預設的SendHighWaterMark / ReceiveHighWaterMark值為1000.零值表示“無限制”。

你也可以使用xxxSocket.Options屬性值設定下列兩個屬性:

  • pubSocket.Options.SendHighWatermark = 1000;
  • pubSocket.Options.ReceiveHighWatermark = 1000;

Slow subscribers

This is covered in the ZeroMQ guide

Late joining subscribers

This is covered in the ZeroMQ guide