圖解 Kafka 的服務端的網絡通信模型

溫馨提示:圖不清楚可以點擊 閱讀原文 查看高清大圖

  1. Kafka 網絡模型使用的是什麼線程模型?

  2. 什麼是 ControllerPlane(控制器面板),什麼是 DataPlane(數據面板)?

  3. Kafka 整個請求流程是什麼樣子的

  4. 與 Kafka 網絡通信相關的配置。

Kafka 的網絡模型

Kafka 中的網絡模型就是基於 主從 Reactor 多線程進行設計的, 在整體講述 Kafka 網絡模型之前, 我們現在按照源碼中的相關類來講解一下他們分別都是用來做什麼的.

關鍵類解析

SocketServer

這個類是網絡通信的核心類, 它持有這 Acceptor 和 Processor 對象。

ConnectionQuotas

這個是控制連接數配額的類,

涉及到的 Broker 配置有:

crl4iP

AbstractServerThread

AbstractServerThread 類:這是 Acceptor 線程和 Processor 線程的抽象基類,它定義了一個抽象方法wakeup() , 主要是用來喚醒 Acceptor 線程和 Processor 對應的Selector的, 當然還有一些共用方法

Acceptor 和 Processor

Acceptor 線程類:繼承自 AbstractServerThread, 這是接收和創建外部 TCP 連接的線程。每個 SocketServer 實例一般會創建一個 Acceptor 線程 (如果listeners配置了多個就會創建多個 Acceptor)。它的唯一目的就是創建連接,並將接收到的 SocketChannel(SocketChannel 通道用於傳輸數據) 傳遞給下游的 Processor 線程處理, Processor 主要是處理連接之後的事情, 例如讀寫 I/O。

涉及到的 Broker 配置有:

R62u1x

Processor 線程類:這是處理單個 TCP 連接上所有請求的處理線程。每個 Acceptor  實例創建若干個(num.network.threads)Processor 線程。Processor 線程負責將接收到的 SocketChannel(SocketChannel 通道用於傳輸數據。), 註冊讀寫事件, 當數據傳送過來的時候, 會立即讀取 Request 數據, 通過解析之後, 然後將其添加到 RequestChannel 的 requestQueue 隊列上,同時還負責將 Response 返還給 Request 發送方。

涉及到的 Broker 配置有:

sxPwba

簡單畫了一張兩個類之間的關係圖

在這裏插入圖片描述

  1. 這兩個類都是 AbstractServerThead 的實現類, 超類是Runnable 可運行的。

  2. 每個 Acceptor 持有num.network.threadsProcessor 線程, 假如配置了多個listeners,那麼總共 Processor 線程數是 listeners*num.network.threads.

  3. Acceptor 創建的是 ServerSocketChannel 通道, 這個通道是用來監聽新進來的 TCP 鏈接的通道,
    通過serverSocketChannel.accept()方法可以拿到 SocketChannel 通道用於傳輸數據。

  4. 每個 Processor 線程都有一個唯一的 id,並且通過 Acceptor 拿到的 SocketChannel 會被暫時放入到newConnections隊列中

  5. 每個 Processor 都創建了自己的 Selector

  6. Processor 會不斷的從自身的newConnections隊列裏面獲取新 SocketChannel,並註冊讀寫事件, 如果有數據傳輸過來, 則會讀取數據, 並解析成 Request 請求。

既然兩個都是可執行線程, 那我們看看兩個線程的run方法都做了哪些事情

Acceptor.run

