Reactive Programming - Reactor 3
Reactor 3
Reactive Programming響(反)應式編程
一、Introduction to Reactive Programming
- 響應式編程(Reactive Programing),是一個事件觸發機制,並且是以異步和非阻塞的方式發送和接收。
- 是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程方式。
- 和Java 8 Stream非常類似,但仍存在一些區別:Reactive是push-based,是基於事件驅動模式來進行編程的,可以先將整個處理數據量構造完成,然後向其中填充數據,在出口處可以取出轉換結果。
圖片參考自:
二、Why reactive
- 可以提高程序的性能
- 一般命令式編碼阻塞浪費資源,一旦程序涉及一些延遲(例如數據庫請求或網絡調用、 I/O ),資源就會被浪費,因為線程(可能很多線程)現在處於空閒狀態,等待數據。
- 通過編寫異步、非阻塞代碼,可以讓執行切換到另一個使用相同底層資源的活動任務,並在異步處理完成後返回到當前進程。
- Java 提供了兩種異步編程模型,但使用麻煩:
• Callbacks:
• Call back很難組合在一起,很快導致代碼難以閱讀和維護。
(稱為 callback-hell)。
• Futures:
• Future<T>、 CompletableFuture <T> (JAVA 8)
• 功能簡單可使用情境少。
• 通過調用該get()方法很容易導致另一個對象阻塞情況。
三、 Interactions – Publisher & Subscriber
• 響應流序列中,數據源Publisher產生數據。但是默認情況下,在 Subscriber註冊(訂閱)之前它什麼都不做;訂閱行為發生後,此時會將數據推送給Subscriber 。
圖片參考自:
四、Reactor Core – Flux, Mono
• Flux<T>, for 0…N elements
Ø Flux<T>是一個 Reactive Streams Publisher,增加了許多可用於生成、轉換、編排 Flux 序列的運算符。
Ø 它可以發出 0 到n 個 <T>元素(onNext事件)然後完成或錯誤(onComplete和onError終端事件)。
圖片參考自:
• Mono<T>, for most 1 element
Ø Mono<T>是 Reactive Streams Publisher,還增加了許多可用於生成、轉換、編排 Mono 序列的運算符。
Ø 它是最多Flux可以發出1 個<T>元素的特化:Mono 要麼是有值的(完整的元素),要麼是空的(完整的沒有元素),要麼是失敗的(錯誤)。
圖片參考自:
五、Back Pressure
• Back Pressure(回壓)是指, Publisher傳遞到Subscriber的數據需要進行處理,然而Publisher推送的速度又過快, Subscriber可能承受能力不足導致問題。
• 造成Back Pressure(回壓)的問題,有兩個前提:
1. Publisher與Subscriber不在同一個線程中。
2. Publisher發出數據的速度高於Subscriber處理數據的速度。
圖片參考自:
六、Getting started
1. gradle
// .. 以上視專案需求配置
dependencies {
// Reactor 核心
implementation 'io.projectreactor:reactor-core'
// Reactor 提供了 reactor-test,可以透過其 StepVerifier 來協助測試
testImplementation group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE'
// assert 驗證。
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.8.0'
// JUnit test framework
testImplementation 'junit:junit:4.12'
}
2. Mono and Flux Test
// =========================================================
// ============= Static Generators Flux、Mono ==============
// =========================================================
/**
* just():可以指定序列中包含的全部元素。創建出來的序列在發布這些元素之後會自動結束。
* */
@Test
public void justTest() {
Mono.just("Mono Hello world!").subscribe(System.out::println);
Flux.just("Flux ", "Hello ", "World!").subscribe(System.out::print);
// no subscribe
Flux.just("Flux ", "Hello ", "World!");
}
/**
* range(int start, int count):創建包含從start 起始的count 個數量的Integer 對象的序列。
* */
@Test
public void FluxRangeTest() {
Flux.range(1, 10).subscribe(System.out::println);
}
/**
* fromArray():可以從一個數組、Iterable 對像或Stream 對像中創建Flux 對象。
* */
@Test
public void FluxFromArrayTest() {
Integer[] array = {1, 2, 3};
Flux.fromArray(array).map(i -> i+1).subscribe(System.out::println);
}
/**
* buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組數據(按20分組)
* */
@Test
public void FluxBufferTest() {
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
}
/**
* 錯誤結果測試.
*/
@Test
public void FluxErrorHandlingTest() {
Flux.range(1, 6)
.map(i -> 10 / (i - 3)) // 1, 2, 3 (error)
.map(i -> i * i)
.subscribe(System.out::println, System.err::println);
}
Output
Mono Hello world!
Flux Hello World!
-------------------------------------------------------
1
2
3
4
5
6
7
8
9
10
-------------------------------------------------------
2
3
4
-------------------------------------------------------
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
-------------------------------------------------------
25
100
java.lang.ArithmeticException: / by zero
3. Using StepVerifier
// =========================================================
// =============== StepVerifier 步驟驗證器 ================
// =========================================================
/**
* StepVerifier - 步驟驗證器 @reactor.test.StepVerifier
*/
@Test
public void stepVerifierTest() {
Flux<String> source = Flux.just("Eric", "Monica", "Mark", "Alen", "Frank", "Casper", "Olivia", "Emily", "Paul")
.filter(name -> name.length() == 4)
.map(String::toUpperCase);
StepVerifier
.create(source)
.expectNext("ERIC")
.expectNextMatches(name -> name.startsWith("MA"))
.expectNext("ALEN", "PAUL")
.expectComplete()
.verify();
List<String> list = new ArrayList<>();
// Assertions
source.subscribe(s -> list.add(s));
System.out.println(list);
Assertions.assertThat(list).containsExactly("ERIC","MARK","ALEN","PAUL");
}
Output
[ERIC, MARK, ALEN, PAUL]
4. Spring WebFlux 應用
使用 Spring Webflux 構建 Uer CRUD
User.java
public class User {
private Integer id;
private String name;
private Integer age;
public User() {}
public User(Integer id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
// getter setter 略...
}
UserController.java
@RestController
@RequestMapping("/user")
public class UserController {
// 模擬資料庫中的資料
private Map<Integer, User> users = new ConcurrentHashMap<>();
// 初始化資料
public UserController() {
users.put(1, new User(1, "Mark", 20));
users.put(2, new User(2, "Paul", 19));
users.put(3, new User(3, "Emily", 18));
}
// 查詢一個使用者
@GetMapping("/findOne/{id}")
public Mono<User> findOne(@PathVariable Integer id) {
return Mono.justOrEmpty(users.get(id));
}
// 查詢全部使用者
@GetMapping("/findAll")
public Flux<User> findAll() {
return Flux.fromIterable(users.values());
}
// 新增一個使用者
@PostMapping("/save")
public Flux<User> save(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
userMono.doOnNext(u -> {
// 儲存一個使用者
int id = users.size() + 1;
u.setId(id);
users.put(id, u);
}).subscribe();
return Flux.fromIterable(users.values());
}
// 刪除一個使用者
@PostMapping("/delete")
public Flux<User> delete(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
userMono.doOnNext(u -> {
// 刪除一個使用者
users.remove(u.getId());
}).subscribe();
return Flux.fromIterable(users.values());
}
// 更新一個使用者
@PostMapping("/update")
public Flux<User> update(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
userMono.doOnNext(u -> {
// 更新一個使用者
int id = u.getId();
users.remove(id);
users.put(id, u);
}).subscribe();
return Flux.fromIterable(users.values());
}
}
Output
findOne
findAll
save
delete
update
七、 References