Redis 的發佈訂閱模型

楔子

Redis 雖然是一個緩存,但它也可以用作消息隊列。比如我們前面介紹 List 類型的時候說過,基於 LPUSH 和 BRPOP 可以實現一個簡易版的消息隊列,但它有兩個缺點:

而 Redis 提供的發佈訂閱模型,正好可以解決第一個問題:重複消費,即多個消費者消費同一批數據的場景。下面來看一下。

普通訂閱與發佈

消息隊列有兩個重要的角色,一個是發佈者(或者說生產者),另一個就是訂閱者(或者說消費者),對應的命令如下:

除了生產者和消費者,還有一個重要的概念:channel(頻道),可以理解爲消息隊列的名稱。首先消費者先要訂閱某個 channel,然後當生產者把消息發送到 channel 當中時,消費者就可以正常接收到消息了,如下圖所示:

下面我們來看具體的命令實現。

訂閱消息

# 可以同時訂閱多個頻道
127.0.0.1:6379> subscribe channel1 channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2

注意:當我們訂閱某個頻道的時候,就阻塞在這裏了。

就類似於微信公衆號一樣,你關注了某個公衆號,那麼當公衆號上面發表文章的時候,你就可以收到。此時操作公衆號的人就是消息發佈者,你就是消息訂閱者,公衆號就是消息隊列,往公衆號上面發表的文章就是消息。

此外一個消費者可以同時訂閱多個 channel,正如一個微信用戶可以關注多個公衆號;一個 channel 也可以被多個消費者訂閱,正如一個公衆號可以被多個微信用戶關注。

發送消息

我們上面的訂閱者在訂閱之後,就處於阻塞狀態,因此我們需要再開一個終端。

127.0.0.1:6379> publish channel1 "satori"
(integer) 1
127.0.0.1:6379> publish channel2 "koishi"
(integer) 1

返回值表示成功發給了幾個訂閱者,所以這裏的 1 就表示成功發給了一個訂閱者,如果有兩個訂閱者,那麼返回值就是 2。因此返回值可以是 0~n,由訂閱者的數量決定。

然後我們來看看訂閱者:

127.0.0.1:6379> subscribe channel1 channel2
...
1) "message"
2) "channel1" # 從 channel1 收到消息
3) "satori"
1) "message"
2) "channel2" # 從 channel2 收到消息
3) "koishi"

成功收到消息,所以每個消費者可以同時訂閱多個 channel,並且每個 channel 也可以被多個消費者訂閱。

因此使用 Pub/Sub 這種方案,既支持阻塞式拉取消息,還很好地滿足了多組消費者消費同一批數據的業務需求。除此之外,Pub/Sub 還提供了「主題訂閱」模式,允許消費者根據一定規則,訂閱「多個」自己感興趣的隊列。

主題訂閱

主題訂閱說白了,和模糊匹配是類似的。假設我們需要訂閱好幾個隊列,但它們都是以 log 開頭的,那麼我們就可以通過 psubscribe log* 來自動訂閱所有以 log 開頭的隊列。

比如上面的 channel1、channel2,我們就可以通過 psubscribe channel* 實現,至於消息發佈者則不需要變。

當然主題訂閱也可以是多個,比如:psubscribe log* db*,訂閱所有以 log 開頭、db 開頭的消息隊列。

取消訂閱

既然有訂閱,那麼就有取消訂閱,就類似於取關 (o(╥﹏╥)o)。

使用 unsubscribe channel1 channel2 可以取消訂閱多個 channel,如果是主題訂閱,那麼也可以通過 punsubscribe ch* 取消訂閱指定的主題。比較簡單,不再贅述。

Python 操作 Redis 的發佈訂閱

訂閱者(生產者):

# 訂閱者
import redis

client = redis.Redis(host="...",
                     decode_responses="utf-8")

