RabbitMQ AMQP

RabbitMQ技術說明與應用

【Roger】洪承孝(昕力DTD) 2025/12/23 10:43:46
26

RabbitMQ技術說明與應用

一、常用訊息代理軟體的基本介紹

 

image.png

二、RabbitMQ的特性與優點

  • 免費開源特性
  • 多種訊息協定:支援AMQP、MQTT、STOMP、HTTP。
  • 輕量級的系統:支援Linux、Windows、macOS等系統。
  • 支援開發語言:如Java、Python、JavaScript、.NET Core等。
  • 第三方插件支援:可透過如Logstash支援的插件將佇列訊息儲存到資料庫上。
  • 可視化GUI介面:提供豐富的UI介面,可於後台進行Exchange、Queue等操作。
  • 工具支援:可與市面上主要CI/CD工具配合使用,如GitLab、Docker等。

三、什麼是RabbitMQ?

 

image.png

RabbitMQ是由基礎的訊息架構所延伸的一套系統,如下圖所示為一個基本的訊息傳遞模型,可分為「生產者」、「消費者」及「訊息」,其說明如下:

  • 生產者:發送訊息的來源端,主要進行「發送」動作,將訊息發送到「訊息代理端」後進行分派。
  • 消費者:接收訊息的終端,主要進行「接收」動作,接著來自「訊息代理端」的分派資訊。
  • 訊息:生產者提供訊息並由訂閱的消費者接收,訊息內容可以為標頭、JSON、XML、純文字等。
 

image.png

RabbitMQ是一個訊息代理軟體,它是基於AMQP(Advanced Message Queuing Protocol),可分為「發佈者」、「消費者」、「交換器」及「佇列」,與「基本的訊息傳遞模型」不同的是AMQP提供更多樣組合選擇,其說明如下:

  • 交換器:提供「Direct」、「Fanout」、「Topic」、「Headers」四種交換方式,各交換器對訊息的過濾上有不同的作用。
  • 佇列:建立需要的訊息佇列,透過交換器進行綁定,則訊息僅能從該交換器傳送訊息。
  • 發佈者:提供相對應的訊息規則,將訊息內容發送至綁定的交換器,再由「交換器」分派給對應的消費者。
  • 消費者:透過訂閱佇列來接收對應交換器傳送出來的內容。

四、介紹RabbitMQ的幾種交換器和佇列

1. Direct交換類型

 

image.png

  • 生產者(Producer)向交換器(Exchange)發送訊息。
  • 佇列(Queue)使用路由鍵(Routing Key)與交換器(Exchange)進行綁定。
  • 傳送到交換器(Exchange)的訊息會根據路由鍵(Routing Key)將訊息轉送到一個或多個佇列(Queue)。
  • 消費者(Consumer)訂閱佇列(Queue)接收訊息。

備註:佇列可以與交換器綁定相同或不同路由鍵(Routing Key)

2. Fanout扇出交換類型

 

image.png

  • 生產者(Producer)向交換器(Exchange)發送訊息。
  • 一個或多個佇列綁定到沒有路由鍵(Routing Key)的扇出交換器。
  • 交換器(Exchange)將訊息無條件轉送到各個佇列。
  • 消費者(Consumer)訂閱佇列(Queue)接收訊息。

3. Headers交換類型

 

image.png

標頭匹配流程

  • 生產者(Producer)將帶有標頭屬性的訊息傳送到對應交換器(Exchange)。
  • 一或多個佇列(Queue)使用標頭屬性來綁定交換器(Exchange)
  • 如果標頭訊息的條件匹配,則將訊息轉送到佇列(Queue)。
  • 消費者(Consumer)訂閱佇列(Queue)接收訊息。

標頭匹配演算法說明

  • 標頭格式主要以「Key:Value」方式定義
  • 標頭的匹配類型可以為「OR」和「AND」,以本例來說分別設定為「any」和「all」
  • 發送的標頭中需要包含匹配條件,再來才是訊息內容
  • 以本例來說,假設生產者(Producer)傳入「any」且訊息內容為「h1」,則訂閱佇列(Queue)的消費者(Consumer)會接收到「HealthQ」和「EducationQ」的佇列訊息。

4. Topic交換類型

 

image.png

  • Topic Exchange中的路由鍵必須由零個或多個以點分隔的單字組成,例如「health.education」。
  • 主題交換中的路由鍵通常稱為路由模式。
  • 路由模式只包含「*」、「.」和「#」的正規表達式。
  • 星字符號「*」表示只允許一個單字。
  • 井字符號「#」表示允許零個或多個單字。
  • 點符號「.」表示單字分隔符號,多個關鍵術語由點分隔符號進行分隔。
  • 如果路由模式為「health.*」,則表示以路由鍵「health」作為第一個單字發送的任何訊息都會到達佇列。
  • 例如「health.education」將到達此佇列,但「sports.health」不會起作用。

