Redis 多線程架構的演進

Redis 真的是單線程嗎?網上有很多關於這個問題的討論,得出的結論也幾乎是一致的。本文在討論這個問題之前,先定義好問題中 “單線程” 的概念邊界:

對於邊界 1,那麼答案是肯定的,在 Redis v6.0 版本以前,Redis 的網絡模型一直都是單線程模式的,即使到了 v6.0 版本,所有客戶端命令的執行依然是在主線程上完成的;對於邊界 2,答案是否定的,Redis 從發佈之出就有兩個BIO(background I/O service)線程來異步處理 aof 持久化、關閉文件的任務,在 Redis v4.0 版本中又添加了一個 BIO 線程,將比較耗時的命令異步化,到了 Redis v6.0,Redis 核心的網絡模型也被改造成了多線程。

在概念邊界 2 的限制下,我們可以得出結論:Redis 從來都不是單線程工作的!在 Redis 發佈近十年來,在系統架構演進過程中都遇到了哪些問題?作者 antirez 對這些問題是怎樣思考的?採取了什麼樣的方案來改進?探索這些問題對於開發者的成長很有價值,也是本文的寫作目的,筆者會結合相關源碼與讀者共同解答這些問題。

1. Redis 基礎架構設計

性能優異的服務離不開好的架構設計,Redis 使用 I/O multiplexing 實現了單線程接收海量客戶端請求;通過單線程Reactor模型實現了高性能的事件處理;基於條件變量實現的生產者消費者模型構建了自己的BIO系統。本節先簡單介紹一下這些從 Redis 誕生以來就一直使用的基礎架構設計。

1.1 Redis 對 I/O multiplexing 的封裝

I/O multiplexing 指的是多個網絡 socket I/O 複用同一個線程,它解決了 C10K 的問題。Redis 將不同的 I/O 多路複用函數封裝成相同的 API 提供給上層使用,僅僅以單線程處理網絡 I/O,就可以爲成千上萬的客戶端提供服務。

Redis 的 I/O 多路複用模塊提供的 API:

//下面的方法不同版本的redis在src目錄下的ae_epoll.c、ae_evport.c、ae_kqueue.c、ae_select.c代碼文件中都有實現
static int aeApiCreate(aeEventLoop *eventLoop)
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
static void aeApiFree(aeEventLoop *eventLoop)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

Redis 可以在多個平臺上運行,所以會通過宏定義,根據編譯平臺的不同,選擇不同的 I/O 多路複用函數作爲子模塊,提供給上層接口做封裝。