# 調用 pubsub 方法返回一個訂閱者
sub = client.pubsub()
# 訂閱兩個隊列
sub.subscribe("ch1", "ch2")
# 監聽,此時處於阻塞狀態
for item in sub.listen():
    # 一旦發佈者發佈消息,這裏就可以接收到
    # item["channel"]是隊列,item["data"]是內容
    print(item["channel"], item["data"])

發佈者(消費者):

# 訂閱者
import redis

client = redis.Redis(host="...",
                     decode_responses="utf-8")

# 發佈者很簡單,直接發佈消息接口
client.publish("ch1", "7ki7ki棒棒1")
client.publish("ch1", "7ki7ki棒棒2")
client.publish("ch2", "7ki7ki棒棒3")

當執行發佈者的時候,會發現訂閱者多了幾條輸出,至於內容顯然是發佈者發佈的內容。

另外 Python 操作訂閱者還有幾種方式。

# 訂閱者
import redis

client = redis.Redis(host="...",
                     decode_responses="utf-8")

sub = client.pubsub()
sub.subscribe("ch1", "ch2")

while True:
    # 這種方式會瞬間返回,如果有消息得到消息
    # 沒有消息會返回None
    item = sub.get_message()
    if item:
        print(item["channel"], item["data"])

顯然該方法有一個缺陷,就是會造成 CPU 空轉,因此我們不推薦使用該方法。

也可以開啓一個新的線程去監聽。

# 訂閱者
import redis

client = redis.Redis(host="...",
                     decode_responses="utf-8")

sub = client.pubsub()
sub.subscribe("ch1", "ch2")

def handler(item):
    print(item["channel"], item["data"])

# 給每一個頻道註冊一個處理函數
# 當頻道有消息時,會自動將消息傳遞給處理函數
sub.channels.update({"ch1": handler, "ch2": handler})
# 開啓一個線程運行,會返回新開啓的線程對象
# 注意:因爲是單獨開了一個線程,所以這裏不會阻塞
th = sub.run_in_thread()
print("xxx")
print("yyy")
print("zzz")

# 先啓動訂閱者,再啓動發佈者,程序輸出如下
"""
xxx
yyy
zzz
ch1 7ki7ki棒棒1
ch1 7ki7ki棒棒2
ch2 7ki7ki棒棒3
"""

# 注意:這裏程序依舊會卡住,因爲開啓的線程是非守護線程
# 所以即便主線程執行完畢,也依舊會等待子線程
# 如果不想主線程等待,解決的辦法有兩種:
# 一種是在run_in_thread中加上一個參數daemon=True
# 將其設置爲守護線程,這樣主線程就不會等待了
# 另一種是手動停止,因爲sub.run_in_thread會返回新開啓的線程
# 調用其stop方法即可讓它停止,通過這種方式,可以在任意時刻停止監聽
# th.stop()

還有主題訂閱,發佈者代碼依舊不用變,只需要將訂閱者的 sub.subscribe 換成 sub.psubscribe 即可。

# 訂閱者
sub = client.pubsub()
# 訂閱以 ch 開頭、log 開頭的隊列
sub.psubscribe("ch*", "log*")

如果是開啓新的線程的話:

# 訂閱者
import redis

client = redis.Redis(host="...",
                     decode_responses="utf-8")

sub = client.pubsub()
sub.psubscribe("ch*", "log*")

def handler(item):
    print(item["channel"], item["data"])

# 對於開啓新的線程去監聽
# 要將之前的 sub.channels 換成 sub.patterns
sub.patterns.update({"ch*": handler, 
                     "log*": handler})
sub.run_in_thread()

以上就是 Python 操作 Redis 發佈訂閱相關的內容。

發佈訂閱作爲消息隊列有什麼缺點?

Pub/Sub 最大的優勢就是,支持多組生產者、消費者處理消息。但除了這一個優點之外,剩下的都是缺點了。

1)發佈訂閱模式是 "發後既忘" 的工作模式,如果訂閱者離線,那麼重連之後不能消費之前的歷史消息。

因爲 Pub/Sub 的實現原理非常簡單,它沒有基於任何數據類型,也沒有做任何的數據存儲,只是單純地爲生產者、消費者建立「數據轉發通道」,把符合規則的數據,從一端轉發到另一端。