五、Erlang安裝步驟

步驟 1 - 下載並安裝Erlang

首先打開 Erlang下載連結 ,在右邊選擇「Download Windows installer」並下載符合系統環境的位元

 

image.png

步驟 2 - 選擇要安裝的元件

確定安裝介面預設已勾選「Erlang」、「Associations」、「Erlang Documentation」後,單擊「Next」繼續。

 

image.png

步驟 3 - 選擇安裝路徑

通常預設路徑即可,接著單擊「Next」繼續。

 

image.png

步驟 4 - 進行安裝並等待安裝完成

單擊「Install」進行安裝,請等待所有程序安裝結束。

 

image.png

六、RabbitMQ安裝步驟

步驟 1 - 下載並安裝RabbitMQ

首先打開 RabbitMQ下載連結 並捲動到底下單擊「Windows Installer」 進行下載。

 

image.png

步驟 2 - 選擇要安裝的元件

確定安裝介面預設已勾選「RabbitMQ Service」、「Start Menu Shortcuts」後,單擊「Next」繼續。

 

image.png

步驟 3 - 進行安裝並等待安裝完成

單擊「Install」進行安裝,請等待所有程序安裝結束。

 

image.png

步驟 4 - 啟動RabbitMQ Service

開啟「RabbitMQ Service-start」。

 

image.png

開啟「RabbitMQ Command Prompt」,輸入「sc query "RabbitMQ"」,以確認服務已啟用。

步驟 5 - 確認RabbitMQ Service是否啟動

 

image.png

步驟 6 - 安裝RabbitMQ管理套件

開啟「RabbitMQ Command Prompt」。

 

image.png

輸入「rabbitmq-plugins.bat enable rabbitmq_management」,接著按下Enter。

 

image.png

步驟 7 - 登入後臺頁面

首先打開瀏覽器輸入RabbitMQ後臺預設網址,接著輸入帳號密碼單擊「Login」按鈕後即可進入後臺網址,登入網址及預設帳號密碼如下所示。

Server: http://localhost:15672/
Username: guest
Password: guest

 

image.png

 

image.png

七、實作 .NET Core及RabbitMQ

1. 開發環境說明

  • 作業系統:Windows 11
  • Server端:Erlang 26.1.2 & RabbitMQ 3.12.8
  • Client端:.NET Core Console
  • NuGet套件:
    • RabbitMQ.Client (6.6.0)
    • Microsoft.Extensions.Hosting.WindowsServices (7.0.1)
    • Microsoft.Extensions.Hosting.Abstractions (7.0.0)
    • Microsoft.AspNetCore.Hosting.WindowsServices (6.0.24)
    • NLog (5.2.5)
    • NLog.Extensions.Logging (5.3.5)
    • NLog.Web.AspNetCore (5.3.5)

2. 實作範例 - 建立消費者佇列接收模型

MessageReceiver.cs

using RabbitMQ.Client;
using System.Text;

namespace MQTTClientSample
{
    public class MessageReceiver : DefaultBasicConsumer
    {
        //定義通道物件
        private readonly IModel _channel;

        public MessageReceiver(IModel channel)
        {
            _channel = channel;
        }

        /// <summary>
        /// 覆寫消費者接收方法
        /// </summary>
        /// <param name="consumerTag">通道標籤</param>
        /// <param name="deliveryTag">訊息傳遞編號</param>
        /// <param name="redelivered">是否重回佇列</param>
        /// <param name="exchange">交換類型</param>
        /// <param name="routingKey">路由鍵</param>
        /// <param name="properties">屬性</param>
        /// <param name="body">內容</param>
        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

            Console.WriteLine($"Consuming Message");
            Console.WriteLine($"Message received from the exchange {exchange}");
            Console.WriteLine($"Consumer tag: {consumerTag}");
            Console.WriteLine($"Delivery tag: {deliveryTag}");
            Console.WriteLine($"RoutingKey tag: {routingKey}");
            Console.WriteLine($"Message: {Encoding.UTF8.GetString(body.ToArray())}");
            Console.WriteLine("");

            _channel.BasicAck(deliveryTag, false);
        }
    }
}

3. 實作範例 - Direct交換類型

  • 新增交換器名稱及類型
    • 交換器名稱:my-direct-exchange
    • 交換器類型:direct
 