/*下面代碼在Redis不同版本的ae.c源碼文件中均包含
 *Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

Redis 會優先選擇時間複雜度爲 𝑂(1) 的 I/O 多路複用函數作爲底層實現,包括 Solaries 10 中的 evport、Linux 中的 epoll 和 macOS/FreeBSD 中的 kqueue。select 函數是作爲 POSIX 標準中的系統調用,在不同版本的操作系統上都會實現,所以將其作爲保底方案。

1.2 單線程事件循環 —— Reactor 網絡模型

在服務端架構設計中,Reactor 是一種基於事件驅動的設計模式。目前 Linux 平臺上主流的高性能網絡庫 / 框架中,大都採用這種設計模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python) 等。

Reactor 模式本質上指的是使用 I/O 多路複用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。通常設置一個主線程負責做 event-loop 事件循環和 I/O 讀寫,通過 select/poll/epoll_wait 等系統調用監聽 I/O 事件,業務邏輯提交給其他工作線程去做。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系統調用上,這樣可以最大限度的複用 event-loop 線程,讓一個線程能服務於多個 sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數上(select/poll/epoll_wait)。單線程的 Reactor 網絡模型是這樣的:

Redis 在 6.0 版本之前的單事件循環模型,實際上就是一個非常經典的 Reactor 模型,看下圖:

Redis 的單線程 Reactor 大體是這樣工作的:“I/O 多路複用模塊”會監聽多個 FD ,當這些 FD 產生,accept,read,write 或 close 的文件事件。會向 “文件事件分發器(dispatcher)” 傳送事件。文件事件分發器(dispatcher)在收到事件之後,會根據事件的類型將事件分發給對應的 handler。後邊的章節我們會對這一過程再展開詳述。

1.3 基於條件變量實現的生產者消費者模型

我們前邊提到,Redis 從發版之出就自帶 BIO 線程,主要用來處理一些比較耗時的後臺任務。redis 使用的是一個基於條件變量實現生產者消費者模型來設計 BIO 系統。Redis 使用 C 語言開發,我們以 Linux C 爲例子,先來介紹一下鎖與共享變量的使用方法,再總結一下這一模型的設計。

1.3.1 鎖

多線程情況下, 鎖的使用主要涉及以下 5 個函數, 它們都包含在 pthread.h 頭文件中。

其中鎖變量類型爲 pthread_mutex_t,鎖的使用包含三個步驟: 鎖的初始化、加鎖、以及釋放鎖。

1.3.2 鎖初始化

pthread_mutex_init 該函數用於鎖的初始化,要使用鎖,首先需要聲明一個 pthread_mutex_t 變量, 然後用該函數進行初始化,如下:

pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);

初始化的時候,第二個參數可以用於設置鎖的性質。經過這一步, 我們完成了鎖的初始化,在第二個參數設置 NULL 的時候,一個線程加鎖,另外一個線程再執行加鎖操作就會阻塞,直到另外的線程釋放鎖。

1.3.3 加鎖、釋放鎖與資源回收

pthread_mutex_lock 與 pthread_mutex_trylock 這兩個函數可以用於加鎖。這兩個函數都完成了加鎖的功能,在獲得了變量初始化後的 mutex 以後,直接調用函數即可完成加鎖功能。其中第一個函數在另外一個線程已經獲得鎖的情況下會一直阻塞,而第二個函數則會直接返回,不會阻塞。

pthread_mutex_unlock 函數可以用於釋放鎖。

pthread_mutex_destroy 函數用於釋放資源,在使用 pthread_mutex_init 函數進行鎖初始化的情況下,使用結束以後,需要使用該函數釋放資源。

1.3.4 共享變量

共享變量應用於這樣一種場景:一個線程先對某一條件進行判斷,如果條件不滿足則進入等待,條件滿足的時候,該線程被通知條件滿足,繼續執行任務。共享變量涉及的函數有如下 6 個:

1.3.5 共享變量的初始化與條件等待

共享變量使用函數 pthread_cond_init 初始化,要使用條件變量,首先要聲明一個 pthread_cond_t 變量,然後用該函數進行初始化。第二個參數使用 NULL,可以進行具體的參數設置。初始化完成後,需要通過 pthread_cond_wait 或 pthread_cond_timedwait 等待條件成立;在條件不滿足的時候,函數進入等待;當條件滿足的時候,該函數會停止等待,繼續執行。該函數的第二個參數是 pthread_mutex_t 類型,這是因爲在條件判斷的時候,需要先進行加鎖來防止出現錯誤,在執行改函數前需要主動對這個變量執行加鎖操作,進入這個函數以後,其內部會對 mutex 進行解鎖操作;而函數執行完以後 (也就是停止阻塞以後),又會重新加鎖,具體原因在介紹完本組函數以後進行說明,其中第二個函數可以指定等待的時間,而不是一直在阻塞。

1.3.6 條件滿足通知

上面說到,在條件不滿足的時候,一個線程會調用 pthread_cond_wait 函數阻塞等待;而此時如果其他線程檢查到條件滿足,則可以調用 pthread_cond_signal、pthread_cond_broadcast,讓處於等待狀態的線程重新開始執行。當有多個線程在等待的時候,則可以調用 pthread_cond_signal 函數會喚醒其中一個線程,而 pthread_cond_broadcast 函數會喚醒所有的等待的線程。

1.3.7 實現生產者消費者模型的正確方法

上邊我們介紹完了鎖與共享變量的基本函數,接下來介紹這兩套函數配合使用的一個常見場景:有兩個線程,其中一個線程先對一個條件進行檢查,這個檢查動作需要先加鎖。如果條件成立則執行操作,否則阻塞等待,直到條件成立這個線程纔會被通知繼續執行;另一個線程先做加鎖處理,然後置條件爲真,並通知其他等待的線程條件已經滿足,可以繼續執行。

上面說在檢查共享變量的時候要加鎖,其原因通過以下僞代碼來說明。

第一種情況:

線程1
pthread_mutex_lock(&mutex); 
while (condition == FALSE) {
    pthread_cond_wait(&cond, &mutex); 
}
pthread_mutex_unlock(&mutex);
線程2
condition = TRUE;
pthread_cond_signal(&cond);

可以看到線程 1 先檢查一個條件是否成立,在不成立的情況下就調用 wait 函數進行等待,而在這之前,先對這步過程進行了加鎖操作。線程 2 則是把條件設置爲 true(假設其通過某種方式知道了這個時候該條件應當爲 true),然後用 pthread_cond_signal 函數通知線程 1 停止阻塞繼續執行。上面的程序在多個線程併發執行的時候有如下的問題:如果線程 1 先判斷,發現條件不滿足,準備進入等待,在這個時候線程 2 中條件被置爲真,且發送通知,然後線程 1 才阻塞等待,這樣的話線程 1 錯過了一次通知,導致其在條件滿足的情況下依然在阻塞等待。

        線程1                               線程2
pthread_mutex_lock(&mutex);
while (condition == FALSE)
                                      condition = TRUE;
                                      pthread_cond_signal(&cond);
pthread_cond_wait(&cond, &mutex);

爲了解決上面說的問題,對程序進行了如下的改進。通過線程 2 的加鎖操作,避免了這樣的問題。這也解釋了爲什麼 pthread_cond_wait 函數在進入以後要進行解鎖操作,如果起不解鎖,那麼線程 2 在進行條件置爲 true 的操作就沒有辦法執行,因爲線程 1 在進入等待之前已經對這個變量加鎖了,這樣線程 1 會一直等待,而線程 2 也會等待,導致死鎖。

線程1                                       線程2
pthread_mutex_lock(&mutex);           pthread_mutex_lock(&mutex);
while (condition == FALSE) {          condition = TRUE;
 pthread_cond_wait(&cond, &mutex);    pthread_cond_signal(&cond);
}                                     pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&mutex);

因爲 wait 重新執行的時候需要再次加鎖,所以上面的 pthread_cond_signal 調用以後,必須釋放鎖才能夠完成 wait。另外,也可以先解鎖,然後調用 pthread_cond_signal,這兩種寫法都是正確的。雖然共享變量的訪問一般需要加鎖,但在這個場景下不加鎖造成的競爭不會產生錯誤,只是會造成線程調度效率上的問題,所以也可以這麼寫,但是一般推薦標準的寫法。

對於條件變量,其基本的使用場景是,某些線程對條件進行判斷,如果不滿足條件,就進入等待狀態。在進行條件判斷之前,先進行加鎖操作,另外一些線程則是負責對條件賦值爲真,然後通知等待的線程繼續執行,線程被喚醒後,繼續進入判斷的環節以及後續的操作。

總結一下本節介紹的基於條件變量實現的生產者消費者模型:A 類線程依次執行加鎖、檢查 (條件不成立則等待, 知道成立再次進入檢查階段)、執行、解鎖;同時 B 類線程依次執行加鎖、條件置爲真、通知、解鎖。

2. Redis 核心網絡模型架構的演進

從 Redis 的發展歷史我們可以看出,其核心網絡模型經歷了從單線程到多線程的演進。那麼 Redis 單線程的 I/O 模型是怎樣實現的?單線程存在什麼問題?Redis 的多線程的 I/O 又是怎樣實現的?爲什麼要這麼做? 本節我們就來探索這些問題。

2.1 單線程 I/O 的工作流程

我們前邊有說到,Redis 在 6.0 版本以前,其核心的網絡模型一直是單線程 Reactor 模型,利用 epoll/select/kqueue 等多路複用技術,在單線程的事件循環中不斷去處理客戶端請求,最後回寫響應數據到客戶端:

我們先來學習一下 redis 網絡模型中的一些核心概念:

/* client對象在redis不同版本的redis.h中均有定義
 * With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    ...... //此處省略很多其它的屬性
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

然後,筆者再描述一下 Redis 服務端的工作流程:

  1. Redis 服務器啓動,開啓主線程事件循環(Event Loop),註冊 acceptTcpHandler 連接應答處理器到用戶配置的監聽端口對應的文件描述符,等待新連接到來;

  2. 客戶端和服務端建立網絡連接;

  3. acceptTcpHandler 被調用,主線程使用 AE 的 API 將 readQueryFromClient 命令讀取處理器綁定到新連接對應的文件描述符上,並初始化一個 client 綁定這個客戶端連接;

  4. 客戶端發送請求命令,觸發讀就緒事件,主線程調用 readQueryFromClient 通過 socket 讀取客戶端發送過來的命令存入 client->querybuf 讀入緩衝區;

  5. 接着調用 processInputBuffer,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根據 Redis 協議解析命令,最後調用 processCommand 執行命令;

  6. 根據請求命令的類型(SET, GET, DEL, EXEC 等),分配相應的命令執行器去執行,最後調用 addReply 函數族的一系列函數將響應數據寫入到對應 client 的寫出緩衝區:client->buf 或者 client->reply ,client->buf 是首選的寫出緩衝區,固定大小 16KB,一般來說可以緩衝足夠多的響應數據,但是如果客戶端在時間窗口內需要響應的數據非常大,那麼則會自動切換到 client->reply 鏈表上去,使用鏈表理論上能夠保存無限大的數據(受限於機器的物理內存),最後把 client 添加進一個 LIFO 隊列 clients_pending_write;

  7. 在事件循環(Event Loop)中,主線程執行 beforeSleep --> handleClientsWithPendingWrites,遍歷 clients_pending_write 隊列,調用 writeToClient 把 client 的寫出緩衝區裏的數據回寫到客戶端,如果寫出緩衝區還有數據遺留,則註冊 sendReplyToClient 命令回覆處理器到該連接的寫就緒事件,等待客戶端可寫時在事件循環中再繼續回寫殘餘的響應數據。

筆者整理了一張更詳細的 redis 單線程 I/O 的工作流程圖,感興趣的讀者可以結合上邊的描述過程看一下。

2.2 單線程存在什麼問題?

我們都知道單線程的程序是無法利用服務器的多核 CPU 的,那麼早期的 Redis 爲什麼還要使用單線程呢?我們不妨先看一下Redis官方給出的 FAQ

核心意思是:CPU 並不是制約Redis性能表現的瓶頸所在,更多情況下是受到內存大小和網絡 I/O 的限制,所以 Redis 核心網絡模型使用單線程並沒有什麼問題,如果你想要使用服務的多核 CPU,可以在一臺服務器上啓動多個實例或者採用分片集羣的方式。

通過上一節對 Redis 單線程 Reactor 模型的分析,我們知道 Redis 的 I/O 線程除了在等待事件,其它的事件都是非阻塞的,沒有浪費任何的 CPU 時間,這也是 Redis 能夠提供高性能服務的原因。不過除了我們上邊說的只能使用一個 CPU 核心外,這個模型還有兩個缺陷:

redis 主線程的時間主要消耗在兩個地方:邏輯計算的消耗 (CPU 計算) 和由於同步 IO(I/O multiplexing)的讀寫,內核態和用戶態相互拷貝數據。當 value 很大時,Redis 的瓶頸會首先出現在同步 IO 這裏,如果能有多個線程來分擔這部分壓力,那 Redis 的 QPS 還能有大幅度的提升,這就是 Redis 多線程網絡模型的實現思路。

2.3 Redis 多線程網絡模型的實現

Redis 在 6.0 版本之後正式在覈心網絡模型中引入了多線程,前邊我們提到 Redis 在 6.0 版本之前,使用的是經典的單線程 Reactor 模型,通常來說,單線程的 Reactor 架構模型在引入了多線程之後會進化爲 Multi-Reactors 模式,它的工作模式是這樣的:

區別於單 Reactor 模式,這種模式不再是單線程的事件循環,而是有多個線程(Sub Reactors)各自維護一個獨立的事件循環,由 Main Reactor 負責接收新連接並分發給 Sub Reactors 去獨立處理,最後 Sub Reactors 回寫響應給客戶端。

Multiple Reactors 模式通常也可以等同於 Master-Workers 模式,比如 Nginx 和 Memcached 等就是採用這種多線程模型,雖然不同的項目實現細節略有區別,但總體來說模式是一致的。

2.3.1 多線程網絡模型的工作流程

Redis 雖然也實現了多線程,但是卻不是標準的 Multi-Reactors/Master-Workers 模式,我們先看一下 Redis 多線程網絡模型的總體設計:

  1. Redis 服務器啓動,開啓主線程事件循環(Event Loop),註冊 acceptTcpHandler 連接應答處理器到用戶配置的監聽端口對應的文件描述符,等待新連接到來;

  2. 客戶端和服務端建立網絡連接;

  3. acceptTcpHandler 被調用,主線程使用 AE 的 API 將 readQueryFromClient 命令讀取處理器綁定到新連接對應的文件描述符上,並初始化一個 client 綁定這個客戶端連接;

  4. 客戶端發送請求命令,觸發讀就緒事件,服務端主線程不會通過 socket 去讀取客戶端的請求命令,而是先將 client 放入一個 LIFO 隊列 clients_pending_read;

  5. 在事件循環(Event Loop)中,主線程執行 beforeSleep -->handleClientsWithPendingReadsUsingThreads,利用 Round-Robin 輪詢負載均衡策略,把 clients_pending_read 隊列中的連接均勻地分配給 I/O 線程各自的本地 FIFO 任務隊列 io_threads_list[id] 和主線程自己,I/O 線程通過 socket 讀取客戶端的請求命令,存入 client->querybuf 並解析第一個命令,但不執行命令,主線程忙輪詢,等待所有 I/O 線程完成讀取任務;

  6. 主線程和所有 I/O 線程都完成了讀取任務,主線程結束忙輪詢,遍歷 clients_pending_read 隊列,執行所有客戶端連接的請求命令,先調用 processCommandAndResetClient 執行第一條已經解析好的命令,然後調用 processInputBuffer 解析並執行客戶端連接的所有命令,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根據 Redis 協議解析命令,最後調用 processCommand 執行命令;

  7. 根據請求命令的類型(SET, GET, DEL, EXEC 等),分配相應的命令執行器去執行,最後調用 addReply 函數族的一系列函數將響應數據寫入到對應 client 的寫出緩衝區:client->buf 或者 client->reply ,client->buf 是首選的寫出緩衝區,固定大小 16KB,一般來說可以緩衝足夠多的響應數據,但是如果客戶端在時間窗口內需要響應的數據非常大,那麼則會自動切換到 client->reply 鏈表上去,使用鏈表理論上能夠保存無限大的數據(受限於機器的物理內存),最後把 client 添加進一個 LIFO 隊列 clients_pending_write;

  8. 在事件循環(Event Loop)中,主線程執行 beforeSleep --> handleClientsWithPendingWritesUsingThreads,利用 Round-Robin 輪詢負載均衡策略,把 clients_pending_write 隊列中的連接均勻地分配給 I/O 線程各自的本地 FIFO 任務隊列 io_threads_list[id] 和主線程自己,I/O 線程通過調用 writeToClient 把 client 的寫出緩衝區裏的數據回寫到客戶端,主線程忙輪詢,等待所有 I/O 線程完成寫出任務;

  9. 主線程和所有 I/O 線程都完成了寫出任務, 主線程結束忙輪詢,遍歷 clients_pending_write 隊列,如果 client 的寫出緩衝區還有數據遺留,則註冊 sendReplyToClient 到該連接的寫就緒事件,等待客戶端可寫時在事件循環中再繼續回寫殘餘的響應數據。

這裏大部分邏輯和之前的單線程模型是一致的,變動的地方僅僅是把讀取客戶端請求命令和回寫響應數據的邏輯異步化了,交給 I/O 線程去完成,這裏需要特別注意的一點是:I/O 線程僅僅是讀取和解析客戶端命令而不會真正去執行命令,客戶端命令的執行最終還是要回到主線程上完成。

筆者也同樣整理了一張更詳細的 redis 多線程 I/O 的工作流程圖,感興趣的讀者可以結合上邊的描述過程看一下。

2.3.2 多線程 I/O 源碼剖析

接下里,筆者以 Redis v6.0.1 版本,對多線程 I/O 實現中比較重要的源碼進行分析。

先來看多線程的初始化:

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */
    //如果只配置了一個線程,則所有的I/O放到主線程上執行
    if (server.io_threads_num == 1) return;
    //最多配置的線程數量不超過128
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    //啓動線程,線程數量爲配置中的線程數
    for (int i = 0; i < server.io_threads_num; i++) {   
        //創建I/O線程的本地任務隊列
        io_threads_list[i] = listCreate();
        if (i == 0) continue; //0號線程是主線程
        //初始化 I/O 線程並啓動。
        pthread_t tid;
        // 每個 I/O 線程會分配一個本地鎖,用來休眠和喚醒線程。
        pthread_mutex_init(&io_threads_mutex[i],NULL); 
        // 每個 I/O 線程分配一個原子計數器,用來記錄當前遺留的任務數量。
        io_threads_pending[i] = 0;
        // 主線程在啓動 I/O 線程的時候會默認先鎖住它,直到有 I/O 任務才喚醒它。
        pthread_mutex_lock(&io_threads_mutex[i]);
        // 啓動線程,進入 I/O 線程的主邏輯函數 IOThreadMain
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

initThreadedIO會在 Redis 服務器啓動的末尾被調用,進行 I/O 多線程的初始化工作。Redis 的多線程模式是關閉的,需要用戶在 redis.conf 配置文件中開啓並設置:

io-threads 2
io-threads-do-reads yes

注意通過配置 io-theads 設置的 I/O 線程數量也包括主線程在內,io-threads-do-reads 標識 I/O 線程是否參與讀 I/O 的處理,提供這個配置的原因是因爲,讀 I/O 的並行處理對於 Redis 的性能提升並不明顯。

我們再來看讀取請求的源代碼。當客戶端發送請求命令之後,會觸發 Redis 主線程的事件循環,命令處理器 readQueryFromClient 被回調,在以前的單線程模型下,這個方法會直接讀取解析客戶端命令並執行,但是多線程模式下,則會把 client 加入到 clients_pending_read 任務隊列中去,後面主線程再分配到 I/O 線程去讀取客戶端請求命令:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    //如果開啓了多線程,將client加入到異步隊列之後返回
    if (postponeClientRead(c)) return;
    ...... //省略代碼,邏輯同單線程版本幾乎一樣
}
/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    //當多線程 I/O 模式開啓、主線程沒有在處理阻塞任務時,將 client 加入異步隊列。
    if (io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 給 client 打上 CLIENT_PENDING_READ 標識,表示該 client 需要被多線程處理,
        // 後續在 I/O 線程中會在讀取和解析完客戶端命令之後判斷該標識並放棄執行命令,讓主線程去執行。
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

主線程會在事件循環的 beforeSleep() 方法中,調用 handleClientsWithPendingReadsUsingThreads:

/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler will just put normal clients into a queue of clients to
 * process (instead of serving them synchronously). This function runs
 * the queue using the I/O threads, and process them in order to accumulate
 * the reads in the buffers, and also parse the first command available
 * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;
    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
    // 遍歷待讀取的 client 隊列 clients_pending_read,
    // 通過 RR 輪詢均勻地分配給 I/O 線程和主線程自己(編號 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    // 設置當前 I/O 操作爲讀取操作,給每個 I/O 線程的計數器設置分配的任務數量,
    // 讓 I/O 線程可以開始工作:只讀取和解析命令,不執行。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
     // 主線程自己也會去執行讀取客戶端請求命令的任務,以達到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    // 忙輪詢,累加所有 I/O 線程的原子任務計數器,直到所有計數器的遺留任務數量都是 0,
    // 表示所有任務都已經執行完成,結束輪詢。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");
    // 遍歷待讀取的 client 隊列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 標記,
    // 然後解析並執行所有 client 的命令。
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            // client 的第一條命令已經被解析好了,直接嘗試執行。
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        // 繼續解析並執行 client 命令。
        processInputBuffer(c);
    }
    return processed;
}

