初探Spring Cloud Stream
一、概述
Spring Cloud Stream (以下簡稱Stream) 是一個用來建構訊息驅動微服務的框架,讓微服務之間透過訊息代理(Message Broker)彼此溝通。
Stream裡面運用到一些現存的框架,比如Sping Integration和 Spring Message,這些框架雖然運作的很好,但是有與訊息代理之間密切耦合的缺點。Stream做的事情是對於這些框架做進一步的封裝,由於Stream是建立一個在Spring Boot基礎上運作的框架,讓Spring根據配置(configuration)和依賴(dependency)文件在運行時自動注入,這意味著可以透過修改配置文件無痛切換到不同的訊息代理,讓微服務和訊息代理之間的耦合降低。
二、訊息驅動 ─ Message Driven
訊息驅動意思是讓應用服務透過訊息流的方式異步溝通,使服務之間不直接調用來降低彼此的依賴與牽制。
在微服務的架構中,一個請求到回應可能會經過多個服務,假設有一個請求如下圖,A服務直接內部調用B服務:
假設A服務本身的業務處理時間很短、B服務需要比較久的處理時間,我們為B服務啟動多個實體(Instance)來因應;A服務為了等待B服務而無法接收下個請求,導致A服務必須配合B服務啟動多個實體,但對A服務而言其實是沒有必要的。
若換做是訊息驅動的方式,A服務不直接呼叫B服務,而是發送一個訊息事件(Event)到訊息代理(Message Broker)後就可以接待下個請求;而B服務去訂閱A服務傳到訊息代理的事件,如此一來A、B服務只要約定好事件的內容就可以獨立作業,也可以依照各自的需求去擴增應用實體。
三、訊息代理 ─ Message Broker
下面訊息代理會以RabbitMQ來舉例。
RabbitMQ使用Advanced Message Queuing Protocol (AMQP) 作為訊息傳輸的協定,AMQP有一些特性,比如說他可以應用在跨組織、跨平台(應用由不同語言撰寫)系統的聯絡,而這些系統允許非同步處理(Async)或是強調可靠傳達(Reliable)的訊息交換方式,這些特性與微服務搭配起來相當合適。
以下是RabbitMQ 的結構圖:
訊息發送方將訊息傳遞到RabbitMQ中的交換器,交換器依照一些規則將訊息綁定(Binding)在一到多個訊息佇列,而訂閱方透過訂閱訊息佇列來獲得發送方的資訊,如此一來就完成系統之間的訊息溝通。
了解完RabbitMQ的運作架構後,接著來看Stream的運作模型和如何與訊息代理合作。
四、Stream運作模型
Stream為服務注入輸入(input)和輸出(output)通道,通道連接到的綁定器(Binder),由綁定器去跟訊息代理─也就是圖中的Middleware溝通。
綁定器抽象─Binder Abstraction
綁定器隔離了服務和訊息代理,面向服務它整合構成訊息生產和消費的API,而面向消息代理它實作了與代理溝通的設定和細節,因此服務不用考慮如何與不同的訊息代理對接,也可以在不更動到程式的情況下,透過更換綁定器的依賴和配置來轉換不同的訊息代理。
持續發布與訂閱─Persistent Publish-Subscribe Support
這個發佈訂閱模型演示,當一個請求需要發送給多個應用服務做處理,可以讓這個請求成為一個訊息流給多個應用服務去訂閱,再各自拓展下游的應用,這樣既不會浪費現有訊息流,也降低了發布者與消費者的複雜度和耦合度。
消費群體─Consumer group
Stream也提供了分組的功能。當一個服務啟動了多個實體,這些實體可能以輪巡等負載均衡的方式在運作,這個時候所有的實體是訂閱同個主題的。透過配置指定分組(group)方式,可以確保當中只會有一個實體接到訊息。
五、實作
1. 安裝RabbitMQ
RabbitMQ從這裡安裝,為求快速演示這邊使用CloudAMQP提供的雲端安裝:
登錄後點擊"Create New Instance"
設定名稱後點擊"Select Region"
設定雲端中心區域後點擊"Review"
點擊"Create Intance"創建完成
接著你可以在Detail中看到你的使用者名稱、密碼、URL等資訊,等等會需要用URL來連接RabbitMQ;另外點擊左邊的"RabbitMQ Manager"可以開啟監控面板。
2. Project 設置
接著讓我們創建訊息的發送方和接收方:
發送方:
pom.xml 中引入必需的依賴 ─ spring-boot-starter-web和spring-cloud-starter-stream-rabbit
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
或是使用Spring Initializr 快速創建(依賴選擇Spring Web再手動加入spring-cloud-starter-stream-rabbit)
HelloBinding.java
package com.producer;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface HelloBinding {
@Output("greetingChannel")
MessageChannel greeting();
}
我們訂一個類型為MessageChannel的抽象方法greeting,之後會需要用到MessageChannel的send方法將訊息傳出去,加上@Output(),裡面的名稱是自訂的,用來識別通道(Channel)。
ProducerApplication.java
package com.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
在程式的進入點加上@EnableBinding(),參數可以填入一到多個介面(interface),這裡就填上剛剛創建的HelloBinding.class,它的作用是連接到訊息代理(Message Broker)。
ProducerController.java
package com.producer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}
@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}
創建一支API當作訊息的發送者,當訪問這支API時,會先用MessageBuilder創建一則String訊息,使用MessageChannel的send方法送給訊息代理,這邊API加入一個參數當作識別每次傳進來的內容。
application.properties
spring.rabbitmq.addresses=amqp://itf...
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8081
最後設定properties檔案,告訴spring要如何連到RabbitMQ,addresses的參數是方才在CloudAMQP創建好的實體,URL可以在Dta中找到;我們可以設定綁定的通道greetingChannel在RabbitMQ顯示的exchange名稱為greetings,如果未設定會比照通道名稱,寫到這邊發送方就完成了。
接收方:
pom.xml:與發送方引入的依賴是一樣的。
HelloBinding.java
package com.consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface HelloBinding {
String GREETING_CHANNEL = "greetingChannel";
@Input(GREETING_CHANNEL)
SubscribableChannel greeting();
}
與發送方兩個不一樣的地方在於,訂定一個類型為SubscribableChannel的抽象方法greeting,加上@Input(),填入我們要監聽的greetingChannel通道。
ConsumerApplication.java
package com.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
public class ConsuemerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsuemerApplication.class, args);
}
這邊程式的進入點不做修改。
HelloListener.java
package com.consumer;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@EnableBinding(HelloBinding.class)
public class HelloListener {
@StreamListener(HelloBinding.GREETING_CHANNEL)
public void processGreetingChannel(String msg) {
System.out.println(msg);
}
}
創建一個類別HelloListener來處理接收的訊息,在類別加上@EnableBinding()來啟用綁定,接著在接收的方法加上@StreamListener("通道名稱"),這邊讓它列印在console上。
application.properties
spring.rabbitmq.addresses=amqp://itf...
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=9081
這邊設定與發送方的大致相同,更動port號即可。
六、測試
啟動producer和consumer後,呼叫greet API:http://localhost:8081/greet/Peiiun。
consumer的console正確顯示傳來的訊息:Hello, Peiiun。
現在我們consumer多啟動一個實體,port設定為9082,呼叫greet API:http://localhost:8081/greet/Amy:
Consumer和Consumer2都接收到訊息:Hello, Amy。
當我們希望只有其中一個實體接收到訊息,這個時候必須在consumer配置消費群體(Consumer Group):
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
重啟後,接著呼叫greet API 四次測試,param分別以One、Two、Three、Four分辨,可以看到訊息已分組,以輪巡的方式給其中一個Consumer接收。
七、結論
以上是Spring Cloud Stream主要概念,並演示了單純使用Spring Cloud Stream建構訊息驅動的微服務。應用服務使用訊息驅動的方式溝通,可以讓非核心業務使用異步的方式作業,提升核心業務的客戶體驗;另外Spring Cloud Stream讓更改訊息代理變得相當輕鬆,只要更改代理的依賴和配置檔案即可,實現應用服務和訊息代理之間的無感知。
參考資料:
Spring Cloud Stream Core
Spring Cloud Stream with RabbitMQ: Message-Driven Microservices
AMQP