「協議」Kcp 協議介紹、Demo 講解與工作過程淺談

Tcp 和 Udp

傳輸控制協議(英語:Transmission Control Protocol,縮寫:TCP)是一種面向連接的、可靠的、基於字節流的傳輸層通信協議,其擁有着相對而言的可靠傳輸(相對 UDP),由於 Tcp 的相關特性如在連接之前先創建兩端的虛擬連接,以及發送數據的超時重傳、滑動窗口、流量 / 擁塞控制等特性保證了其可靠的傳輸,因而 TCP 通常會保證數據準確交付。

但由於其在穿輸數據之前需要進行虛擬連接的建立,這回消耗一定的傳輸時間,且在傳輸過程之中爲保證數據正確交付而採用的超時重傳、滑動窗口、流量 / 擁塞控制等機制也會消耗大量的傳輸時間,另外由於建立 TCP 需要進行虛擬連接建立,因此可能會被利用從而收到 DDOS,DOS 等攻擊。

總的來說,Tcp 算是一種消耗一定的傳輸性能而確保數據準確到達對端的一種協議,當需要網絡質量很高時,需要採用 Tcp 協議。

用戶數據報協議(英語:User Datagram Protocol,縮寫:UDP;又稱用戶數據包協議)是一個簡單的面向數據報的通信協議,其沒有 Tcp 的發送前建立連接這一特性因此其是一個無狀態協議,同時也沒有 Tcp 中確保數據準確到達的一些保證機制,所以對於 Tcp 來說 Udp 的傳輸很快速,但是從而引發出了 Udp 協議的是不可靠的傳輸協議,當網絡波動較大時,丟包率會很高,所以 Udp 保證數據快速到達但不能保證必定到達(管殺不管埋?),且雖然 Udp 無需建立連接,也就可能較 Tcp 被攻擊者利用的漏洞就要少一些。但是其仍然無法避免被攻擊。

快速的 Tcp?可靠的 Udp?

那麼有沒有一種協議可以既擁有 Tcp 的可靠性,又擁有 Udp 的快速性呢?(既要也要你想🍑)

誒,還真有,那就是本篇文章的主角 Kcp 協議,Kcp 是個啥呢?其官網 [1] 是這麼介紹的:

KCP 是一個快速可靠協議,能以比 TCP 浪費 10%-20% 的帶寬的代價,換取平均延遲降低 30%-40%,且最大延遲降低三倍的傳輸效果。純算法實現,並不負責底層協議(如 UDP)的收發,需要使用者自己定義下層數據包的發送方式,以 callback 的方式提供給 KCP。連時鐘都需要外部傳遞進來,內部不會有任何一次系統調用。

TCP 是爲流量設計的(每秒內可以傳輸多少 KB 的數據),講究的是充分利用帶寬。而 KCP 是爲流速設計的(單個數據包從一端發送到一端需要多少時間),以 10%-20% 帶寬浪費的代價換取了比 TCP 快 30%-40% 的傳輸速度。TCP 信道是一條流速很慢,但每秒流量很大的大運河,而 KCP 是水流湍急的小激流。KCP 有正常模式和快速模式兩種,通過以下策略達到提高流速的結果。

理論上來說 Kcp 其實並不是一個傳輸層協議, 其是在傳輸層協議(一般都使用 Udp 作爲基礎,下文也以 Udp 爲基礎展開)的基礎之上,通過算法方式實現 ARQ 模型以及可靠傳輸的應用層協議。

此處存疑,因爲 Kcp 不算是一個傳輸層協議,但是網絡上很多文章都將其算作爲是一個具有可靠性的傳輸層 ARQ 協議,作者對此表示不解,如果有大神瞭解此問題,請後臺留言,再次先做感謝。

Kcp 協議格式

那麼 Kcp 通過何種方式來保證其傳輸可靠性呢?首先來看一下 Kcp 協議的協議頭的組成格式。

•Conv :連接號,因爲其底層基於 Udp,Udp 無連接,所以需要一個連接號來確定本次消息來自於哪個客戶端。相當於代替了虛擬連接。•Cmd:控制字符,其中包括 IKCP_CMD_PUSH(推送數據)、IKCP_CMD_ACK(迴應)、IKCP_CMD_WASK(問問客戶端接口窗口多大)、IKCP_CMD_WINS(告訴服務器接收窗口多大)•Frg:分片,要傳輸的數據會分成幾個 Kcp 包丟出去。•Wnd:窗口大小。•Ts:時間序列。•Sn:Kcp 包的序列號。•Una:確認包的序列號,比如收到 sn=1 的包,回覆的 Ack 包的 una 就爲 sn+1。•Len:數據長度。•Data:實際傳輸的用戶數據,默認的 Mtu 爲 1400Byte。

