#Redis #Java #觀察者模式 #緩存資料庫

在Java中實現Redis的pub/Sub(訂閱與發布)

黃郁婷 Bernice Huang 2021/06/16 20:01:13
3316

♦簡述What’s Redis :

Redis是一個使用ANSI C編寫的開源、支援網路、基於記憶體、分散式、可選永續性的鍵值對儲存資料庫也是一種緩存資料庫。從2015年6月開始,Redis的開發由Redis Labs贊助,而2013年5月至2015年6月期間,其開發由Pivotal贊助。在2013年5月之前,其開發由VMware贊助。根據月度排行網站DB-Engines.com的資料,Redis是最流行的鍵值對(Key-Value)儲存資料庫。

 

♦那What’s pub/sub in Redis?

Pub/Sub功能(means Publish, Subscribe)即發布與訂閱功能。它採用事件作為基本的通信機制,提供大規模系統所要求的鬆散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發布者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。熟悉設計模式的朋友應該了解這與23種設計模式中的觀察者模式(備註1)極為相似。所以同理,Redis的pub/sub是一種消息通信模式,主要的目的是解除發布者和訂閱者之間的耦合, Redis作為一個pub/sub的server, 在訂閱者和發布者之間起到了消息路由的功能。

 

我覺得經過上述的專業解釋還是很難理解,用比較白話的景場來解釋比較易懂 :

 

場景1

假如我是個有一千萬粉絲訂閱的網紅,每天我熱情的粉絲們都在等待我的發文,那今天我發了一篇新文章,那我就可以推送消息告訴我可愛的粉絲們我有發表新文章喔!

場景2

 

假如我是擁有三個站台的管理者,而我是管理者想同時在三站台發布最新消息時,那可以創建一個Channel(頻道)讓三個站台成為訂閱者,發布者(管理者)和訂閱者都是Redis客戶端,Channel(頻道)則為Redis伺服器端,發布者(管理者)將消息發送到該創建的頻道,訂閱了這個頻道的訂閱者就能接收到這條消息。


♦如何實作應用在Java :

1.先建立一個專案,再加入Redis相關的依賴

<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
</dependency>

2.建立一個Publisher (發布者) class

/**
* @author Bernice
*/
public class Publisher extends Thread {
   private final JedisPool jedisPool;

   public Publisher(JedisPool jedisPool) {
       this.jedisPool = jedisPool;
   }

   @Override
   public void run() {
       BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
       Jedis jedis = jedisPool.getResource();   //連接池中取出1連線
       while (true) {
           String line;
           try {
               line = reader.readLine();
               if (!"quit".equals(line)) {
                   jedis.publish("mychannel", line);   //從通過mychannel 頻道發布消息
                   System.out.println(String.format("發布消息成功!channel: %s, message: %s", "mychannel", line));
               } else {
                   break;
               }
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
}

3.再建立一個Subscribe(訂閱者) class,這裡訂閱者需要繼承JedisPubSub,來override它的三個方法。用途註解上已經寫了,淺顯易懂。

/**
* @author Bernice
*/
public class Subscriber extends JedisPubSub {

   public Subscriber() {
   }

   @Override
   public void onMessage(String channel, String message) {       //收到消息會調用
       System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
       this.unsubscribe();
   }

   @Override
   public void onSubscribe(String channel, int subscribedChannels) {    //訂閱頻道會調用
       System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
           channel, subscribedChannels));
   }

   @Override
   public void onUnsubscribe(String channel, int subscribedChannels) {   //取消訂閱會調用
       System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
           channel, subscribedChannels));

   }
}

4.我們這裡只是定義了一個訂閱者,下面去訂閱頻道

/**
* @author Bernice
*/
public class SubThread extends Thread {
   private final JedisPool jedisPool;
   private final Subscriber subscriber = new Subscriber();

   private final String channel = "mychannel";

   public SubThread(JedisPool jedisPool) {
       super("Subscriber");
       this.jedisPool = jedisPool;
   }

   @Override
   public void run() {
       // 注意:subscribe是一個阻塞的方法,在取消訂閱該頻道前,thread會一直阻塞在這,無法執行會後續的Code
       System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));

       Jedis jedis = null;

       try {
           jedis = jedisPool.getResource();   /* 取出一个連線*/
           jedis.subscribe(subscriber, channel);    //通過subscribe 的api去訂閱,傳入參數為訂閱者和頻道名
       } catch (Exception e) {
           System.out.println(String.format("subscribe channel error, %s", e));
       } finally {
           if (jedis != null) {
               jedis.close();
           }
       }
   }
}

5.最後,再寫一個main方法去跑一下。鍵盤輸入消息,訂閱者就會觸發onMessage方法

/**
 * @author Bernice
 */
public class TestMainSub {
    public static void main(String[] args) {
        // 連接redis服務端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 6379));

        Publisher publisher = new Publisher(jedisPool);    //發布者
        publisher.start();

        SubThread subThread = new SubThread(jedisPool);    //訂閱者
        subThread.start();
    }
}

6.那看一下執行結果 console

♦小結 :

這裡是簡單的講解Redis的基本訂閱/發布,它還有多種變化可以依照需要的專案需求變化,比如SSM集成RabbitMQ 訂閱模式。總之,Redis 發布訂閱功能足夠簡單,如果專案沒有過多的要求,且不想搭建 Kafka、RabbitMQ 這樣的可靠型消息系統時,可以考慮嘗試使用 。

 

備註1 : Redis通過PUBLISH、SUBSCRIBE等命令實現了訂閱與發布模式,基本功能就是通過頻道將消息發送到訂閱了該頻道的所有客戶端去。每一個客戶端可以把他看成一個觀察者,頻道則是被觀察者。當該頻道內接收到一個新消息時,所有訂閱該頻道的客戶端都會收到這條新消息。

 

參考:

https://en.wikipedia.org/wiki/Redis、

https://redis.io/topics/introduction、

https://redis.io/topics/pubsub、

https://redislabs.com/

 

黃郁婷 Bernice Huang