錄製_2023_11_10_14_23_52_347

  • 新增佇列名稱及類型
    • 佇列名稱:ACQ、LightQ、MobileQ
    • 佇列類型:classic
 

錄製_2023_11_10_14_25_09_738

  • 交換器綁定佇列
    • 綁定佇列:
      • ACQ
        • 路由鍵:homeAppliance
      • LightQ
        • 路由鍵:homeAppliance
      • MobileQ
        • 路由鍵:personalDevice
 

錄製_2023_11_10_14_26_15_851

.NET Core Code

 
using MQTTClientSample;
using RabbitMQ.Client;
using System.Text;

public class Program
{
    static void Main()
    {
        //IP地址
        string hostName = "localhost";
        //使用者
        string userName = "guest";
        //密碼
        string password = "guest";

        //定義連線物件
        var connectionFactory = new ConnectionFactory()
        {
            UserName = userName,
            Password = password,
            HostName = hostName
        };

        //建立連線
        var connection = connectionFactory.CreateConnection();
        //建立模型
        var model = connection.CreateModel();
        //建立屬性
        var properties = model.CreateBasicProperties();
        
        //定義發佈訊息
        byte[] messagebuffer = Encoding.Default.GetBytes("Direct Message");
        //訊息發佈
        model.BasicPublish("my-direct-exchange", "homeAppliance", properties, messagebuffer
                     
        Console.WriteLine("Message Sent");
        Console.WriteLine("");
        
        //定義接收訊息物件
        MessageReceiver messageReceiver = new MessageReceiver(model);
                           
        //訂閱佇列
        model.BasicConsume("ACQ", true, "ACQ", messageReceiver);
        model.BasicConsume("LightQ", true, "LightQ", messageReceiver);
        model.BasicConsume("MobileQ", true, "MobileQ", messageReceiver);
    }
}

從實作結果可以發現,發佈者只有將訊息發佈到交換器「my-direct-exchange」上的路由鍵「homeAppliance」,因此在我們訂閱的三個佇列項目,只有ACQ、LightQ兩個佇列接收到訊息。

 

image

4. 實作範例 - Fanout扇出交換類型

  • 新增交換器名稱及類型
    • 交換器名稱:my-fanout-exchange
    • 交換器類型:fanout
 

錄製_2023_11_10_15_52_21_633

  • 交換器綁定佇列
    • 綁定佇列:ACQ、LightQ、MobileQ
 

錄製_2023_11_10_15_58_30_196

.NET Core Code

 
using MQTTClientSample;
using RabbitMQ.Client;
using System.Text;

public class Program
{
    static void Main()
    {
        //IP地址
        string hostName = "localhost";
        //使用者
        string userName = "guest";
        //密碼
        string password = "guest";

        //定義連線物件
        var connectionFactory = new ConnectionFactory()
        {
            UserName = userName,
            Password = password,
            HostName = hostName
        };

        //建立連線
        var connection = connectionFactory.CreateConnection();
        //建立模型
        var model = connection.CreateModel();
        //建立屬性
        var properties = model.CreateBasicProperties();
        
        //定義發佈訊息
        byte[] messagebuffer = Encoding.Default.GetBytes("Fanout Message");
        //訊息發佈
        model.BasicPublish("my-fanout-exchange", "", properties, messagebuffer
                           
        Console.WriteLine("Message Sent");
        Console.WriteLine("");
        
        //定義接收訊息物件
        MessageReceiver messageReceiver = new MessageReceiver(model);
                           
        //訂閱佇列
        model.BasicConsume("ACQ", true, "ACQ", messageReceiver);
        model.BasicConsume("LightQ", true, "LightQ", messageReceiver);
        model.BasicConsume("MobileQ", true, "MobileQ", messageReceiver);
    }
}

從實作結果可以發現,「Fanout」交換類型的特性是不帶「Routing Key」的方式將訊息發佈給有訂閱的消費者,因此在我們訂閱的三個佇列項目都接收的到訊息。

 

image

5. 實作範例 - Headers交換類型

 

錄製_2023_11_10_16_15_52_846

  • 新增交換器名稱及類型
    • 交換器名稱:my-headers-exchange
    • 交換器類型:headers
 

錄製_2023_11_10_16_31_53_599

  • 新增佇列名稱及類型
    • 佇列名稱:HealthQ、SportsQ、EducationQ
    • 佇列類型:classic
 

錄製_2023_11_10_16_35_11_438