def run()Unit = {
    //將serverChannel 註冊到nioSelector上,並且對 Accept事件感興趣:表示服務器監聽到了客戶連接,那麼服務器可以接收這個連接了
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
          //返回感興趣的事件數量  這裏是感興趣的是SelectionKey.OP_ACCEPT,監聽到新的鏈接
          val ready = nioSelector.select(500)
          if (ready > 0) {
            //獲取所有就緒通道
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            //遍歷所有就緒通道
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                //只處理   Accept事件,其他的事件則拋出異常,ServerSocketChannel是 監聽Tcp的鏈接通道
                if (key.isAcceptable) {
                  //根據Key 拿到SocketChannle = serverSocketChannel.accept(),然後再遍歷
                  accept(key).foreach { socketChannel =>
                    
                    //將socketChannel分配給我們的 processor來處理,如果有多個socketChannel 則按照輪訓分配的原則
                    //如果一個processor 中能夠處理的newconnection 隊列滿了放不下了,則找下一個
                    // 如果所有的都放不下,則會一直循環直到有processor能夠處理。

                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      //輪訓每個processors來處理
                      processor = synchronized {
                        // adjust the index (if necessary) and retrieve the processor atomically for
                        // correct behaviour in case the number of processors is reduced dynamically
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          省略
        }
      }
    } finally {
     省略
    }
  }
  1. 將 ServerSocketChannel 通道註冊到 nioSelector 上, 並關注事件 SelectionKey.OP_ACCEPT

    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  2. while 循環, 持續阻塞監聽事件, 超時時間 500ms

     // 阻塞查詢Selector是否有監聽到新的事件
     val ready = nioSelector.select(500)
     // 如果有事件,則查詢具體的事件和通道
     if(ready>0>{
       //獲取所有就緒事件準備處理
            val keys = nioSelector.selectedKeys()
     }
  3. 遍歷剛剛監聽到的事件, 如果該 SelectionKey 不包含OP_ACCEPT(建立連接) 事件, 則拋出異常,通常不會出現這個異常。

    Unrecognized key state for acceptor thread
  4. 如果 SelectionKey 包含OP_ACCEPT(建立連接) 事件, 則可以通過這個 SelectionKey 拿到 serverSocketChannel,通過 serverSocketChannel 拿到 socketChannel, 並且將 SocketChannel 設置爲非阻塞模式。

      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
     // 調用accept方法就可以拿到ScoketChannel了。
      val socketChannel = serverSocketChannel.accept()
       //設置爲非阻塞模式 就可以在異步模式下調用connect(), read() 和write()了。
      socketChannel.configureBlocking(false)
  5. 接下來, 把上面拿到的 SocketChannel遍歷的形式給 Acceptor 下面的 Procesor, 讓 Processor 來執行後面的處理。分配的體現形式是, 將拿到的 SocketChannel 保存在 Processor 中的newConnections阻塞隊列中, 這個newConnections上限是 20, 在代碼裏面寫死了的, 也就是說一個 Processor 同時最多隻能處理 20 個連接, 那麼所有的 Processor 能處理的最大連接就是 Processor 數量 * 20;如果你的連接請求併發度很高,可以嘗試調大num.network.threads

  6. 最後, 如果newConnections隊列放入了一個新的 SocketChannel, 則會調用一下對應 Processor 實例的wakeup()方法。

Procesor.run

  override def run()Unit = {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          // 將之前監聽到的TCP鏈接(暫時保存在newConnections中) 開始註冊監聽OP_READ事件到每個Processor的 KSelector選擇器中。
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()
          //在不阻塞的情況下對每個連接執行任何 I/O 操作。這包括完成連接、完成斷開連接、啓動新發送或在進行中的發送或接收上取得進展。
          // 當此調用完成時,用戶可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()檢查已完成的發送、接收、連接或斷開連接。
          poll()
          // 把請求解析後放到 requestChannels 隊列中,異步處理
          processCompletedReceives()
          //處理已經發送完成的請求
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. These exceptions are caught and
          // processed by the individual methods above which close the failing channel and continue processing other
          // channels. So this catch block should only ever see ControlThrowables.
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }
  1. configureNewConnections(): 之前 Acceptor 監聽到的 SocketChannel 保存在 Procesor 中的newConnections阻塞隊列中, 現在開始將newConnections阻塞隊列一個個取出來, 向 Procesor 的 Selector 註冊 SocketChannel 通道,並且感興趣的事件爲SelectionKey.OP_READ讀事件。

  2. processNewResponses() : 去 Processor 裏面的無邊界阻塞隊列responseQueue裏面獲取 RequestChannel.Response 數據, 如果有數據並且需要返回 Response 的話, 則通過 channel 返回數據. 具體的 Channel 是根據 connectionId 獲取之前構建的 KafkaChannel, KafkaChannel 則會通過監聽 SelectionKey.OP_WRITE。然後調用writeTo方法。至於responseQueue這個隊列是什麼時候入隊的, 我們後面再分析

  3. poll(): 這個方法裏面執行的就很多了, 這個方法底層調用的是selector.poll();  將監聽到的事件批量處理, 它纔是執行 I/O 請求的最終地方, 它正對每個連接執行任何的 I/O 操作, 這包括了 完成連接、完成斷開連接、啓動新發送等等。 像校驗身份信息, 還有 handshake 等等這些也都是在這裏執行的。

  4. processCompletedReceives(): 處理所有 completedReceives(已完成接收的請求) 進行接下來的處理,  處理的方式是解析一下收到的請求, 最終調用了 requestChannel.sendRequest(req). 也就是說所有的請求最終通過解析放入到了 RequestChannel 中的requestQueue阻塞隊列中, 這個阻塞隊列的大小爲queued.max.requests默認 500;表示的是在阻塞網絡線程之前,數據平面允許的排隊請求數 PS: 這個completedReceives 是在   poll()方法中添加的元素。

  5. processCompletedSends(): 它負責處理 Response 的回調邏輯,通過遍歷completedSends(已完成發送) 集合 可以從inflightResponses中移除並拿到 response 對象, 然後再調用回調邏輯。 PS: 這個completedSends 是在   poll()方法中添加的元素。

  6. processDisconnected(): 處理斷開鏈接的情況, connectionQuotas 連接限流減掉這個鏈接,inflightResponses 也移除對應連接。

  7. closeExcessConnections(): 關閉超限連接 ,當總連接數 >max.connections && (inter.broker.listener.name!=listener|| listeners 數量 ==1) 則需要關閉一些連接.
    簡單來說就是:就算 Broker 已經達到了最大連接數的限制了, 也應該允許 broker 之間監聽器上的連接, 這種情況下, 將會關閉另外一個監聽器上最近最少使用的連接。broker 之間的監聽器是配置 inter.broker.listener.name 決定的 所謂優先關閉,是指在諸多 TCP 連接中找出最近未被使用的那個。這裏 “未被使用” 就是說,在最近一段時間內,沒有任何 Request 經由這個連接被髮送到 Processor 線程。

RequestChannel

這個類保存這所有的 Processor,還有一個阻塞隊列保存這待處理請求。這個隊列最大長度由queued.max.requests控制, 當待處理請求超過這個數值的時候網絡就會阻塞

涉及到的 Broker 配置有:

Ep5VWN

KafkaApis

具體 Request 的處理類, 所有的請求方法處理邏輯都放在這個裏面。

KafkaRequestHandlerPool

KafkaRequestHandler 的線程池,KafkaRequestHandler 線程的數量由配置num.io.threads決定。

在這裏插入圖片描述

涉及到的 Broker 配置有:

zeaJRP

KafkaRequestHandler

請求處理類, 每個 Handler 都會去 requestChannel 的 requestQueue 隊列裏面 poll 請求, 然後去處理,最終調用的處理方法是 KafkaApis.handle()

這幾個類之間的關係如下

在這裏插入圖片描述

通信流程總結

在這裏插入圖片描述

  1. KafkaServer 啓動的時候, 會根據listeners的配置來初始化對應的實例。

  2. 一個listeners對應一個 Acceptor,一個 Acceptor 持有若干個 (num.network.threads)Processor 實例。

  3. Acceptor 中的 nioSelector 註冊的是 ServerSocketChannel 通道, 並監聽 OP_ACCEPT 事件,它只負責 TCP 創建和連接, 不包含讀寫數據。

  4. 當 Acceptor 監聽到新的連接之後, 就會通過調用socketChannel = serverSocketChannel.accept()拿到 SocketChannel, 然後把 SocketChannel 保存在 Processor 裏面的newConnection隊列中。 那麼具體保存在哪個 Processor 中呢?當然是輪詢分配了, 確保負載均衡嘛。當然每個 Processor 的newConnection隊列最大隻有 20, 並且是代碼寫死的。如果一個 Processor 滿了, 則會尋找下一個存放, 如果所有的都滿了, 那麼就會阻塞。一個 Acceptor 的所有 Processor 最大能夠併發處理的請求是 20 * num.network.threads

  5. Processor 會持續的從自己的newConnection中 poll 數據, 拿到 SocketChannel 之後, 就把它註冊到自己的 Selector 中, 並且監聽事件 OP_READ。 如果newConnection是空的話, poll 的超時時間是 300ms。

  6. 監聽到有新的事件, 比較 READ,則會讀取數據, 並且解析成 Request, 把 Request 放入到 RequestChannel 中的requestQueue阻塞隊列中。所有的待處理請求都是臨時放在這裏面。這個隊列也有最大值queued.max.requests(默認500),超過這個大小就會阻塞。

  7. KafkaRequestHandlerPool 中創建了很多 (num.io.threads(默認8)) 的 KafkaRequestHandler, 用來處理 Request, 他們都會一直從 RequestChannel 中的requestQueue隊列中 poll 新的 Request, 來進行處理。

  8. 處理 Request 的具體邏輯是 KafkaApis 裏面。當 Request 處理完畢之後,會調用 requestChannel.sendResponse() 返回 Response。

  9. 當然, 請求 Request 和返回 Response 必須是一一對應的, 你這個請求是哪個 Processor 監聽到的, 則需要哪個 Processor 返回, 他們通過 id 來標識。

  10. Response 也不是裏面返回的, 而是先放到 Processor 中的 ResponseQueue 隊列中, 然後慢慢返回給客戶端。

數據面板 (DataPlane)

數據面板是用來處理 Broker 與 Broker/Client 之間的網絡模型模塊, 與之相對的是控制器面板

控制器面板 是專門用於 Controller 與 Broker 之間的網絡通信模塊。

其實本質上他們都是一模一樣的, 但是爲了將 Controller 的通信和普通通信隔離, 纔有這麼兩個概念。

上面的網絡通信模型就是以數據面板來分析的,因爲本質是一樣的, 只是有一些配置不一樣。

那麼. 數據面板 就不詳細講了, 我們主要講下控制器面板的不一樣的地方

控制器面板 (ControllerPlane)

控制器面板是用來專門處理 Controller 相關請求的獨立通信模塊。

大家都知道, Controller 是一個很重要的角色, 基本上大部分協調整個集羣的相關請求都跟它有關係, 比如創建 Topic、刪除 Topic、分區副本重分配、等等。他們都很重要

但是一般情況下數據面板的請求很多, 如果因爲請求過多而導致 Controller 相關請求被阻塞不能執行,那麼可能會造成一些影響, 所以我們可以讓 Controller 類的請求有一個單獨的通信模塊。

首先, 要啓用控制器面板, 必須配置control.plane.listener.name. 並且這個監聽器名稱必須在listeners裏面有配置

否則的話, 是不會專用的控制器鏈接的 EndPoint 的。

例如: Broker 配置

## 所有的監聽器
isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094

## 監聽器對應的安全協議
listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL

## 控制器
control.plane.listener.name = CONTROLLER

在啓動時,代理將開始使用安全協議 “SSL” 監聽“192.1.1.8:9094”。 在控制器端,當它通過 zookeeper 發現代理發佈的端點時,它將使用 control.plane.listener.name 找到端點,它將用於建立與代理的連接。


  1. 必須配置control.plane.listener.name 才能使用獨立的控制器面板

  2. 控制器面板的 RequestChannel 中的requestQueue不是由queued.max.requests控制的, 而是寫死的 20. 因爲控制類請求不會有那麼大的併發

  3. 跟 DataPlane 相關隔離, 互不影響。但是連接限流 ConnectionQuotas 是共享的, 限流的時候, 兩個是算在一起的

  4. 控制類面板只有一個 Acceptor 和一個 Processor, 這個跟數據面板的區別是 DataPlane 的 Processor 可以有多個。

涉及到的 Broker 配置有:

WX0NPl


上面我們主要分析了一下, Kafka 中的網絡通信模型, 那麼聰明的你應該肯定能夠看的出來, 它是使用線程模型中的 Reactor 模式來實現的。

線程模型: Reactor 模式

該模塊詳細請參考 Reactor 模型

Reactor 模式,是指通過一個或多個輸入同時傳遞給服務處理器的服務請求的事件驅動處理模式。 服務端程序處理傳入多路請求,並將它們同步分派給請求對應的處理線程,Reactor 模式也叫 Dispatcher 模式。 即 I/O 多路複用統一監聽事件,收到事件後分發 (Dispatch 給某進程),是編寫高性能網絡服務器的必備技術之一。

根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現:

  1. 單 Reactor 單線程;

  2. 單 Reactor 多線程;

  3. 主從 Reactor 多線程。

我們主要了解一下 主從 Reactor 多線程

針對單 Reactor 多線程模型中,Reactor 在單線程中運行,高併發場景下容易成爲性能瓶頸,可以讓 Reactor 在多線程中運行。

方案說明:

更詳細的介紹可以看 Reactor 模型

問答

1. Kafka 的網絡模型使用了 Reactor 模式的哪種實現方式?

  1. 單 Reactor 單線程;

  2. 單 Reactor 多線程;

  3. 主從 Reactor 多線程。

答案: 3 。 使用了主從 Reactor 多線程的實現方式.

在這裏插入圖片描述

MainReactor(Acceptor) 只負責監聽 OP_ACCEPT 事件, 監聽到之後把 SocketChannel 傳遞給 SubReactor(Processor), 每個 Processor 都有自己的 Selector。SubReactor 會監聽並處理其他的事件, 並最終把具體的請求傳遞給 KafkaRequestHandlerPool。

很典型的主從 Reactor 多線程模式。

2. 什麼是 ControllerPlane(控制器面板),什麼是 DataPlane(數據面板)?

控制器面板: 主要處理控制器類的的請求數據面板: 主要處理數據類的請求。

讓他們隔離, 互不影響, 比如說普通的請求太多, 導致了阻塞, 那麼 Controller 相關的請求也可能被阻塞了, 所以讓他們隔離, 不會互相影響。

但是默認情況下, ControllerPlane 是沒有設置的, 也就是 Controller 相關的請求還是走的 DataPlane。 想要隔離的話必須設置control.plane.listener.name .

  1. 必須配置control.plane.listener.name

  2. 控制器面板的 RequestChannel 中的requestQueue不是由queued.max.requests控制的, 而是寫死的 20. 因爲控制類請求不會有那麼大的併發

  3. 跟 DataPlane 相關隔離, 互不影響。但是連接限流 ConnectionQuotas 是共享的, 限流的時候, 兩個是算在一起的

  4. 控制類面板只有一個 Acceptor 和一個 Processor, 這個跟數據面板的區別是 DataPlane 的 Processor 可以有多個。

3. Kafka 整個請求流程是什麼樣子的

請看上面網絡通信總結部分。

石臻臻的雜貨鋪 進滴滴技術交流羣, 不同技術專家輪流值班, 本號分享 Java / 大數據 / 中間件等領域乾貨和視頻

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/D_AnkgweaNb20DU7v5FmQw