在 Kcp 中每次的數據傳輸通常都是基於 Udp 的,也就是說在傳輸過程中,實際上 Kcp 的協議頭是被包含在 Udp 的協議的數據包之中的數據字段中(自行前往瞭解 Udp 協議頭長啥樣)的,舉個例子,下面的 Byte 數組打印結果是一個通過 udp.ReadFromUdp 接收到的 Udp 數據包,其中包含了 Kcp 的協議頭以及數據,我們來看一下他長啥樣:

從上圖中可以看到,Kcp 的協議頭及其數據都是包含在 Udp 數據包的數據字段中的,因此可以說 Kcp 是基於 Udp 的,且其不是一個傳輸層協議而是一個應用層協議(此處仍然存疑)。

爲什麼可靠?爲什麼快速?

Kcp 協議的開發者表示,KCP 是一個快速可靠協議,那麼可靠在哪裏快速在哪裏?衆所周知,Tcp 協議的可靠性是由連接之前先創建兩端的虛擬連接,以及發送數據的超時重傳、滑動窗口、流量 / 擁塞控制等特性保證了其可靠的傳輸,那麼 Kcp 爲何可靠且快速呢?官網 [2] 中給瞭如下的解釋,以下的保證可靠的方式均由純算法完成。

RTO 翻倍 vs 不翻倍:

TCP 超時計算是 RTOx2,這樣連續丟三次包就變成 RTOx8 了,十分恐怖,而 KCP 啓動快速模式後不 x2,只是 x1.5(實驗證明 1.5 這個值相對比較好),提高了傳輸速度。

選擇性重傳 vs 全部重傳:

TCP 丟包時會全部重傳從丟的那個包開始以後的數據,KCP 是選擇性重傳,只重傳真正丟失的數據包。

快速重傳:

發送端發送了 1,2,3,4,5 幾個包,然後收到遠端的 ACK: 1, 3, 4, 5,當收到 ACK3 時,KCP 知道 2 被跳過 1 次,收到 ACK4 時,知道 2 被跳過了 2 次,此時可以認爲 2 號丟失,不用等超時,直接重傳 2 號包,大大改善了丟包時的傳輸速度。

延遲 ACK vs 非延遲 ACK:

TCP 爲了充分利用帶寬,延遲發送 ACK(NODELAY 都沒用),這樣超時計算會算出較大 RTT 時間,延長了丟包時的判斷過程。KCP 的 ACK 是否延遲發送可以調節。

UNA vs ACK+UNA:

ARQ 模型響應有兩種,UNA(此編號前所有包已收到,如 TCP)和 ACK(該編號包已收到),光用 UNA 將導致全部重傳,光用 ACK 則丟失成本太高,以往協議都是二選其一,而 KCP 協議中,除去單獨的 ACK 包外,所有包都有 UNA 信息。

非退讓流控:

KCP 正常模式同 TCP 一樣使用公平退讓法則,即發送窗口大小由:發送緩存大小、接收端剩餘接收緩存大小、丟包退讓及慢啓動這四要素決定。但傳送及時性要求很高的小數據時,可選擇通過配置跳過後兩步,僅用前兩項來控制發送頻率。以犧牲部分公平性及帶寬利用率之代價,換取了開着 BT 都能流暢傳輸的效果。

Kcp-go 包的 Demo

Kcp 協議各個語言幾乎都對 Kcp 協議進行了實現,Go 也不例外,Go 的 Kcp 實現庫如下:

https://github.com/xtaci/kcp-go

kcp-go is a Production-Grade Reliable-UDP library for golang[3].

This library intents to provide a smooth, resilient, ordered, error-checked and anonymous delivery of streams over UDP packets, it has been battle-tested with opensource project kcptun[4]. Millions of devices(from low-end MIPS routers to high-end servers) have deployed kcp-go powered program in a variety of forms like online games, live broadcasting, file synchronization and network acceleration.

由於這個庫對 Kcp 的原生特性進行了包裝,給的案例 [5] 也是基於封裝之後的,其將 Kcp 的底層做了封裝,那麼接下來我把封裝揭開,直接使用 Kcp 進行 Demo 的製作。

