java Reactive

Reactive Programming - Reactor 3

許文 Richard Hsu 2021/11/02 19:03:31
700

 

Reactor 3


Reactive Programming()應式編程

 

一、Introduction to Reactive Programming

  1. 響應式編程(Reactive Programing),是一個事件觸發機制,並且是以異步和非阻塞的方式發送和接收。
  2. 是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程方式。
  3. Java 8 Stream非常類似,但仍存在一些區別:Reactivepush-based,是基於事件驅動模式來進行編程的,可以先將整個處理數據量構造完成,然後向其中填充數據,在出口處可以取出轉換結果。

圖片參考自:

 

二、Why reactive

  1. 可以提高程序的性能
  2. 一般命令式編碼阻塞浪費資源,一旦程序涉及一些延遲(例如數據庫請求或網絡調用、 I/O ),資源就會被浪費,因為線程(可能很多線程)現在處於空閒狀態,等待數據。
  3. 通過編寫異步、非阻塞代碼,可以讓執行切換到另一個使用相同底層資源的活動任務,並在異步處理完成後返回到當前進程。

 

  1. Java 提供了兩種異步編程模型,但使用麻煩:

Callbacks:

Call back很難組合在一起,很快導致代碼難以閱讀和維護。

(稱為 callback-hell)。

Futures:

Future<T> CompletableFuture <T> (JAVA 8)

功能簡單可使用情境少。

通過調用該get()方法很容易導致另一個對象阻塞情況。

 

 

三、  Interactions – Publisher & Subscriber

響應流序列中,數據源Publisher產生數據。但是默認情況下,在 Subscriber註冊(訂閱)之前它什麼都不做;訂閱行為發生後,此時會將數據推送給Subscriber

圖片參考自:

 

四、Reactor Core – Flux, Mono

•  Flux<T>, for 0…N elements

Ø  Flux<T>是一個 Reactive Streams Publisher,增加了許多可用於生成、轉換、編排 Flux 序列的運算符。

Ø  它可以發出 0 n <T>元素(onNext事件)然後完成或錯誤(onCompleteonError終端事件)。

圖片參考自:

 

 Mono<T>, for most 1 element

Ø  Mono<T> Reactive Streams Publisher,還增加了許多可用於生成、轉換、編排 Mono 序列的運算符。

Ø  它是最多Flux可以發出1 <T>元素的特化:Mono 要麼是有值的(完整的元素),要麼是空的(完整的沒有元素),要麼是失敗的(錯誤)。

圖片參考自:

五、Back Pressure

   Back Pressure(回壓)是指, Publisher傳遞到Subscriber的數據需要進行處理,然而Publisher推送的速度又過快, Subscriber可能承受能力不足導致問題。

 

  造成Back Pressure(回壓)的問題,有兩個前提:

1. PublisherSubscriber不在同一個線程中。

2. Publisher發出數據的速度高於Subscriber處理數據的速度。 

圖片參考自:

六、Getting started

 

1. gradle

// .. 以上視專案需求配置

dependencies {
	
	// Reactor 核心
	implementation 'io.projectreactor:reactor-core'

    // Reactor 提供了 reactor-test,可以透過其 StepVerifier 來協助測試
	testImplementation group: 'io.projectreactor', name: 'reactor-test', version: '3.1.0.RELEASE'
	
	// assert 驗證。
	testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.8.0'
	
	// JUnit test framework
    testImplementation 'junit:junit:4.12'

}

 

2. Mono and Flux Test

	// =========================================================
	// ============= Static Generators Flux、Mono ==============
	// =========================================================
	
	/**
	 * just():可以指定序列中包含的全部元素。創建出來的序列在發布這些元素之後會自動結束。
	 * */
	@Test
	public void justTest() {
		Mono.just("Mono Hello world!").subscribe(System.out::println);
		
		Flux.just("Flux ", "Hello ", "World!").subscribe(System.out::print);
		
		// no subscribe
		Flux.just("Flux ", "Hello ", "World!");
	}
	/**
	 * range(int start, int count):創建包含從start 起始的count 個數量的Integer 對象的序列。
	 * */
	@Test
	public void FluxRangeTest() {
		Flux.range(1, 10).subscribe(System.out::println);
	}
	
	/**
	 * fromArray():可以從一個數組、Iterable 對像或Stream 對像中創建Flux 對象。
	 * */
	@Test
	public void FluxFromArrayTest() {
		Integer[] array = {1, 2, 3};
		Flux.fromArray(array).map(i -> i+1).subscribe(System.out::println);
	}
	
	/**
	 * buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組數據(按20分組)
	 * */
	@Test
	public void FluxBufferTest() {
		Flux.range(1, 100).buffer(20).subscribe(System.out::println);
	}
	
	/**
	 * 錯誤結果測試.
	 */
	@Test
	public void FluxErrorHandlingTest() {
		Flux.range(1, 6)
		.map(i -> 10 / (i - 3)) // 1, 2, 3 (error)
		.map(i -> i * i)
		.subscribe(System.out::println, System.err::println);
	}

 

