RabbitMQ

建立RabbitMQ & SpringBoot集成環境

陳志祥 2020/11/16 13:59:32
222

介紹

RabbitMQ是實現了進階訊息佇列協定AMQP)的開源訊息代理軟體(亦稱訊息導向中介層)。RabbitMQ伺服器是用Erlang語言編寫的,Erlang是一個使用垃圾回收的虛擬機運行的併發編程語言,類似Java,而群集和故障轉移是構建在開放電信框架上的。所有主要的程式語言均有與代理介面通訊的客戶端函式庫。(維基百科)

 

實作

 

使用Docker建立RabbitMQ 服務,並透過Springboot AutoConfiguration特性,快速建立一個Web Application來與RabbitMQ進行send and receive測試。

 

首先執行docker 指令,下載RabbitMQ映像檔

 

docker pull rabbitmq:management

 

啟動服務 default port 5672 , admin port 15672 

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

 

服務啟動後即可登入RabbitMQ管理介面,預設帳密:guest , guest

 

建立simple springboot web project .import amqp dependency

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>

 

 

application.properties參數設定中加入RabbitMQ 連線參數

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.activemq.user=guest
spring.activemq.password=guest

 

 

建立RabbitmqConfig.java,自訂部分設定,創建Queuetpu.queue,並且加入@EnableRabbit 標籤來讓@RabbitListener 標籤生效。

package com.tpisoftware;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitmqConfig {

/**
* 將自定義的消息類序列化成json格式,再轉成byte構造 Message,在接收消息時,會將接收到的 Message 再反序列化成自定義的類。
* @param objectMapper
* @return
*/
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}

/**
* create Rabbit Queue
* @return
*/
@Bean
public Queue tpuQueue() {
return new Queue("tpu.queue");
}

}

 

建立一個收發訊息的封裝物件User.java

package com.tpisoftware;

import java.io.Serializable;

public class User implements Serializable{

private String name;
private Integer age;

public User() {
}

public User(String name, Integer age) {
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

 

 

建立SendMessageController.java,讓使用者可以透過http GET method傳送name & age並用User封裝後送Message Queue中。

package com.tpisoftware;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/put_message")
public String putMessage(@RequestParam(name="name") String name, @RequestParam(name="age") Integer age) {
rabbitTemplate.convertAndSend("tpu.queue", new User(name,age));
return "this is quick demo for Spring Boot!";
}
}

 

 

 

建立ReceiveMessageListener.java,印出receive message表示監聽指定Queue中的資料並進行消費。

package com.tpisoftware;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiveMessageListener {

/**
* 監聽Queue中是否有資料,若有資料則進行消費。
* @param user
*/
@RabbitListener(queues={"tpu.queue"})
public void receive(User user) {
System.out.println("receive message from queue:"+user.toString());
}

}

 

 

程式完成後,啟動 SpringBootApplication

 

 

在啟動log中可以看到是否成功與RabbitMQ Service建立連線。

 

 

測試

 

透過curl 發送請求給Server.

curl -X GET 'http://localhost:8080/put_message?name=Tian&age=28'
curl -X GET 'http://localhost:8080/put_message?name=Lin&age=18'
curl -X GET 'http://localhost:8080/put_message?name=Peter&age=51'
curl -X GET 'http://localhost:8080/put_message?name=John&age=33'

 

ReceiveMessageListener 接收到tpu.queue發布的訊息,並印在Server Console中。

 

 

 

透過RabbitMQ managementUI 可以觀察佇列內訊息數量的變化。

 

結語

RabbitMQ非常容易的實現於SpringBoot上,而RabbitMQ在與訊息佇列主流 Kafka使用上的選擇,

如果你想要一個簡單、傳統的發布/訂閱消息代理,選用RabbitMQ即可,

RabbitMQ還有靈活的路由預測、優先級隊列選項優點。

 

 

如果需要高流量及大數據的需求,相較下Kafka會是比較好的選擇。

 

 

 

 

參考資料

 

維基百科:https://zh.wikipedia.org/wiki/RabbitMQ

Medium :  https://medium.com/better-programming/kafka-vs-rabbitmq-why-use-kafka-8401b2863b8b

kknews : https://kknews.cc/code/lvxagng.html

ITRead01 : https://www.itread01.com/content/1542097086.html

陳志祥