揭開封裝後可以看見,實際上的 Kcp 操作無論是客戶端還是服務器的接收操作和發送操作都是按照一下幾個步驟進行:

接收操作:

1.Udp Listen 接收到數據。2. 使用 Kcp 對象調用 Input 函數將接收到的數據塞進 Input 函數,在此函數中會對傳入的數據進行 Kcp 協議頭的解析並且向發送方返回 Ack(UNA)。然後把去掉 Kcp 協議頭的數據放入 Recv Queue 中等待接收。3. 使用 Kcp 對象調用 PeekSize() 判斷 Recv Queue 中下一個可接收包的長度。4. 使用 Kcp 對象調用 Recv(),從 Recv Queue 中接收數據。5. 進行數據處理。

發送操作:

  1. 使用 Kcp 對象調用 Send() 函數,將要發送的數據進行 Kcp 協議頭封裝後,放入 Send Queue。2. 定時調用 Update() 函數進行數據發送,發送時會調用在創建 Kcp 對象時傳入的回調函數進行發送。

上面的步驟中有一步是定時調用 Update() 函數,這個函數是做啥的呢?Update() 函數的作用是判斷當前時間,如果符合一定規則,則調用 kcp.flush() 函數將處於 Send Queue 中的待發送數據發送出去,並根據需要判斷是否需要重傳、窗口滑動等操作。

Kcp-go 包的協議配置

// nodelay :是否啓用 nodelay模式,0不啓用;1啓用。
// interval :協議內部工作的 interval,單位毫秒,比如 10ms或者 20ms
// resend :快速重傳模式,默認0關閉,可以設置2(2次ACK跨越將會直接重傳)
// nc :是否關閉流控,默認是0代表不關閉,1代表關閉。
// 普通模式:`NoDelay(kcp, 0, 40, 0, 0);
// 極速模式:NoDelay(kcp, 1, 10, 2, 1);
func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int 

// 該調用將會設置協議的最大發送窗口和最大接收窗口大小,默認爲32. 
// 這個可以理解爲 TCP的 SND_BUF 和 RCV_BUF,只不過單位不一樣 SND/RCV_BUF 單位是字節,這個單位是包。
func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int 

// 純算法協議並不負責探測 MTU,默認 mtu是1400字節
// 可以使用SetMtu來設置該值。該值將會影響數據包歸併及分片時候的最大傳輸單元
func (kcp *KCP) SetMtu(mtu int) int

Kcp 的源代碼和工作過程的不負責任解析

不負責任的解析一下 Send、Recv、Input 三個重要的函數的源代碼和工作過程的解析。

Send 的源代碼解析

func (kcp *KCP) Send(buffer []byte) int {
   var count int
  // 不能只發送一個Kcp協議頭
   if len(buffer) == 0 {
      return -1
   }

   // 如果當前kcp處於流模式,則和上一次發送的組合到一起(如果能組合的話)
   // 這個地方就不細說了 正常來說使用NewKcp創建的kcp對象不會處於流模式
   if kcp.stream != 0 {
      n := len(kcp.snd_queue)
      if n > 0 {
         seg := &kcp.snd_queue[n-1]
         if len(seg.data) < int(kcp.mss) {
            capacity := int(kcp.mss) - len(seg.data)
            extend := capacity
            if len(buffer) < capacity {
               extend = len(buffer)
            }

            // grow slice, the underlying cap is guaranteed to
            // be larger than kcp.mss
            oldlen := len(seg.data)
            seg.data = seg.data[:oldlen+extend]
            copy(seg.data[oldlen:], buffer)
            buffer = buffer[extend:]
         }
      }

      if len(buffer) == 0 {
         return 0
      }
   }

  // 如果要發送的數據的長度小於mss的長度則就發一個包就行了
  // 否則進行分片
   if len(buffer) <= int(kcp.mss) {
      count = 1
   } else {
      count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
   }
    // 分片大於255就出問題了
   if count > 255 {
      return -2
   }
    // 至少有1個包 不然發他幹嘛
   if count == 0 {
      count = 1
   }

   for i := 0; i < count; i++ {
      var size int
     // 發的數據大於mss那就每片按照mss的大小發
     // 否則就按照實際長度發
      if len(buffer) > int(kcp.mss) {
         size = int(kcp.mss)
      } else {
         size = len(buffer)
      }
     // 創建一個kcp的數據段
      seg := kcp.newSegment(size)
     // 複製過去
      copy(seg.data, buffer[:size])
        // 如果不是流模式那就把frg設置爲當前是第幾片
     // 就是 假如分了10片,現在是第2片
     // 當前的frg就是10-2-1=7
     // frg就是告訴接收方後面還有幾片
      if kcp.stream == 0 {  
         seg.frg = uint8(count - i - 1)
      } else {
         seg.frg = 0
      }
     // 把組裝好了的kcp的數據段塞進發送隊列
      kcp.snd_queue = append(kcp.snd_queue, seg)
      buffer = buffer[size:]
   }
   return 0
}