這裏主線程所做的核心工作就是:

與讀取相求相對應的自然是寫回響應了,完成命令的讀取、解析以及執行之後,客戶端命令的響應數據已經存入 client->buf 或者 client->reply 中了,接下來就需要把響應數據回寫到客戶端了,還是在 beforeSleep 中, 主線程調用 handleClientsWithPendingWritesUsingThreads:

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    // 如果用戶設置的 I/O 線程數等於 1 或者當前 clients_pending_write 隊列中待寫出的 client
    // 數量不足 I/O 線程數的兩倍,則不用多線程的邏輯,讓所有 I/O 線程進入休眠,
    // 直接在主線程把所有 client 的相應數據回寫到客戶端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
   // 喚醒正在休眠的 I/O 線程(如果有的話)。
    if (!io_threads_active) startThreadedIO();
    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
    // 遍歷待寫出的 client 隊列 clients_pending_write,
    // 通過 RR 輪詢均勻地分配給 I/O 線程和主線程自己(0號線程)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    // 設置當前 I/O 操作爲寫出操作,給每個 I/O 線程的計數器設置分配的任務數量,
    // 讓 I/O 線程可以開始工作,把寫出緩衝區(client->buf 或 c->reply)中的響應數據回寫到客戶端。
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
    // 主線程自己也會去執行讀取客戶端請求命令的任務,以達到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    // 忙輪詢,累加所有 I/O 線程的原子任務計數器,直到所有計數器的遺留任務數量都是 0。
    // 表示所有任務都已經執行完成,結束輪詢。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");
    // 最後再遍歷一次 clients_pending_write 隊列,檢查是否還有 client 的中寫出緩衝區中有殘留數據,
    // 如果有,那就爲 client 註冊一個命令回覆器 sendReplyToClient,等待客戶端寫就緒再繼續把數據回寫。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 檢查 client 的寫出緩衝區是否還有遺留數據。
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