在 Redis 中,一個完整的發佈、訂閱消息處理流程是這樣的:

整個過程沒有任何的數據存儲,一切都是實時轉發的。我們上面在發完消息後返回的 1 就是訂閱當前隊列的消費者個數,或者說 Redis 將消息轉發給了多少個消費者。

因此,如果一個消費者異常掛掉了,它再重新上線後,只能接收新的消息,在下線期間生產者發佈的消息是接收不到的。如果所有消費者都下線了,那生產者發佈的消息,因爲找不到任何一個消費者,也會直接「丟棄」。

所以在使用 Pub/Sub 時,一定要注意:消費者必須先訂閱隊列,生產者才能發佈消息,否則消息會丟失。

2)無法持久化保存消息,如果 Redis 服務器宕機或重啓,那麼所有的消息將會丟失

因爲 Pub/Sub 沒有基於任何數據類型實現,所以它不具備「數據持久化」的能力,也就是說,Pub/Sub 的相關操作,不會寫入到 RDB 和 AOF 中,當 Redis 宕機重啓,Pub/Sub 的數據也會全部丟失。

3)積壓的消息不能太多,否則也會丟數據

當消費者的速度,跟不上生產者時,就會導致數據積壓的情況發生。如果採用 List 作爲隊列,消息積壓時會導致底層的鏈表很長,最直接的影響就是 Redis 內存會持續增長,直到消費者把所有數據都從鏈表中取出。

但 Pub/Sub 的處理方式卻不一樣,當消息積壓時,有可能會導致消費失敗和消息丟失。至於原因,還是和 Pub/Sub 的實現原理有關。

每個消費者訂閱一個隊列時,Redis 都會在 Server 端爲該消費者分配一個「緩衝區」,這個緩衝區其實就是一塊內存。當生產者發佈消息時,Redis 先把消息寫到對應消費者的緩衝區中,之後消費者不斷地從緩衝區讀取消息,處理消息。

但問題就出在這個緩衝區上,因爲這個緩衝區的大小是有上限的(可配置),如果消費者拉取消息很慢,就會造成生產者發佈到緩衝區的消息開始積壓,緩衝區內存持續增長。如果超過了緩衝區配置的上限,Redis 就會「強制」把這個消費者踢下線。這時消費者就會消費失敗,也會丟失數據。

Redis 的配置文件裏有這麼這一個配置:client-output-buffer-limit pubsub 32mb 8mb 60,它的參數含義如下:

所以 Pub/Sub 作爲消息隊列和 List 之間的差異比較大的,並且從推拉模型不難看出,List 是屬於「拉」模型,而 Pub/Sub 屬於「推」模型。List 中的數據可以一直積壓在內存中,消費者什麼時候來「拉」都可以。但 Pub/Sub 是把消息先「推」到消費者在 Redis Server 上的緩衝區中,然後等消費者再來取。

但當消費者跟不上生產者的速度時,就會導致緩衝區的內存開始膨脹,而 Redis 爲了控制緩衝區的上限,就會把消費者踢下線。

總結一下 Pub/Sub 模型的優缺點:

除了第一個是優點之外,剩下的都是缺點,所以很多人都覺得 Pub/Sub 很雞肋。也正是以上原因,Pub/Sub 在實際的應用場景中用得並不多。

目前只有哨兵集羣和 Redis 實例通信時,採用了 Pub/Sub 的方案,因爲哨兵正好符合即時通訊的業務場景。

最後還有一點沒有說,就是 Pub/Sub 和 List 一樣,數據一旦取走,就會從 Redis 當中刪除,無法重複消費。而一個成熟的消息隊列,應該具備如下功能:

顯然 Pub/Sub 只能滿足前兩個要求。

於是 Redis 在 5.0 的時候引入 Stream 類型,將 Pub/Sub 的問題全部解決,而 Stream 相關的內容後續詳細介紹。

本文參考自:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YDorGOzjg23eHpA_wafykg