Redis 的發佈訂閱模型
楔子
Redis 雖然是一個緩存,但它也可以用作消息隊列。比如我們前面介紹 List 類型的時候說過,基於 LPUSH 和 BRPOP 可以實現一個簡易版的消息隊列,但它有兩個缺點:
-
不支持多個消費者:消費者拉取消息後,這條消息就從 List 中刪除了,無法被其它消費者再次消費,即不支持多個消費者消費同一批數據;
-
消息丟失:消費者拉取到消息後,如果發生異常宕機,那這條消息就丟失了;
而 Redis 提供的發佈訂閱模型,正好可以解決第一個問題:重複消費,即多個消費者消費同一批數據的場景。下面來看一下。
普通訂閱與發佈
消息隊列有兩個重要的角色,一個是發佈者(或者說生產者),另一個就是訂閱者(或者說消費者),對應的命令如下:
-
發佈消息:publish channel "message";
-
訂閱消息:subscribe channel;
除了生產者和消費者,還有一個重要的概念:channel(頻道),可以理解爲消息隊列的名稱。首先消費者先要訂閱某個 channel,然後當生產者把消息發送到 channel 當中時,消費者就可以正常接收到消息了,如下圖所示:
下面我們來看具體的命令實現。
訂閱消息
# 可以同時訂閱多個頻道
127.0.0.1:6379> subscribe channel1 channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2
注意:當我們訂閱某個頻道的時候,就阻塞在這裏了。
就類似於微信公衆號一樣,你關注了某個公衆號,那麼當公衆號上面發表文章的時候,你就可以收到。此時操作公衆號的人就是消息發佈者,你就是消息訂閱者,公衆號就是消息隊列,往公衆號上面發表的文章就是消息。
此外一個消費者可以同時訂閱多個 channel,正如一個微信用戶可以關注多個公衆號;一個 channel 也可以被多個消費者訂閱,正如一個公衆號可以被多個微信用戶關注。
發送消息
我們上面的訂閱者在訂閱之後,就處於阻塞狀態,因此我們需要再開一個終端。
127.0.0.1:6379> publish channel1 "satori"
(integer) 1
127.0.0.1:6379> publish channel2 "koishi"
(integer) 1
返回值表示成功發給了幾個訂閱者,所以這裏的 1 就表示成功發給了一個訂閱者,如果有兩個訂閱者,那麼返回值就是 2。因此返回值可以是 0~n,由訂閱者的數量決定。
然後我們來看看訂閱者:
127.0.0.1:6379> subscribe channel1 channel2
...
1) "message"
2) "channel1" # 從 channel1 收到消息
3) "satori"
1) "message"
2) "channel2" # 從 channel2 收到消息
3) "koishi"
成功收到消息,所以每個消費者可以同時訂閱多個 channel,並且每個 channel 也可以被多個消費者訂閱。
因此使用 Pub/Sub 這種方案,既支持阻塞式拉取消息,還很好地滿足了多組消費者消費同一批數據的業務需求。除此之外,Pub/Sub 還提供了「主題訂閱」模式,允許消費者根據一定規則,訂閱「多個」自己感興趣的隊列。
主題訂閱
主題訂閱說白了,和模糊匹配是類似的。假設我們需要訂閱好幾個隊列,但它們都是以 log 開頭的,那麼我們就可以通過 psubscribe log* 來自動訂閱所有以 log 開頭的隊列。
比如上面的 channel1、channel2,我們就可以通過 psubscribe channel* 實現,至於消息發佈者則不需要變。
當然主題訂閱也可以是多個,比如:psubscribe log* db*,訂閱所有以 log 開頭、db 開頭的消息隊列。
取消訂閱
既然有訂閱,那麼就有取消訂閱,就類似於取關 (o(╥﹏╥)o)。
使用 unsubscribe channel1 channel2 可以取消訂閱多個 channel,如果是主題訂閱,那麼也可以通過 punsubscribe ch* 取消訂閱指定的主題。比較簡單,不再贅述。
Python 操作 Redis 的發佈訂閱
訂閱者(生產者):
# 訂閱者
import redis
client = redis.Redis(host="...",
decode_responses="utf-8")
# 調用 pubsub 方法返回一個訂閱者
sub = client.pubsub()
# 訂閱兩個隊列
sub.subscribe("ch1", "ch2")
# 監聽,此時處於阻塞狀態
for item in sub.listen():
# 一旦發佈者發佈消息,這裏就可以接收到
# item["channel"]是隊列,item["data"]是內容
print(item["channel"], item["data"])
發佈者(消費者):
# 訂閱者
import redis
client = redis.Redis(host="...",
decode_responses="utf-8")
# 發佈者很簡單,直接發佈消息接口
client.publish("ch1", "7ki7ki棒棒1")
client.publish("ch1", "7ki7ki棒棒2")
client.publish("ch2", "7ki7ki棒棒3")
當執行發佈者的時候,會發現訂閱者多了幾條輸出,至於內容顯然是發佈者發佈的內容。
另外 Python 操作訂閱者還有幾種方式。
# 訂閱者
import redis
client = redis.Redis(host="...",
decode_responses="utf-8")
sub = client.pubsub()
sub.subscribe("ch1", "ch2")
while True:
# 這種方式會瞬間返回,如果有消息得到消息
# 沒有消息會返回None
item = sub.get_message()
if item:
print(item["channel"], item["data"])
顯然該方法有一個缺陷,就是會造成 CPU 空轉,因此我們不推薦使用該方法。
也可以開啓一個新的線程去監聽。
# 訂閱者
import redis
client = redis.Redis(host="...",
decode_responses="utf-8")
sub = client.pubsub()
sub.subscribe("ch1", "ch2")
def handler(item):
print(item["channel"], item["data"])
# 給每一個頻道註冊一個處理函數
# 當頻道有消息時,會自動將消息傳遞給處理函數
sub.channels.update({"ch1": handler, "ch2": handler})
# 開啓一個線程運行,會返回新開啓的線程對象
# 注意:因爲是單獨開了一個線程,所以這裏不會阻塞
th = sub.run_in_thread()
print("xxx")
print("yyy")
print("zzz")
# 先啓動訂閱者,再啓動發佈者,程序輸出如下
"""
xxx
yyy
zzz
ch1 7ki7ki棒棒1
ch1 7ki7ki棒棒2
ch2 7ki7ki棒棒3
"""
# 注意:這裏程序依舊會卡住,因爲開啓的線程是非守護線程
# 所以即便主線程執行完畢,也依舊會等待子線程
# 如果不想主線程等待,解決的辦法有兩種:
# 一種是在run_in_thread中加上一個參數daemon=True
# 將其設置爲守護線程,這樣主線程就不會等待了
# 另一種是手動停止,因爲sub.run_in_thread會返回新開啓的線程
# 調用其stop方法即可讓它停止,通過這種方式,可以在任意時刻停止監聽
# th.stop()
還有主題訂閱,發佈者代碼依舊不用變,只需要將訂閱者的 sub.subscribe 換成 sub.psubscribe 即可。
# 訂閱者
sub = client.pubsub()
# 訂閱以 ch 開頭、log 開頭的隊列
sub.psubscribe("ch*", "log*")
如果是開啓新的線程的話:
# 訂閱者
import redis
client = redis.Redis(host="...",
decode_responses="utf-8")
sub = client.pubsub()
sub.psubscribe("ch*", "log*")
def handler(item):
print(item["channel"], item["data"])
# 對於開啓新的線程去監聽
# 要將之前的 sub.channels 換成 sub.patterns
sub.patterns.update({"ch*": handler,
"log*": handler})
sub.run_in_thread()
以上就是 Python 操作 Redis 發佈訂閱相關的內容。
發佈訂閱作爲消息隊列有什麼缺點?
Pub/Sub 最大的優勢就是,支持多組生產者、消費者處理消息。但除了這一個優點之外,剩下的都是缺點了。
1)發佈訂閱模式是 "發後既忘" 的工作模式,如果訂閱者離線,那麼重連之後不能消費之前的歷史消息。
因爲 Pub/Sub 的實現原理非常簡單,它沒有基於任何數據類型,也沒有做任何的數據存儲,只是單純地爲生產者、消費者建立「數據轉發通道」,把符合規則的數據,從一端轉發到另一端。
在 Redis 中,一個完整的發佈、訂閱消息處理流程是這樣的:
-
1)消費者訂閱指定隊列,Redis 就會記錄一個映射關係:隊列 -> 消費者;
-
2)生產者向這個隊列發佈消息,然後 Redis 就會從映射關係中找出對應的消費者,把消息轉發給它;
整個過程沒有任何的數據存儲,一切都是實時轉發的。我們上面在發完消息後返回的 1 就是訂閱當前隊列的消費者個數,或者說 Redis 將消息轉發給了多少個消費者。
因此,如果一個消費者異常掛掉了,它再重新上線後,只能接收新的消息,在下線期間生產者發佈的消息是接收不到的。如果所有消費者都下線了,那生產者發佈的消息,因爲找不到任何一個消費者,也會直接「丟棄」。
所以在使用 Pub/Sub 時,一定要注意:消費者必須先訂閱隊列,生產者才能發佈消息,否則消息會丟失。
2)無法持久化保存消息,如果 Redis 服務器宕機或重啓,那麼所有的消息將會丟失
因爲 Pub/Sub 沒有基於任何數據類型實現,所以它不具備「數據持久化」的能力,也就是說,Pub/Sub 的相關操作,不會寫入到 RDB 和 AOF 中,當 Redis 宕機重啓,Pub/Sub 的數據也會全部丟失。
3)積壓的消息不能太多,否則也會丟數據
當消費者的速度,跟不上生產者時,就會導致數據積壓的情況發生。如果採用 List 作爲隊列,消息積壓時會導致底層的鏈表很長,最直接的影響就是 Redis 內存會持續增長,直到消費者把所有數據都從鏈表中取出。
但 Pub/Sub 的處理方式卻不一樣,當消息積壓時,有可能會導致消費失敗和消息丟失。至於原因,還是和 Pub/Sub 的實現原理有關。
每個消費者訂閱一個隊列時,Redis 都會在 Server 端爲該消費者分配一個「緩衝區」,這個緩衝區其實就是一塊內存。當生產者發佈消息時,Redis 先把消息寫到對應消費者的緩衝區中,之後消費者不斷地從緩衝區讀取消息,處理消息。
但問題就出在這個緩衝區上,因爲這個緩衝區的大小是有上限的(可配置),如果消費者拉取消息很慢,就會造成生產者發佈到緩衝區的消息開始積壓,緩衝區內存持續增長。如果超過了緩衝區配置的上限,Redis 就會「強制」把這個消費者踢下線。這時消費者就會消費失敗,也會丟失數據。
Redis 的配置文件裏有這麼這一個配置:client-output-buffer-limit pubsub 32mb 8mb 60,它的參數含義如下:
-
32mb:緩衝區一旦超過 32MB,Redis 直接強制把消費者踢下線;
-
8mb 60:緩衝區超過 8MB,並且持續 60 秒,Redis 也會把消費者踢下線;
所以 Pub/Sub 作爲消息隊列和 List 之間的差異比較大的,並且從推拉模型不難看出,List 是屬於「拉」模型,而 Pub/Sub 屬於「推」模型。List 中的數據可以一直積壓在內存中,消費者什麼時候來「拉」都可以。但 Pub/Sub 是把消息先「推」到消費者在 Redis Server 上的緩衝區中,然後等消費者再來取。
但當消費者跟不上生產者的速度時,就會導致緩衝區的內存開始膨脹,而 Redis 爲了控制緩衝區的上限,就會把消費者踢下線。
總結一下 Pub/Sub 模型的優缺點:
除了第一個是優點之外,剩下的都是缺點,所以很多人都覺得 Pub/Sub 很雞肋。也正是以上原因,Pub/Sub 在實際的應用場景中用得並不多。
目前只有哨兵集羣和 Redis 實例通信時,採用了 Pub/Sub 的方案,因爲哨兵正好符合即時通訊的業務場景。
最後還有一點沒有說,就是 Pub/Sub 和 List 一樣,數據一旦取走,就會從 Redis 當中刪除,無法重複消費。而一個成熟的消息隊列,應該具備如下功能:
-
支持阻塞等待拉取消息;
-
支持發佈 / 訂閱模式;
-
消費者下線之後重新上線,仍能消費下線期間生產者發送的消息;
-
消費者消費失敗,可重新消費,也就是支持消息被同一個消費者消費多次;
-
實例宕機,消息不丟失,數據可持久化;
-
即使消息大量堆積,也不會丟數據;
顯然 Pub/Sub 只能滿足前兩個要求。
於是 Redis 在 5.0 的時候引入 Stream 類型,將 Pub/Sub 的問題全部解決,而 Stream 相關的內容後續詳細介紹。
本文參考自:
- 水滴與銀彈:《把 Redis 當作隊列來用,真的合適嗎?》
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YDorGOzjg23eHpA_wafykg