與讀取請求相似,主線程處理寫回響應的核心工作是:

上邊我們分析了主線程的工作,下邊我們再來看一下 I/O 線程的主邏輯都做了些什麼:

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    while(1) {
        // 忙輪詢,100w 次循環,等待主線程分配 I/O 任務。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        // 如果 100w 次忙輪詢之後如果還是沒有任務分配給它,則通過嘗試加鎖進入休眠,
        // 等待主線程分配任務之後調用 startThreadedIO 解鎖,喚醒 I/O 線程去執行。
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(io_threads_pending[id] != 0);
        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
        // 注意:主線程分配任務給 I/O 線程之時,
        // 會把任務加入每個線程的本地任務隊列 io_threads_list[id],
        // 但是當 I/O 線程開始執行任務之後,主線程就不會再去訪問這些任務隊列,避免數據競爭。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 如果當前是寫出操作,則把 client 的寫出緩衝區中的數據回寫到客戶端。
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            // 如果當前是讀取操作,則socket 讀取客戶端的請求命令並解析第一條命令。
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // 所有任務執行完之後把自己的計數器置 0,主線程通過累加所有 I/O 線程的計數器
        // 判斷是否所有 I/O 線程都已經完成工作。
        io_threads_pending[id] = 0;
        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

I/O 線程啓動之後,會先進入忙輪詢,判斷原子計數器中的任務數量,如果是非 0 則表示主線程已經給它分配了任務,開始執行任務,否則就一直忙輪詢一百萬次等待,忙輪詢結束之後再查看計數器,如果還是 0,則嘗試加本地鎖,因爲主線程在啓動 I/O 線程之時就已經提前鎖住了所有 I/O 線程的本地鎖,因此 I/O 線程會進行休眠,等待主線程喚醒。

主線程會在每次事件循環中嘗試調用 startThreadedIO 喚醒 I/O 線程去執行任務,如果接收到客戶端請求命令,則 I/O 線程會被喚醒開始工作,根據主線程設置的 io_threads_op 標識去執行命令讀取和解析或者回寫響應數據的任務,I/O 線程在收到主線程通知之後,會遍歷自己的本地任務隊列 io_threads_list[id],取出一個個 client 執行任務:

至此,我們就分析完了多線程 I/O 工作的核心源碼。

2.3.3 I/O 多線程 lock-free 的設計亮點

細心的讀者可能會發現,Redis 的多線程 I/O 設計是無鎖化的,lock-free 一直是多線程設計中的一個特色,Redis 是通過原子操作 + 交錯訪問來實現的,主線程和 I/O 線程之間共享的變量有三個:io_threads_pending 計數器、io_threads_op I/O 標識符和 io_threads_list 線程本地任務隊列。

io_threads_pending 是原子變量,不需要加鎖保護,io_threads_op 和 io_threads_list  這兩個變量則是通過控制主線程和 I/O 線程交錯訪問來規避共享數據競爭問題:I/O 線程啓動之後會通過忙輪詢和鎖休眠等待主線程的信號,在這之前它不會去訪問自己的本地任務隊列 io_threads_list[id],而主線程會在分配完所有任務到各個 I/O 線程的本地隊列之後纔去喚醒 I/O 線程開始工作,並且主線程之後在 I/O 線程運行期間只會訪問自己的本地任務隊列 io_threads_list[0] 而不會再去訪問 I/O 線程的本地隊列,這也就保證了主線程永遠會在 I/O 線程之前訪問 io_threads_list 並且之後不再訪問,保證了交錯訪問。io_threads_op 同理,主線程會在喚醒 I/O 線程之前先設置好 io_threads_op 的值,並且在 I/O 線程運行期間不會再去訪問這個變量。

2.3.4 多線程 I/O 的性能提升與設計缺陷

事實上,Redis通過多線程來提升性能達到的效果可能比你想象中的還要好,下邊是從 ITNEXT 平臺提取的一張壓測結果報告圖,更詳細的內容可參考:對實驗性 Redis 多線程 I / O 進行基準測試

可以看到,在配置 24 個 IO-threads 時,執行普通的 GET、SET 指令,Redis能夠達到 20W + 的 QPS,相比較於單線程,性能提升了將近一倍。

我們前邊提到,Redis 的多線程網絡模型爲了保持與舊版本的兼容性,並沒有使用標準的 Multi-Reactors/Master-Workers 模型。客戶端命令的執行還是在主線程上完成的。並且主線程和 I/O 線程的通信也是通過忙輪詢和鎖粗暴解決的,這會導致 Redis 在服務期間有 CPU 的空轉問題,後邊我們研究 BIO 系統的實現之後,會發現 BIO 線程與主線程之間的通信,相比較與 Redis 的核心多線程網絡模型中通信的實現,要優雅很多。相信之後 Redis 的作者會對目前的方案進行改進。

3. Redis BIO 系統的演進

本文一開始就給出了結論:對於整個服務端架構的設計而言,Redis 從來都不是單線程的。我們討論的單線程,只是針對 Redis 的核心網絡模型而言,這部分的內容我們在上一節已經深入剖析過了。對於 Redis 的 BIO 系統,也就是這些所謂的後臺線程,也經歷了一個演進的過程,在這個過程中都發生了哪些事?Redis 的作者 antirez 對 BIO 系統做了哪些優化呢?本節我們就來一起探索下這兩個問題。

3.1 早期的 BIO 系統設計與實現

早期的 Redis 通過 bio 系統完成兩件事,一是進行 Aof 持久化,也就是將寫入到系統的 page cache 的數據 fsync 到磁盤中;二是關閉文件。爲了完成這件任務,其採用了任務隊列的方式,每個任務都是一個線程來完成,任務會被放到任務隊列中,然後由執行任務線程取走,如果隊列空,則阻塞等待,如果隊列裏有任務,就通知工作線程,這通過條件變量來實現。後面以任務初始化,任務放入隊列,任務出隊列三個方面進行介紹,並且以 aof 持久化爲例說明其在系統中的使用方式,本節的內容與代碼分析基於 redis 的 3.2.3 版本。

3.1.1 任務初始化

對於一個任務,比如 aof 持久化任務,首先要初始化一個隊列,在 redis 裏面使用了 redis 自己的鏈表結構建立這個隊列。這個隊列需要滿足以下特點:

這裏的消費者就是服務線程,而爲了完成隊列爲空則等待的功能,redis 使用了條件變量機制。其初始化代碼如下。

static pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_condvar[BIO_NUM_OPS];
static list *bio_jobs[BIO_NUM_OPS];

上面的常量 BIO_NUM_OPS = 2, 表示支持兩種任務。對於每種任務,對應一個 list 用於放置任務,一個 pthread_cond_t 和 pthread_mutex_t 變量用於併發控制,以及一個 pthread_t 用於後臺服務線程。初始化使用了 bioInit 函數,部分代碼如下:

for (j = 0; j < BIO_NUM_OPS; j++) {
    pthread_mutex_init(&bio_mutex[j],NULL);
    pthread_cond_init(&bio_condvar[j],NULL);
    bio_jobs[j] = listCreate();
    bio_pending[j] = 0;
}//初始化鎖與條件變量
......
......
for (j = 0; j < BIO_NUM_OPS; j++) { 
    void *arg = (void*)(unsigned long) j;
    //這裏的函數參數是arg = j,也就是每個線程傳入一個編號j,0代表關閉文件,1代表aof初始化 
    if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {    
        serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); 
        exit(1); 
    } 
    bio_threads[j] = thread; 
}//初始化線程