Send 的工作過程

Send() 把要發送的 buffer 分片成 Kcp 的數據段,然後構建一個數據段結構體後把這個數據段塞進發送隊列裏。當要發送的數據長度超過一個 MSS(最大分片大小) 的時候(Mss 默認爲 MTU-kcp Head,即 1400-24),會對發送的數據進行分片。然後每片都放入到構建的 Kcp 數據段,然後塞進發送隊列裏等待發送。

Recv 的源代碼解析

func (kcp *KCP) Recv(buffer []byte) (n int) {
    // 看看Recv Queue中下一個消息的大小
   peeksize := kcp.PeekSize()
   if peeksize < 0 {
      return -1
   }
   if peeksize > len(buffer) {
      return -2
   }

   var fast_recover bool
  // 如果接收隊列長度大於等於當前的接收窗口大小
  // 則進入快速恢復模式
   if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
      fast_recover = true
   }
    // 在接收隊列裏把發送時大於mss而進行分片的包組合起來
    count := 0
   for k := range kcp.rcv_queue {
      seg := &kcp.rcv_queue[k]
      copy(buffer, seg.data)
      buffer = buffer[len(seg.data):]
      n += len(seg.data)
      count++
      kcp.delSegment(seg)
      if seg.frg == 0 {
         break
      }
   }
   if count > 0 {
      kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
   }
        // 從接收緩存中    拿出數據然後塞進接收隊列裏
    count = 0
   for k := range kcp.rcv_buf {
      seg := &kcp.rcv_buf[k]
      if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue)+count < int(kcp.rcv_wnd) {
         kcp.rcv_nxt++
         count++
      } else {
         break
      }
   }

   if count > 0 {
      kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
      kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
   }

  // 隊列長度小於窗口且處於快速恢復階段時
  // 告知對方自身接受窗口大小
    if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
      kcp.probe |= IKCP_ASK_TELL
   }
   return
}

Recv 的工作過程

很簡單。。沒啥可說的。。就是從接收隊列裏拿出數據然後再從接收緩存裏拿出來塞進接收隊列裏

Input 的源代碼解析