  • 交換器綁定佇列
    • 綁定佇列:
      • HealthQ
        • 參數:
          • x-match:any
          • h1:Header1
          • h2:Header2
      • SportsQ
        • 參數:
          • x-match:all
          • h1:Header1
          • h2:Header2
      • EducationQ
        • 參數:
          • x-match:any
          • h1:Header1
          • h2:Header2

.NET Core Code

 
using MQTTClientSample;
using RabbitMQ.Client;
using System.Text;

public class Program
{
    static void Main()
    {
        //IP地址
        string hostName = "localhost";
        //使用者
        string userName = "guest";
        //密碼
        string password = "guest";

        //定義連線物件
        var connectionFactory = new ConnectionFactory()
        {
            UserName = userName,
            Password = password,
            HostName = hostName
        };

        //建立連線
        var connection = connectionFactory.CreateConnection();
        //建立模型
        var model = connection.CreateModel();
        //建立屬性
        var properties = model.CreateBasicProperties();
        
        //定義發佈訊息
        byte[] messagebuffer = Encoding.Default.GetBytes("Header Exchange example 1");
        //定義參數
        Dictionary<string, object> headersArg = new Dictionary<string, object>()
        {
            ["h1"] = "Header1",
            ["h3"] = "Header3"
        };
        //將參數設定給屬性標頭
        properties.Headers = headersArg;
        //訊息發佈
        model.BasicPublish("my-headers-exchange", "", properties, messagebuffer);

        //定義發佈訊息
        messagebuffer = Encoding.Default.GetBytes("Header Exchange example 2");
        //定義參數
        headersArg["h2"] = "Header2";
        //建立屬性
        properties = model.CreateBasicProperties();
        //將參數設定給屬性標頭
        properties.Headers = headersArg;
        //訊息發佈
        model.BasicPublish("my-headers-exchange", "", properties, messagebuffer);
        
        Console.WriteLine("Message Sent");
        Console.WriteLine("");
        
        //定義接收訊息物件
        MessageReceiver messageReceiver = new MessageReceiver(model);
        
        //訂閱佇列
        model.BasicConsume("HealthQ", true, "HealthQ", messageReceiver);
        model.BasicConsume("SportsQ", true, "SportsQ", messageReceiver);
        model.BasicConsume("EducationQ", true, "EducationQ", messageReceiver);
    }
}

從實作結果可以發現,Headers交換類型是透過「OR」、「AND」條件及定義標頭來篩選發佈的訊息,本例先定義「HealthQ」、「EducationQ」具備「Any」的條件,接著再定義「SportsQ」具備「All」的條件,最後再定義其他標頭來模擬篩選情況。

第一個發佈者先發送「h1:Header1」、「h3:Header3」訊息,只有「HealthQ」及「EducationQ」先符合條件,故成功接收到第一批訊息。

第二個發佈者接著發送「h2:Header2」訊息,此時「HealthQ」、「SportsQ」、「EducationQ」因為也符合條件,故成功接收到第二批訊息,其中「SportsQ」為「All」條件,只有當具備兩種標頭才能夠接收成功。

 

image

6. 實作範例 - Topic交換類型

 

錄製_2023_11_13_10_04_10_52

  • 新增交換器名稱及類型

    • 交換器名稱:my-topic-exchange
    • 交換器類型:topic
  • 新增佇列名稱及類型

    • 佇列名稱:HealthQ、SportsQ、EducationQ
    • 佇列類型:classic
 

錄製_2023_11_13_10_04_55_455

  • 交換器綁定佇列
    • 綁定佇列:
      • HealthQ
        • 路由鍵:health.*
      • SportsQ
        • 路由鍵:#.sports.*
      • EducationQ
        • 路由鍵:#.education

.NET Core Code

 
using MQTTClientSample;
using RabbitMQ.Client;
using System.Text;

public class Program
{
    static void Main()
    {
        //IP地址
        string hostName = "localhost";
        //使用者
        string userName = "guest";
        //密碼
        string password = "guest";

        //定義連線物件
        var connectionFactory = new ConnectionFactory()
        {
            UserName = userName,
            Password = password,
            HostName = hostName
        };

        //建立連線
        var connection = connectionFactory.CreateConnection();
        //建立模型
        var model = connection.CreateModel();
        //建立屬性
        var properties = model.CreateBasicProperties();
        
        //定義發佈訊息
        byte[] messagebuffer = Encoding.Default.GetBytes("Drink a lot of Water and stay Healthy!");
        //訊息發佈
        model.BasicPublish("my-topic-exchange", "health.education", null, messagebuffer);

        //定義發佈訊息
        messagebuffer = Encoding.Default.GetBytes("Learn something new everyday!");
        //訊息發佈
        model.BasicPublish("my-topic-exchange", "education", null, messagebuffer);

        //定義發佈訊息
        messagebuffer = Encoding.Default.GetBytes("Stay fit in Mind and Body!");
        //訊息發佈
        model.BasicPublish("my-topic-exchange", "education.health", null, messagebuffer);
        
        Console.WriteLine("Message Sent");
        Console.WriteLine("");
        
        //定義接收訊息物件
        MessageReceiver messageReceiver = new MessageReceiver(model);
        
        //訂閱佇列
        model.BasicConsume("HealthQ", true, "HealthQ", messageReceiver);
        model.BasicConsume("SportsQ", true, "SportsQ", messageReceiver);
        model.BasicConsume("EducationQ", true, "EducationQ", messageReceiver);
    }
}

從實作結果可以發現,Topic交換類型是透過「正規表達式」來進行匹配,首先我們分別發送「health.education」、「education」、「education.health」三則訊息,由於三個佇列「HealthQ」、「SportsQ」、「EducationQ」分別綁定「health.」、「#.sports.」、「#.education」,因此符合條件的有「HealthQ」及「EducationQ」兩種佇列,以下將對其分別解析原理:

  • HealthQ [health.*]:米字號代表一個單詞,因此只有「health.education」符合條件。
  • SportsQ [#.sports.*]:井字號代表零個或多個單字,米字號代表一個單詞,因此沒有任何一條訊息符合條件。
  • EducationQ [#.education]:井字號代表零個或多個單字,因此「health.education」及「education」都符合條件。
 

image

八、案例實作

在現實生活中有許多智慧物聯網的應用,舉例來說像車牌辨識、車輛偵測、人臉辨識、火焰煙霧偵測等就是現今常見的AI應用,而這些AI模組得到的資訊通常都是需要即時且大量的傳輸到Server上,通常作法會是使用如TCP/IP方式來進行封包傳輸,但此一技術的缺點僅限於數據量還不夠多的時候,若傳輸的封包過於繁雜可能會造成黏包、丟包等問題,而這時候可以搭配RabbitMQ的佇列特性來達到此目的,我們將以上述情境透過「.NET Core」及「RabbitMQ」來進行實作。

 

image

本例將分別實作「MQ Server」及「MQ Services」,前者主要模擬伺服端持續接收訊息的消費者(Consumer),而後者則模擬AI模組持續發送訊息的發佈者(Producer),以下將分別進行程式撰寫:

1. 實作MQ Server

步驟 1 - 新增佇列

 

image

步驟 2 - 新增交換器及綁定佇列

 

image

步驟 3 - 程式撰寫

ConsumerModel.cs - 消費者模組

 
using RabbitMQ.Client;
using System.Text;

namespace MQClient
{
    public class ConsumerModel : DefaultBasicConsumer
    {
        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

            Console.WriteLine($"Consuming Message");
            Console.WriteLine($"Message received from the exchange {exchange}");
            Console.WriteLine($"Consumer tag: {consumerTag}");
            Console.WriteLine($"Delivery tag: {deliveryTag}");
            Console.WriteLine($"RoutingKey tag: {routingKey}");
            Console.WriteLine($"Message: {Encoding.UTF8.GetString(body.ToArray())}");
            Console.WriteLine("");
        }
    }
}

Program.cs - 主系統執行

 
using MQClient;
using RabbitMQ.Client;

public class Program
{
    public static void Main()
    {
        //定義MQ連線物件
        var connFactory = new ConnectionFactory()
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };

        //建立連線
        var conn = connFactory.CreateConnection();
        //建立模型
        var model = conn.CreateModel();


        //定義消費者模型
        ConsumerModel consumerModel = new ConsumerModel();

        //訂閱人臉辨識模組
        model.BasicConsume("FaceRecognitionQ", true, "FaceRecognitionQ", consumerModel);

        //訂閱火焰煙霧偵測模組
        //model.BasicConsume("FireSmokeRecognitionQ", true, "FireSmokeRecognitionQ", consumerModel);

        //訂閱車輛偵測 + 車牌辨識模組
        //model.BasicConsume("LicensePlateRecognitionQ", true, "LicensePlateRecognitionQ", consumerModel);

        //訂閱車輛偵測模組
        //model.BasicConsume("VehiclesRecognitionQ", true, "VehiclesRecognitionQ", consumerModel);

        Console.ReadLine();
    }
}

2. 實作MQ Services

定義範例模型 - 程式撰寫

IMQSpecs.cs - MQTT規格介面

 
namespace MQServices.Samples.Interfaces
{
    /// <summary>
    /// MQTT規格介面
    /// </summary>
    public interface IMQSpecs
    {
        /// <summary>
        /// 交換器名稱
        /// </summary>
        public string exchangeName { get; set; }