在完成初始化任務以後,我們有了 BIO_NUM_OPS(其值爲 2) 個鏈表表示任務隊列,有兩個線程調用 bioProcessBackgroundJobs 函數,參數是一個編號 j,並且每個隊列都初始化了鎖與條件變量做併發控制。

3.1.2 任務入隊列

任務入隊列就是把一個任務放到鏈表的頭部, 並且把相應任務的 pending 值 + 1, 表示這個隊列裏面未完成的任務多了一個。其中任務的結構如下:

struct bio_job {
    time_t time;
    void *arg1, *arg2, *arg3;
};

可以看到,任務不是很複雜,只記錄一個時間和參數就可以了,後面講任務執行的時候,會講到這樣一個簡單的結構記錄的任務怎麼執行。任務入隊列的代碼如下:

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));
    job->arg1 = arg1;
 ...
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_condvar[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

這段入隊列的代碼先爲任務結構分配空間,然後使用 listAddNodeTail 函數把任務放到鏈表的頭部。這裏使用的是 redis 自己實現的鏈表。可以看到,進行鏈表操作的時候,要先加鎖,這是因爲這裏的鏈表是共享資源。在任務成功加入隊列以後,調用 pthread_cond_signal 函數,通知阻塞等待的線程繼續執行。上面這個過程是共享變量使用的基本模式: 加鎖、置條件爲真 (這裏是任務入隊列)、通知、解鎖。

3.1.3 任務出隊列

我們已經做好了任務初始化的工作,並且可以在隊列裏面放置新的任務,那麼當隊列裏面有任務的時候,我們在第一步初始化的時候開啓的後臺線程就會調用 bioProcessBackgroundJobs 函數進行任務處理,其處理主要代碼如下:

void *bioProcessBackgroundJobs(void *arg) {
    unsigned long type = (unsigned long) arg;
    struct bio_job *job;
    while(1) {
        listNode *ln;
        pthread_mutex_lock(&bio_mutex[type]);        
        if (listLength(bio_jobs[type]) == 0) {
            //條件不成立,等待
            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
            //被通知以後,停止阻塞,重新判斷條件
            continue;
        }
        //條件成立,直接執行
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        //取走值以後,解鎖
        pthread_mutex_unlock(&bio_mutex[type]);
        //完成隊列處理以後,根據類型調用close函數或者fsync函數。
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            fsync((long)job->arg1);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

上面的代碼主要流程是,先判斷當前的隊列是不是空的,如果是空的,則等待。否則,從隊列中取出一個 job 結構,並且根據線程的類型決定調用什麼函數。這裏的類型通過創建線程是傳如的參數獲得,可以是 0 或者 1。獲得類型以後,從 job 裏面取出 arg1 作爲參數,調用 close 函數或者 fsync 函數。arg1 是一個文件描述符,所以,在任務加入隊列的時候,只是需要放一個文件描述符如隊列,這也就是爲什麼 bio_job 結構體會設置得如此簡單。

3.1.4 Aof 持久化的例子

Aof 持久化是 redis 的兩大持久化方式之一,其會以字符串的形式把對 redis 的每一個操作都先記錄在內存的一個 buffer 中,然後寫入文件,並且在適當的時間使用 fsync 將數據刷入磁盤,而調用 fsync 的其中一種方式就是使用上面介紹的 bio 系統,其使用的方式遵循了上面說的三個步驟。

首先,在 server.c 中的 main 函數里面,有一個 initServer 函數,其內部調用了 bioInit 函數,完成了 bio 系統的初始化,這樣,相關的隊列結構被建立,後臺線程也被創建了。在 redis 主循環被啓動以後,會進入持久化的時機,調用 flushAppendOnlyFile 函數,完成 aof 持久化工作。這個函數會處理 aof 相關的配置以及優化等各類問題,在本文只關注對 bio 系統的使用,其相關代碼如下:

if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
    sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
......
......
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

可以看到,其通過 bioPendingJobsOfType 來檢查當前隊列處理的情況,並且調用 bioCreateBackgroundJob 來將 aof 任務加入隊列。由於在前面已經完成了線程的創建,在隊列中有任務的時候,線程就會啓動,並且通過上面講的 fsync 函數完成持久化操作。

Redis 的 Bio 是一個非常好的在實際系統中使條件變量的例子。

3.2 令人頭痛的問題: 大 key 應該如何刪除?

我們知道 Redis 的 DEL 命令用來刪除一個或多個 KEY 存儲的值,它是一個阻塞的命令,有的時候你要刪除一個超大的鍵值對,比如一個有上百萬的對象,這條命令可能就會被阻塞好幾秒了;又因爲 Redis 執行指令都是在主線程上執行的,所以整個服務必然會有大量慢查詢,吞吐量急劇下降!Redis 的作者 antirez 也遇到了這個問題。

那麼針對這個問題怎麼優化呢?研究過 Redis 源碼的讀者可能會想到了:使用漸進式 rehash 的方案,利用定時器和數據遊標,每次刪除一部分數據怎麼樣呢?其實這種思路,antirez 並不是沒有考慮過,但是最終 antirez 採取的方案是在原有的 BIO 系統中再增加一個成員。那麼我們想到的漸進式刪除方案有什麼問題呢?問題的答案要從 antriez 在 2015 年發表的一篇博客中查找了:Lazy Redis is better Redis

筆者截取了 antirez 舉的一個例子:

爲什麼說漸進式懶刪除很難做呢?作者說當我們刪除一個集合的時候,可能我們刪除集合中元素的速度還沒有客戶端向集合中添加元素的速度快,那我們的刪除工作看起來是永遠也無法完成了。

3.3 BIO 系統再添新成員——lazyfree

上一小節我們分析了使用 DEL 命令刪除大 key 時會造成 redis 服務端阻塞,除此之外,在使用 FLUSHDB 和 FLUSHALL 刪除包含大量鍵的數據庫時,或者 redis 在清理過期數據和淘汰內存超限的數據時,如果碰巧撞到了大體積的鍵也會造成服務器阻塞。

爲了解決以上問題, redis 4.0 引入了 lazyfree 的機制,它可以將刪除鍵或數據庫的操作放在後臺線程裏執行, 從而儘可能地避免服務器阻塞。

lazyfree 的原理不難想象,就是在刪除對象時只是進行邏輯刪除,然後把對象丟給後臺,讓後臺線程去執行真正的 destruct,避免由於對象體積過大而造成阻塞。

redis 的 lazyfree 實現即是如此,下面我們主要看一下 unlink 命令的實現 (以下代碼是基於 Redis 4.0 的):

void unlinkCommand(client *c) {
    delGenericCommand(c, 1);
}

入口很簡單,就是調用 delGenericCommand,第二個參數爲 1 表示需要異步刪除。

/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;
    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                              dbSyncDelete(c->db,c->argv[j]);
        if (deleted) {
            signalModifiedKey(c->db,c->argv[j]);
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
            server.dirty++;
            numdel++;
        }
    }
    addReplyLongLong(c,numdel);
}

delGenericCommand 函數根據 lazy 參數來決定是同步刪除還是異步刪除,同步刪除的邏輯沒有什麼變化就不細講了,我們重點看下新增的異步刪除的實現。

#define LAZYFREE_THRESHOLD 64
// 首先定義了啓用後臺刪除的閾值,對象中的元素大於該閾值時才真正丟給後臺線程去刪除,如果對象中包含的元素太少就沒有必要丟給後臺線程,因爲線程同步也要一定的消耗。
int dbAsyncDelete(redisDb *db, robj *key) {
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    //清除待刪除key的過期時間
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    //dictUnlink返回數據庫字典中包含key的條目指針,並從數據庫字典中摘除該條目(並不會釋放資源)
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);
        //lazyfreeGetFreeEffort來獲取val對象所包含的元素個數
        if (free_effort > LAZYFREE_THRESHOLD) {
            atomicIncr(lazyfree_objects,1);
            //原子操作給lazyfree_objects加1,以備info命令查看有多少對象待後臺線程刪除
            bioCreateBackgroundJob(BIO_LAZY_FREE ,val,NULL,NULL);
            //此時真正把對象val丟到後臺線程的任務隊列中
            dictSetVal(db->dict,de,NULL);
            //把條目裏的val指針設置爲NULL,防止刪除數據庫字典條目時重複刪除val對象
        }
    }
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        //刪除數據庫字典條目,釋放資源
        return 1;
    } else {
        return 0;
    }
}