Output

Mono Hello world!
Flux Hello World!
-------------------------------------------------------
1
2
3
4
5
6
7
8
9
10
-------------------------------------------------------
2
3
4
-------------------------------------------------------
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
-------------------------------------------------------
25
100
java.lang.ArithmeticException: / by zero

 

 3. Using StepVerifier

	// =========================================================
	// =============== StepVerifier 步驟驗證器 ================
	// =========================================================
	
	/**
	 * StepVerifier - 步驟驗證器 @reactor.test.StepVerifier
	 */
	@Test
	public void stepVerifierTest() {
		Flux<String> source = Flux.just("Eric", "Monica", "Mark", "Alen", "Frank", "Casper", "Olivia", "Emily", "Paul")
				  .filter(name -> name.length() == 4)
				  .map(String::toUpperCase);
		
		StepVerifier
		  .create(source)
		  .expectNext("ERIC")
		  .expectNextMatches(name -> name.startsWith("MA"))
		  .expectNext("ALEN", "PAUL")
		  .expectComplete()
		  .verify();
		
		List<String> list = new ArrayList<>();
		
		// Assertions
		source.subscribe(s -> list.add(s));
		System.out.println(list);
		Assertions.assertThat(list).containsExactly("ERIC","MARK","ALEN","PAUL");
	}

 

Output

[ERIC, MARK, ALEN, PAUL]

 

4. Spring WebFlux 應用

      使用 Spring Webflux 構建 Uer CRUD

 

User.java

public class User {
    private Integer id;
    private String name;
    private Integer age;

    public User() {}

    public User(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

 // getter setter 略...
}

 

UserController.java

@RestController
@RequestMapping("/user")
public class UserController {
	
	// 模擬資料庫中的資料
    private Map<Integer, User> users = new ConcurrentHashMap<>();

    // 初始化資料
    public UserController() {
        users.put(1, new User(1, "Mark", 20));
        users.put(2, new User(2, "Paul", 19));
        users.put(3, new User(3, "Emily", 18));
    }


    // 查詢一個使用者
    @GetMapping("/findOne/{id}")
    public Mono<User> findOne(@PathVariable Integer id) {
        return Mono.justOrEmpty(users.get(id));
    }
    
    // 查詢全部使用者
    @GetMapping("/findAll")
    public Flux<User> findAll() {
        return Flux.fromIterable(users.values());
    }
    
    // 新增一個使用者
    @PostMapping("/save")
    public Flux<User> save(@RequestBody User user) {
        Mono<User> userMono = Mono.just(user);
        userMono.doOnNext(u -> {
            // 儲存一個使用者
        	int id = users.size() + 1;
            u.setId(id);
            users.put(id, u);
        }).subscribe();
        
        return Flux.fromIterable(users.values());
    }

    // 刪除一個使用者
    @PostMapping("/delete")
    public Flux<User> delete(@RequestBody User user) {
        Mono<User> userMono = Mono.just(user);
        userMono.doOnNext(u -> {
            // 刪除一個使用者
        	users.remove(u.getId());
        }).subscribe();
        
        return Flux.fromIterable(users.values());
    }

    // 更新一個使用者
    @PostMapping("/update")
    public Flux<User> update(@RequestBody User user) {
        Mono<User> userMono = Mono.just(user);
        userMono.doOnNext(u -> {
            // 更新一個使用者
        	 int id = u.getId();
             users.remove(id);
             users.put(id, u);
        }).subscribe();
        
        return Flux.fromIterable(users.values());
    }
}

 

Output

findOne

 

findAll

 

save

 

delete

 

update

 

七、           References

Reactor 3 Reference Guide
Reactive(3)5分鐘理解SpringBoot 響應式的核心-Reactor
響應式編程(Reactive Programming)介紹
許文 Richard Hsu