        /// <summary>
        /// 路由鍵
        /// </summary>
        public string? routingKey { get; set; }

        /// <summary>
        /// 基本屬性
        /// </summary>
        public IBasicProperties? basicProperties { get; set; }

        /// <summary>
        /// 取得佇列訊息
        /// </summary>
        /// <returns></returns>
        public List<MockDataModel> GetQueueMessage();
    }
}

BoxContainer.cs - 偵測矩形容器

 
namespace MQServices.Samples.Models
{
    /// <summary>
    /// 偵測矩形容器
    /// </summary>
    public class BoxContainer
    {
        public int _orgX { get; set; }
        public int _orgY { get; set; }
        public int _width { get; set; }
        public int _height { get; set; }

        public BoxContainer(int orgX, int orgY, int width, int height)
        {
            this._orgX = orgX;
            this._orgY = orgY;
            this._width = width;
            this._height = height;
        }
    }
}

MockDataModel.cs - 虛擬資料模型

 
namespace MQServices.Samples.Models
{
    /// <summary>
    /// 虛擬資料模型
    /// </summary>
    public class MockDataModel
    {
        /// <summary>
        /// 物件標籤名稱
        /// </summary>
        public string ?labelName { get; set; }
        /// <summary>
        /// 偵測矩形容器
        /// </summary>
        public BoxContainer ?boxContainer { get; set; }
        /// <summary>
        /// 偵測相似度
        /// </summary>
        public float score { get; set; } = 0.0f;
    }
}

FaceRecognitionData.cs - 人臉辨識模組

 
using MQServices.Samples.Interfaces;
using MQServices.Samples.Models;
using RabbitMQ.Client;

namespace MQServices.Samples
{
    public class FaceRecognitionData : IMQSpecs
    {
        public string exchangeName { get; set; } = "AI-Topic-Exchange";
        public string?[] routingKeys { get; set; } = new string[] { "FaceRecognition" };
        public IBasicProperties? basicProperties { get; set; }

        /// <summary>
        /// 虛擬資料
        /// </summary>
        private List<MockDataModel> mockDataList = new List<MockDataModel>()
        {
            new MockDataModel()
            {
                labelName = "roger",
                boxContainer = new BoxContainer(38,40,48,48),
                score = 0.9f
            },
            new MockDataModel()
            {
                labelName = "rock",
                boxContainer = new BoxContainer(65,60,50,50),
                score = 0.95f
            },
            new MockDataModel()
            {
                labelName = "kay",
                boxContainer = new BoxContainer(120,80,120,120),
                score = 0.9f
            },
            new MockDataModel()
            {
                labelName = "eric",
                boxContainer = new BoxContainer(300,200,180,180),
                score = 0.45f
            }
        };

        public List<MockDataModel> GetQueueMessage()
        {
            return mockDataList.ToList();
        }
    }
}

FireSmokeRecognitionData.cs - 火焰煙霧偵測模組

 
using MQServices.Samples.Interfaces;
using MQServices.Samples.Models;
using RabbitMQ.Client;

namespace MQServices.Samples
{
    public class FireSmokeRecognitionData : IMQSpecs
    {
        public string exchangeName { get; set; } = "AI-Topic-Exchange";
        public string?[] routingKeys { get; set; } = new string[] { "FireSmokeRecognition" };
        public IBasicProperties? basicProperties { get; set; }

        /// <summary>
        /// 虛擬資料
        /// </summary>
        private List<MockDataModel> mockDataList = new List<MockDataModel>()
        {
            new MockDataModel()
            {
                labelName = "fire",
                boxContainer = new BoxContainer(120,30,50,120),
                score = 0.65f
            },
            new MockDataModel()
            {
                labelName = "fire",
                boxContainer = new BoxContainer(150,45,60,180),
                score = 0.68f
            },
            new MockDataModel()
            {
                labelName = "smoke",
                boxContainer = new BoxContainer(180,30,180,150),
                score = 0.75f
            },
            new MockDataModel()
            {
                labelName = "smoke",
                boxContainer = new BoxContainer(200,220,230,180),
                score = 0.85f
            },
            new MockDataModel()
            {
                labelName = "fire",
                boxContainer = new BoxContainer(350,200,80,50),
                score = 0.9f
            }
        };