以上便是異步刪除的邏輯,首先會清除過期時間,然後調用 dictUnlink 把要刪除的對象從數據庫字典摘除,再判斷下對象的大小(太小就沒必要後臺刪除),如果足夠大就丟給後臺線程,最後清理下數據庫字典的條目信息。

由以上的邏輯可以看出,當 unlink 一個體積較大的鍵時,實際的刪除是交給後臺線程完成的,所以並不會阻塞 redis。

4. 小結

本文內容確實太過於冗長了,如果讀者將文章仔細看到了這裏,我相信您一定是對這篇文章非常感興趣了。筆者使用 GDB 調試 Redis 6.0 版本代碼時,輸出了 Redis 啓動後的線程信息,我們就從下面這張圖結合本文的內容做一個總結吧:

可以看到筆者的調試信息中,Redis 啓動了 6 個線程,它們分別是:

以上就是筆者對 Redis 各個線程的功能和發展歷史的總結,脈絡很清晰!希望讀者不僅限於瞭解 Redis 多線程的演進過程,還能清楚的知道這個過程中 Redis 都遇到了哪些問題,並對這些問題進行自己的思考,要知道作者 antirez 最終採取了什麼樣方案,他又是基於哪些因素考慮的。此外,本文分析了 Redis 架構設計的很多模型和源碼,比如基於鎖和共享變量實現的多線程生產者消費者模型、redis 多線程 I/O 做到了 lock-free,這些也都是我們在系統設計和編碼中值得學習和借鑑的地方。最後,再次感謝堅持到最後的讀者!

參考 & 延伸閱讀

作者:繪你一世傾城

來源:juejin.cn/post/6949929673615179789

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/qqTgnG3ndeUiZXdnmzQUfQ