func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
  // 獲取上一個發送的una
   snd_una := kcp.snd_una
  // 獲取到的數據長度不能小於24
   if len(data) < IKCP_OVERHEAD {
      return -1
   }

   var latest uint32 
   var flag int
   var inSegs uint64
   var windowSlides bool

   for {
      var ts, sn, length, una, conv uint32
      var wnd uint16
      var cmd, frg uint8

      if len(data) < int(IKCP_OVERHEAD) {
         break
      }
            // 22行到33行就是解析收到數據的kcp協議頭
      data = ikcp_decode32u(data, &conv)
      if conv != kcp.conv {
         return -1
      }
      data = ikcp_decode8u(data, &cmd)
      data = ikcp_decode8u(data, &frg)
      data = ikcp_decode16u(data, &wnd)
      data = ikcp_decode32u(data, &ts)
      data = ikcp_decode32u(data, &sn)
      data = ikcp_decode32u(data, &una)
      data = ikcp_decode32u(data, &length)
      if len(data) < int(length) {
         return -2
      }

      if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
         cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
         return -3
      }

       // regular 表示它是來自遠程的真實數據包
      //  這意味着它不是由ReedSolomon編解碼器生成的。
      //  僅信任來自常規數據包的窗口更新。即最新更新
      if regular {
         kcp.rmt_wnd = uint32(wnd)
      }
     // 如果解析到的una大於0
     // 那麼後續就會執行窗口滑動
      if kcp.parse_una(una) > 0 {
         windowSlides = true
      }
     // 收縮
      kcp.shrink_buf()
            // 根據cmd的類型執行不同的操作
      if cmd == IKCP_CMD_ACK {
         kcp.parse_ack(sn)
         kcp.parse_fastack(sn, ts)
         flag |= 1
         latest = ts
      } else if cmd == IKCP_CMD_PUSH {
         repeat := true
         if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
            kcp.ack_push(sn, ts)
            if _itimediff(sn, kcp.rcv_nxt) >= 0 {
               var seg segment
               seg.conv = conv
               seg.cmd = cmd
               seg.frg = frg
               seg.wnd = wnd
               seg.ts = ts
               seg.sn = sn
               seg.una = una
               seg.data = data[:length] 
               repeat = kcp.parse_data(seg)
            }
         }
         if regular && repeat {
            atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
         }
      } else if cmd == IKCP_CMD_WASK {
         // ready to send back IKCP_CMD_WINS in Ikcp_flush
         // tell remote my window size
         kcp.probe |= IKCP_ASK_TELL
      } else if cmd == IKCP_CMD_WINS {
         // do nothing
      } else {
         return -3
      }

      inSegs++
      data = data[length:]
   }
   atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)

   // update rtt with the latest ts
   // ignore the FEC packet
   if flag != 0 && regular {
      current := currentMs()
      if _itimediff(current, latest) >= 0 {
         kcp.update_ack(_itimediff(current, latest))
      }
   }

   // cwnd update when packet arrived
   if kcp.nocwnd == 0 {
      if _itimediff(kcp.snd_una, snd_una) > 0 {
         if kcp.cwnd < kcp.rmt_wnd {
            mss := kcp.mss
            if kcp.cwnd < kcp.ssthresh {
               kcp.cwnd++
               kcp.incr += mss
            } else {
               if kcp.incr < mss {
                  kcp.incr = mss
               }
               kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
               if (kcp.cwnd+1)*mss <= kcp.incr {
                  if mss > 0 {
                     kcp.cwnd = (kcp.incr + mss - 1) / mss
                  } else {
                     kcp.cwnd = kcp.incr + mss - 1
                  }
               }
            }
            if kcp.cwnd > kcp.rmt_wnd {
               kcp.cwnd = kcp.rmt_wnd
               kcp.incr = kcp.rmt_wnd * mss
            }
         }
      }
   }
   if windowSlides { // if window has slided, flush
      kcp.flush(false)
   } else if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
      kcp.flush(true)
   }
   return 0
}

Input 的工作過程

Input 的源代碼很多,作者也僅僅是做了一點的註釋,總的來說 Input 的工作過程大概可以解釋爲如下步驟:

  1. 傳入 Udp 接收到的 Kcp 數據包,然後解析前 24 字節的 Kcp 協議頭。2. 根據解析到的協議頭的 cmd 字段去進行對應的操作處理。3. 然後根據情況更新 RTT。4. 更新窗口大小。5. 如果窗口滑動了或者選擇了立即確認模式則立刻刷新。

在 Input 中有個很重要的函數就是 func (kcp *KCP) flush(ackOnly bool) uint32,他負責將處於發送緩存中的數據發出去,這個函數很長,就不在此放代碼了。這個函數非常的重要,kcp 的重要參數都是在調節這個函數的行爲,這個函數只有一個參數 ackOnly,意思就是隻發送 ack,如果 ackOnly 爲 true 的話,該函數只遍歷 ack 列表,然後發送,就完事了。如果不是,也會發送真實數據。在發送數據前先進行 windSize 探測,如果開啓了擁塞控制 nc=0,則每次發送前檢測服務端的 winsize,如果服務端的 winsize 變小了,自身的 winsize 也要更着變小,來避免擁塞。如果沒有開啓擁塞控制,就按設置的 winsize 進行數據發送 [6]。

  2. 如果這個段數據的當前時間大於它自身重發的時間,也就是 RTO,則重傳消息。

    3. 如果這個段數據的 ack 丟失累計超過 resent 次數,則重傳,也就是快速重傳機制。這個 resent 參數由resend參數決定。    4. 如果這個段數據的 ack 有丟失且沒有新的數據段,則觸發 ER,ER 相關信息 ER

Kcp 的工作流程圖展示

最後放一下 Kcp 的工作流程作爲結束。

引用

References

[1] 官網: https://github.com/skywind3000/kcp
[2] 官網: https://github.com/skywind3000/kcp
[3] golang: https://golang.org/
[4] kcptun: https://github.com/xtaci/kcptun
[5] 案例: https://github.com/xtaci/kcp-go/blob/master/examples/echo.go
[6]  https://cloud.tencent.com/developer/article/1165876

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/F-1XNCoIT4n-2vq5ULrEdA