        public List<MockDataModel> GetQueueMessage()
        {
            return mockDataList.ToList();
        }
    }
}

LicensePlateRecognitionData.cs - 車牌辨識模組

 
using MQServices.Samples.Interfaces;
using MQServices.Samples.Models;
using RabbitMQ.Client;

namespace MQServices.Samples
{
    public class LicensePlateRecognitionData : IMQSpecs
    {
        public string exchangeName { get; set; } = "AI-Topic-Exchange";
        public string?[] routingKeys { get; set; } = new string[] 
{ "VehiclesRecognition.LicensePlateRecognition" };
        public IBasicProperties? basicProperties { get; set; }

        /// <summary>
        /// 虛擬資料
        /// </summary>
        private List<MockDataModel> mockDataList = new List<MockDataModel>()
        {
            new MockDataModel()
            {
                labelName = "AI-4329",
                boxContainer = new BoxContainer(120,135,150,50),
                score = 0.9f
            },
            new MockDataModel()
            {
                labelName = "YX-2700",
                boxContainer = new BoxContainer(250,175,152,48),
                score = 0.8f
            },
            new MockDataModel()
            {
                labelName = "J0-5032",
                boxContainer = new BoxContainer(175,120,155,52),
                score = 0.85f
            },
            new MockDataModel()
            {
                labelName = "PK-8062",
                boxContainer = new BoxContainer(175,120,155,52),
                score = 0.9f
            },
            new MockDataModel()
            {
                labelName = "LN-1139",
                boxContainer = new BoxContainer(175,120,155,52),
                score = 0.95f
            }
        };

        public List<MockDataModel> GetQueueMessage()
        {
            return mockDataList.ToList();
        }
    }
}

VehiclesRecognitionData.cs - 車輛偵測模組

 
using MQServices.Samples.Interfaces;
using MQServices.Samples.Models;
using RabbitMQ.Client;

namespace MQServices.Samples
{
    public class VehiclesRecognitionData : IMQSpecs
    {
        public string exchangeName { get; set; } = "AI-Topic-Exchange";
        public string?[] routingKeys { get; set; } = new string[]
{"VehiclesRecognition","VehiclesRecognition.LicensePlateRecognition"};
        public IBasicProperties? basicProperties { get; set; }

        /// <summary>
        /// 虛擬資料
        /// </summary>
        private List<MockDataModel> mockDataList = new List<MockDataModel>()
        {
            new MockDataModel()
            {
                labelName = "car",
                boxContainer = new BoxContainer(200,300,50,60),
                score = 0.9f
            },
            new MockDataModel()
            {
                labelName = "truck",
                boxContainer = new BoxContainer(30,35,45,100),
                score = 1.0f
            },
            new MockDataModel()
            {
                labelName = "bus",
                boxContainer = new BoxContainer(65,70,30,80),
                score = 9.5f
            }
        };

        public List<MockDataModel> GetQueueMessage()
        {
            return mockDataList.ToList();
        }
    }


}

撰寫Windows服務程式 - 程式撰寫

IMQTTService.cs - MQ服務介面

 
namespace MQServices.RabbitMQ.Interfaces
{
    /// <summary>
    /// MQTT介面
    /// </summary>
    public interface IMQTTService
    {
        /// <summary>
        /// 訂閱MQ
        /// </summary>
        public void SubscriptionMQ();

    }
}

MQTTService.cs - MQ服務邏輯

 
using Microsoft.Extensions.Logging;
using MQServices.RabbitMQ.Interfaces;
using MQServices.Samples;
using MQServices.Samples.Interfaces;
using MQServices.Samples.Models;
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

namespace MQServices.RabbitMQ.Services
{
    /// <summary>
    /// MQTT服務
    /// </summary>
    public class MQTTService : IMQTTService
    {
        /// <summary>
        /// 定義Log
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// 定義MQ連線物件
        /// </summary>
        private ConnectionFactory connFactory { get; set; } = new ConnectionFactory()
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };

        /// <summary>
        /// 定義AI模組資料清單
        /// </summary>
        private readonly List<IMQSpecs> mockDataList = new List<IMQSpecs>()
        {
            new FaceRecognitionData(),
            new FireSmokeRecognitionData(),
            new LicensePlateRecognitionData(),
            new VehiclesRecognitionData()
        };

        /// <summary>
        /// 初始化
        /// </summary>
        public MQTTService(ILogger<MQTTService> logger)
        {
            this._logger = logger;
        }

        /// <summary>
        /// 訂閱MQ
        /// </summary>
        public void SubscriptionMQ()
        {
            try
            {
                //建立連線
                var conn = connFactory.CreateConnection();
                //建立模型
                var model = conn.CreateModel();

                //使用GUID的雜湊碼可以達到更隨機的亂數
                Random rnd = new Random(Guid.NewGuid().GetHashCode());

                Console.WriteLine();
                Console.WriteLine("===================================");
                Console.WriteLine();

                foreach (var mockData in mockDataList)
                {
                    var queueMessageList = mockData.GetQueueMessage();
                    List<MockDataModel> rndMockDataList = new List<MockDataModel>();

                    for (int i = 0; i < queueMessageList.Count; i++)
                    {
                        //隨機取N個資料
                        var rndIdx = rnd.Next(queueMessageList.Count);
                        rndMockDataList.Add(queueMessageList[rndIdx]);
                    }

                    var queueJSON = JsonSerializer.Serialize(rndMockDataList,
                        new JsonSerializerOptions() { WriteIndented = true });

                    //定義發佈訊息
                    byte[] messagebuffer = Encoding.Default.GetBytes(queueJSON);

                    Console.WriteLine($"Publish exchange: {mockData.exchangeName}");

                    foreach (var routingKey in mockData.routingKeys)
                    {
                        //訊息發佈
                        model.BasicPublish(mockData.exchangeName, routingKey, mockData.basicProperties, messagebuffer);

                        Console.WriteLine($"Routing key: {routingKey}");
                    }
  
                    Console.WriteLine();
                }
                Console.WriteLine("===================================");

            }
            catch (Exception ex)
            {
                //Log
                this._logger.LogError($"【SubscriptionMQ】{ex.ToString()}");
            }
        }
    }
}

Worker.cs - 服務核心

 
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQServices.RabbitMQ.Interfaces;

namespace MQServices
{
    public class Worker : BackgroundService
    {
        /// <summary>
        /// 服務運作間隔(ms)
        /// </summary>
        protected int serviceInterval { get; set; } = 2500;
        /// <summary>
        /// 定義logger
        /// </summary>
        private readonly ILogger<Worker> _logger;
        /// <summary>
        /// 定義服務提供者
        /// </summary>
        private readonly IServiceProvider _serviceProvider;

        /// <summary>
        /// 初始化
        /// </summary>
        /// <param name="logger"></param>
        public Worker(ILogger<Worker> logger,IServiceProvider serviceProvider)
        {
            this._logger = logger;
            this._serviceProvider = serviceProvider;
        }

        /// <summary>
        /// 啟動服務
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            //TODO:Log
            this._logger.LogInformation("【StartAsync】準備啟動服務");

            return base.StartAsync(cancellationToken);
        }

        /// <summary>
        /// 停止服務
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            //TODO:Log
            this._logger.LogInformation("【StopAsync】服務已停止");

            return base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// 運行服務
        /// </summary>
        /// <param name="stoppingToken"></param>
        /// <returns></returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                //TODO:Log
                this._logger.LogInformation("【ExecuteAsync】服務已啟動");
                
                var scope = this._serviceProvider.CreateScope();
                var mQService = scope.ServiceProvider.GetService<IMQTTService>()!;

                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {

                        //開始訂閱MQ
                        mQService.SubscriptionMQ();

                        await Task.Delay(serviceInterval);

                    }catch (Exception ex)
                    {
                        //TODO:Log
                        this._logger.LogError($"【ExecuteAsync】{ex.ToString()}");
                    }
                }

                //TODO:Log
                this._logger.LogInformation("【ExecuteAsync】準備停止服務");
            }
            catch (Exception ex)
            {
                //TODO:Log
                this._logger.LogError($"【ExecuteAsync】{ex.ToString()}");
            }
        }
    }
}

Program.cs - 主系統執行

 
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MQServices;
using MQServices.RabbitMQ.Interfaces;
using MQServices.RabbitMQ.Services;
using NLog.Extensions.Logging;

public class Program
{
    private static async Task Main()
    {
        //建立Host服務
        IHost host = Host.CreateDefaultBuilder().UseWindowsService(options =>
        {
            options.ServiceName = "MQServices";
        })
        .ConfigureLogging(logging =>
        {
            logging.AddNLog("nlog.config");
        })
        .ConfigureServices((hostBuilder, services) =>
        {
            services.AddScoped<IMQTTService, MQTTService>();
            services.AddHostedService<Worker>();
        })
        .Build();

        await host.RunAsync();
    }
}

3. 實作結果

.NET Core 範例程式下載

下載連結 1:MQServices
下載連結 2:MQClient


當訂閱「人臉辨識佇列」之後,可以取得「人臉辨識模組」所發佈的訊息

錄製_2023_11_14_14_12_31_527

當訂閱「車牌辨識佇列」之後,可以分別取得「車輛偵測模組」及「車牌辨識模組」所發佈的訊息

錄製_2023_11_14_15_49_20_422

【Roger】洪承孝(昕力DTD)