如何實現可靠的 UDP ?

概述

UDP 是無連接的,盡最大可能交付 (網絡 IP 層),不提供流量控制,擁塞控制,面向報文(對於應用程序的數據不合並也不拆分,直接添加 UDP 首部),支持一對一、一對多、多對一和多對多 (也就是多播和廣播) 通信。

TCP 和 UDP 區別總結

TCP 實現可靠傳輸層的核心有三點:

  1. 確認與重傳 (已經可以滿足 “可靠性”,但是可能存在性能問題)

  2. 滑動窗口 (也就是流量控制,爲了提高吞吐量,充分利用鏈路帶寬,避免發送方發的太慢)

  3. 擁塞控制 (防止網絡鏈路過載造成丟包,避免發送方發的太快)

如果希望使用 UDP 來實現 TCP 的可靠傳輸,顯然 最直接的方法就是在應用層實現 確認與重傳。本文沿着這個思路,看看利用 UDP 來實現確認與重傳機制時的設計思路和實現僞代碼。

除了確認和重傳外,TCP 校驗和也是 TCP 實現可靠傳輸的手段之一,但這不是本文的重點,所以這裏一筆帶過。

爲了節省篇幅,本文將可靠傳輸的模型實現簡化爲:

發送方實現重傳機制,接收方實現確認機制。


數據結構

爲了簡化代碼和便於理解,下文中將每個 UDP 數據包封裝一個對象,字段及含義如下所示。

// 數據包標誌位類型
type FlagType uint8

const (
 FlagTypeInvalid FlagType = iota
 FlagTypeData             // 數據包
 FlagTypeAck              // 確認包
)

// 定義數據包的結構體
type Packet struct {
 Seq  int      // 序列號
 Ack  int      // 確認號
 Data string   // 數據內容
 Flag FlagType // 標誌位
}

確認機制

確認機制由接收方實現,在本文中也就是服務端程序。

1. 單個數據包確認

所謂單個確認,也就是常見的 Reply 形式,發送方 (客戶端) 向接收方發送一個 UDP 數據包,對於每個接收到的 UDP 數據包,接收方 (服務端) 都向發送方發送一個確認 ACK 數據包。

單個數據包確認示例圖

實現思路非常簡單:

  1. 服務端程序使用 UDP 監聽指定端口

  2. 客戶端向服務端發送 UDP 數據包

  3. 服務端收到客戶端的 UDP 數據包之後,向客戶端發送 ACK 數據包

  4. 客戶端收到服務端的 ACK 數據包之後,更新 Seq 值

最終的程序代碼及其對應的註釋如下。

// V1 版本

package main

import (
 "fmt"
 "net"
 "strconv"
 "strings"
 "time"
)

// 數據包標誌位類型
type FlagType uint8

const (
 FlagTypeInvalid FlagType = iota
 FlagTypeData             // 數據包
 FlagTypeAck              // 確認包
)

// 定義數據包的結構體
type Packet struct {
 Seq  int      // 序列號
 Ack  int      // 確認號
 Data string   // 數據內容
 Flag FlagType // 標誌位
}

var (
 // 服務端地址
 serverAddr = net.UDPAddr{
  Port: 8080,
  IP:   net.ParseIP("127.0.0.1"),
 }
)

func main() {
 go startServer()

 // 等待服務端程序啓動
 time.Sleep(200 * time.Millisecond)

 startClient()
}

// 服務端程序
func startServer() {
 conn, err := net.ListenUDP("udp"&serverAddr)
 if err != nil {
  fmt.Println("Error starting server:", err)
  return
 }
 defer conn.Close()

 buffer := make([]byte, 1024)

 for {
  n, clientAddr, err := conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   continue
  }

  // 解析接收到的數據包
  recvPacket := decode(buffer[:n])

  fmt.Printf("client -> server %s\n", serialization(&recvPacket))

  // 構造 Ack 包併發送
  ackPacket := Packet{
   // 因爲這個示例中
   // 服務端不主動發送數據
   // 所以 Seq 固定爲 1
   Seq:  1,
   Ack:  recvPacket.Seq + len(recvPacket.Data),
   Data: "",
   Flag: FlagTypeAck,
  }

  ackData := encode(&ackPacket)
  conn.WriteToUDP(ackData, clientAddr)
 }
}

// 客戶端程序
func startClient() {
 conn, err := net.DialUDP("udp", nil, &serverAddr)
 if err != nil {
  fmt.Println("Error connecting:", err)
  return
 }
 defer conn.Close()

 // 發送一個數據包
 packet := Packet{
  Seq:  1, // 客戶端 Seq 值從 1 開始
  Ack:  1,
  Data: "Hello Server",
  Flag: FlagTypeData,
 }

 // 發送 5 個 UDP 數據包
 for i := 0; i < 5; i++ {
  data := encode(&packet)
  conn.Write(data)

  // 接收 Ack 包
  buffer := make([]byte, 1024)
  n, _, err := conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   return
  }

  recvAckPacket := decode(buffer[:n])
  fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))

  // 更新下次發送數據包的 Seq 值
  packet.Seq = recvAckPacket.Ack
 }
}

// Packet 數據包編碼
// 使用字符串拼接作爲簡單實現
func encode(p *Packet) []byte {
 return []byte(fmt.Sprintf("%d|%d|%q|%d", p.Seq, p.Ack, p.Data, p.Flag))
}

// Packet 數據包解碼
func decode(data []byte) Packet {
 var p Packet
 _, _ = fmt.Sscanf(string(data)"%d|%d|%q|%d"&p.Seq, &p.Ack, &p.Data, &p.Flag)
 return p
}

// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
 var sb strings.Builder

 if p.Flag == FlagTypeData {
  // 無需任何標誌位渲染
  // 輸出佔位符美化終端顯示
  sb.WriteString("     ")
 } else if p.Flag == FlagTypeAck {
  sb.WriteString("[ACK]")
 } else {
  sb.WriteString("[Unknown]")
 }

 sb.WriteString(" Seq=")
 sb.WriteString(strconv.Itoa(p.Seq))

 if p.Flag == FlagTypeAck {
  sb.WriteString(" Ack=")
  sb.WriteString(strconv.Itoa(p.Ack))
 }

 sb.WriteString(" Len=")
 sb.WriteString(strconv.Itoa(len(p.Data)))

 if p.Flag == FlagTypeData {
  sb.WriteString(" Data=")
  sb.WriteString(p.Data)
 }

 return sb.String()
}

運行程序的輸出如下:

通過輸出結果可以看到,單個數據包的確認機制實現,已經可以正常工作了。

2. 延遲確認

服務端在不發送數據的情況下,每收到一個 UDP 數據包,就發送 Ack 報文,導致了低效的數據傳輸和浪費網絡帶寬,也就是所謂的 “糊塗窗口綜合症”。

既然服務端沒有什麼數據要發送給客戶端,那麼就可以延遲一段時間再發送 Ack 報文, 如果在延遲期間,又接收到了新的數據,就可以將多個 Ack 報文合併到一個數據包裏面發送了。

延遲確認示例圖

當然,要重構的主要服務端程序和客戶端程序的代碼,修改後的代碼如下所示。

// V2 版本

// 其他重複代碼省略
// ...


// 服務端程序
func startServer() {
 conn, err := net.ListenUDP("udp"&serverAddr)
 if err != nil {
  fmt.Println("Error starting server:", err)
  return
 }
 defer conn.Close()

 buffer := make([]byte, 1024)

 // 延遲 200 毫秒發送 ACK
 const ackDelay = 200 * time.Millisecond

 var (
  // 延遲 Ack
  lastAck int
  // 最後發送 Ack 報文的時間
  lastAckTime = time.Now()
  // 客戶端的 UDP 地址
  clientAddr *net.UDPAddr
 )

 // 因爲 conn.ReadFromUDP 方法是阻塞接收操作
 // 所以這裏啓動一個新的 goroutine
 // 來完成延遲 Ack 操作
 go func() {
  for {
   // 超過延遲時間,發送 Ack 確認包
   if time.Since(lastAckTime) >= ackDelay {
    // 超過延遲時間,發送 Ack 確認包
    // 構造 Ack 包併發送
    ackPacket := Packet{
     // 因爲這個示例中
     // 服務端不主動發送數據
     // 所以 Seq 固定爲 1
     Seq:  1,
     Ack:  lastAck,
     Data: "",
     Flag: FlagTypeAck,
    }

    ackData := encode(&ackPacket)
    conn.WriteToUDP(ackData, clientAddr)

    // 更新最後發送 Ack 的時間
    lastAckTime = time.Now()
   }

   // 短暫休眠,避免佔用過多 CPU 資源
   time.Sleep(100 * time.Millisecond)
  }
 }()

 for {
  _, clientAddr, err = conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   continue
  }

  // 解析接收到的數據包
  recvPacket := decode(buffer[:])

  fmt.Printf("client -> server %s\n", serialization(&recvPacket))

  // 更新最後接收到的確認號
  lastAck = recvPacket.Seq + len(recvPacket.Data)
 }
}

// 客戶端程序
func startClient() {
 conn, err := net.DialUDP("udp", nil, &serverAddr)
 if err != nil {
  fmt.Println("Error connecting:", err)
  return
 }
 defer conn.Close()

 // 構建一個 UDP 數據包
 packet := Packet{
  Seq:  1, // 客戶端 Seq 值從 1 開始
  Ack:  1,
  Data: "Hello Server",
  Flag: FlagTypeData,
 }

 var wg sync.WaitGroup
 wg.Add(1)

 // 這裏啓動一個新的 goroutine
 // 來完成接收 Ack 操作
 go func() {
  defer wg.Done()

  // 接收 Ack 包
  buffer := make([]byte, 1024)
  _, _, err := conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   return
  }

  recvAckPacket := decode(buffer[:])
  fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
 }()

 // 連續發送 5 個 UDP 數據包
 for i := 0; i < 5; i++ {
  data := encode(&packet)
  conn.Write(data)

  // 更新下次發送數據包的 Seq 值
  packet.Seq += len(packet.Data)
 }

 // 等待 Ack 報文接收完成
 wg.Wait()
}


// 其他重複代碼省略
// ...

運行程序的輸出如下:

通過輸出結果可以看到,客戶端連續發送了 5 個 UDP 數據包,但是因爲服務端啓動了延遲確認,最終發送給客戶端的 Ack 報文只有 1 個。

3. 選擇性確認

在選擇性重傳中,接收方通過 SAck 向發送方應答已經收到的非連續數據包,發送方可以作爲依據,來重傳接收方沒有收到 (可能已經丟失) 的數據包。

如圖所示,SAck = 1361 ~ 2721 表示這個區間的數據包已經收到了。

代碼實現也很簡單,就是接收方記錄已經接收到的數據包 Seq,並定時將每個區間 Seq 的最大值作爲 Ack 報文響應給發送方。

因爲我們的僞代碼只考慮接收方實現選擇性確認,所以只需要在剛纔的代碼基礎上,對服務端和客戶端代碼稍加調整即可。

選擇性確認示例圖

最後修改後的代碼如下所示。

// V3 版本

package main

import (
 "fmt"
 "net"
 "strconv"
 "strings"
 "sync"
 "time"
)

// 數據包標誌位類型
type FlagType uint8

const (
 FlagTypeInvalid FlagType = iota
 FlagTypeData             // 數據包
 FlagTypeAck              // 確認包
)

// 定義數據包的結構體
type Packet struct {
 Seq  int      // 序列號
 Ack  int      // 確認號
 SAck string   // SAck 區間
 Data string   // 數據內容
 Flag FlagType // 標誌位
}

var (
 // 服務端地址
 serverAddr = net.UDPAddr{
  Port: 8080,
  IP:   net.ParseIP("127.0.0.1"),
 }
)

func main() {
 go startServer()

 // 等待服務端程序啓動
 time.Sleep(200 * time.Millisecond)

 startClient()
}

// 服務端程序
func startServer() {
 conn, err := net.ListenUDP("udp"&serverAddr)
 if err != nil {
  fmt.Println("Error starting server:", err)
  return
 }
 defer conn.Close()

 buffer := make([]byte, 32)

 // 延遲 200 毫秒發送 ACK
 const ackDelay = 200 * time.Millisecond

 var (
  // 延遲 Ack
  lastAck int

  // 記錄接收到的區間 Seq
  // [0]: 區間起始 Seq
  // [1]: 區間結束 Seq, Seq + Data.Len()
  seqList = [][2]int{}

  // 最後發送 Ack 報文的時間
  lastAckTime = time.Now()
  // 客戶端的 UDP 地址
  clientAddr *net.UDPAddr
 )

 // 因爲 conn.ReadFromUDP 方法是阻塞接收操作
 // 所以這裏啓動一個新的 goroutine
 // 來完成延遲 Ack 操作
 go func() {
  for {
   // 超過延遲時間,發送 Ack 確認包
   if time.Since(lastAckTime) >= ackDelay && len(seqList) > 0 {
    // 超過延遲時間,發送 Ack 確認包
    // 構造 Ack 包併發送

    lastAck = seqList[0][1]
    lastAckChanged := false

    // 因爲丟包,可能存在多個區間 Ack 確認包
    // 所以需要分開單獨發送
    // 根據 Seq 合併區間
    mergedSeqList := [][2]int{
     seqList[0],
    }

    for i := 1; i < len(seqList); i++ {
     // 數據包 Seq 是連續的,直接合並兩個區間
     if seqList[i][0] == mergedSeqList[len(mergedSeqList)-1][1] {
      mergedSeqList[len(mergedSeqList)-1][1] = seqList[i][1]

      // 更新最後接收到的確認號
      if !lastAckChanged {
       lastAck = mergedSeqList[len(mergedSeqList)-1][1]
      }
     } else {
      lastAckChanged = true

      // 數據包 Seq 不是連續的,有中間數據包還未收到
      mergedSeqList = append(mergedSeqList, seqList[i])
     }
    }

    for _, seq := range mergedSeqList {
     ackPacket := Packet{
      // 因爲這個示例中
      // 服務端不主動發送數據
      // 所以 Seq 固定爲 1
      Seq:  1,
      Ack:  lastAck,
      SAck: fmt.Sprintf("%d-%d", seq[0], seq[1]),
      Data: "",
      Flag: FlagTypeAck,
     }

     ackData := encode(&ackPacket)
     conn.WriteToUDP(ackData, clientAddr)
    }

    // 更新最後發送 Ack 的時間
    lastAckTime = time.Now()

    // 重置區間 Seq
    seqList = seqList[:0]
   }

   // 短暫休眠,避免佔用過多 CPU 資源
   time.Sleep(100 * time.Millisecond)
  }
 }()

 for {
  _, clientAddr, err = conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   continue
  }

  // 解析接收到的數據包
  recvPacket := decode(buffer[:])

  fmt.Printf("client -> server %s\n", serialization(&recvPacket))

  // 記錄接收到的區間 Seq
  seqList = append(seqList, [2]int{
   recvPacket.Seq,
   recvPacket.Seq + len(recvPacket.Data),
  })
 }
}

// 客戶端程序
func startClient() {
 conn, err := net.DialUDP("udp", nil, &serverAddr)
 if err != nil {
  fmt.Println("Error connecting:", err)
  return
 }
 defer conn.Close()

 var wg sync.WaitGroup
 wg.Add(1)

 // 這裏啓動一個新的 goroutine
 // 來完成接收 Ack 操作
 go func() {
  defer wg.Done()

  t := time.NewTimer(1 * time.Second)
  defer t.Stop()

  for {
   select {
   case <-t.C:
    return
   default:
    // 接收 Ack 包
    buffer := make([]byte, 32)

    conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
    _, _, err := conn.ReadFromUDP(buffer)
    if err != nil {
     continue
    }

    recvAckPacket := decode(buffer[:])
    fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
   }
  }
 }()

 // 構建一個 UDP 數據包
 packet := Packet{
  Seq:  1, // 客戶端 Seq 值從 1 開始
  Ack:  1,
  Data: "Hello Server",
  Flag: FlagTypeData,
 }

 // 連續發送 5 個 UDP 數據包
 for i := 0; i < 5; i++ {
  // 第 4 個數據包模擬丟包
  if i != 3 {
   data := encode(&packet)
   conn.Write(data)
  }

  // 更新下次發送數據包的 Seq 值
  packet.Seq += len(packet.Data)
 }

 // 等待 Ack 報文接收完成
 wg.Wait()
}

// Packet 數據包編碼
// 使用字符串拼接作爲簡單實現
func encode(p *Packet) []byte {
 return []byte(fmt.Sprintf("%d|%d|%q|%q|%d", p.Seq, p.Ack, p.SAck, p.Data, p.Flag))
}

// Packet 數據包解碼
func decode(data []byte) Packet {
 var p Packet
 _, _ = fmt.Sscanf(string(data)"%d|%d|%q|%q|%d"&p.Seq, &p.Ack, &p.SAck, &p.Data, &p.Flag)
 return p
}

// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
 var sb strings.Builder

 if p.Flag == FlagTypeData {
  // 無需任何標誌位渲染
  // 輸出佔位符美化終端顯示
  sb.WriteString("     ")
 } else if p.Flag == FlagTypeAck {
  sb.WriteString("[ACK]")
 } else {
  sb.WriteString("[Unknown]")
 }

 sb.WriteString(" Seq=")
 sb.WriteString(strconv.Itoa(p.Seq))

 if p.Flag == FlagTypeAck {
  sb.WriteString(" Ack=")
  sb.WriteString(strconv.Itoa(p.Ack))

  if len(p.SAck) > 0 {
   sb.WriteString(" SAck=")
   sb.WriteString(p.SAck)
  }
 }

 sb.WriteString(" Len=")
 sb.WriteString(strconv.Itoa(len(p.Data)))

 if p.Flag == FlagTypeData {
  sb.WriteString(" Data=")
  sb.WriteString(p.Data)
 }

 return sb.String()
}

運行程序的輸出如下:

通過輸出結果可以看到,客戶端連續發送了 5 個 UDP 數據包,其中第 4 個包模擬丟包 (服務端接收不到),但是因爲服務端啓動了選擇性確認,所以最終發送給客戶端的 Ack 報文有 2 個:

客戶端根據這兩個信息,就可以判斷出丟包的具體數據包,也就是 Seq 在 37 號到 49 號之間的數據包,具體來說,也就是下面這個數據包:

client -> server       Seq=37 Len=12 Data=Hello Server

小結

使用 UDP 實現可靠性傳輸中的 確認機制,接收方 (服務端) 已經完成了,接下來就是發送方 (客戶端) 要實現的 重傳機制。有了前文的基礎後,客戶端部分代碼實現起來應該也很快,繼續 Coding :-)


重傳機制

重傳機制由發送方實現,在本文中也就是客戶端端程序。

1. 超時重傳

爲了簡化實現,本文不計算數據包往返的 RTT, RTO (超時重傳時間) 直接採用 1 個暴力的硬編碼值: 300 毫秒。

此外,因爲前文中接收方 (服務端) 已經實現了選擇性確認,所以這裏將 超時重傳 + 選擇性重傳 一起實現。

最後修改後的代碼如下所示。

// V4 版本

package main

import (
 "fmt"
 "net"
 "sort"
 "strconv"
 "strings"
 "sync"
 "time"
)

// 數據包標誌位類型
type FlagType uint8

const (
 FlagTypeInvalid FlagType = iota
 FlagTypeData             // 數據包
 FlagTypeAck              // 確認包
)

// 定義數據包的結構體
type Packet struct {
 Seq        int      // 序列號
 Ack        int      // 確認號
 SAck       string   // SAck 區間
 Data       string   // 數據內容
 Flag       FlagType // 標誌位
 Retransmit bool     // 重傳標誌位
}

var (
 // 服務端地址
 serverAddr = net.UDPAddr{
  Port: 8080,
  IP:   net.ParseIP("127.0.0.1"),
 }
)

func main() {
 go startServer()

 // 等待服務端程序啓動
 time.Sleep(200 * time.Millisecond)

 startClient()
}

// 服務端程序
func startServer() {
 conn, err := net.ListenUDP("udp"&serverAddr)
 if err != nil {
  fmt.Println("Error starting server:", err)
  return
 }
 defer conn.Close()

 buffer := make([]byte, 32)

 // 延遲 200 毫秒發送 ACK
 const ackDelay = 200 * time.Millisecond

 var (
  // 延遲 Ack
  lastAck int

  // 記錄接收到的區間 Seq
  // [0]: 區間起始 Seq
  // [1]: 區間結束 Seq, Seq + Data.Len()
  seqList = [][2]int{}

  // 記錄歷史接收到的所有區間 Seq
  seqRecord = [][2]int{}

  // 最後發送 Ack 報文的時間
  lastAckTime = time.Now()
  // 客戶端的 UDP 地址
  clientAddr *net.UDPAddr
 )

 // 因爲 conn.ReadFromUDP 方法是阻塞接收操作
 // 所以這裏啓動一個新的 goroutine
 // 來完成延遲 Ack 操作
 go func() {
  for {
   // 超過延遲時間,發送 Ack 確認包
   if time.Since(lastAckTime) >= ackDelay && len(seqList) > 0 {
    // 超過延遲時間,發送 Ack 確認包
    // 構造 Ack 包併發送

    lastAck = seqList[0][1]
    lastAckChanged := false

    // 因爲丟包,可能存在多個區間 Ack 確認包
    // 所以需要分開單獨發送
    // 根據 Seq 合併區間
    mergedSeqList := [][2]int{
     seqList[0],
    }

    for i := 1; i < len(seqList); i++ {
     // 數據包 Seq 是連續的,直接合並兩個區間
     if seqList[i][0] == mergedSeqList[len(mergedSeqList)-1][1] {
      mergedSeqList[len(mergedSeqList)-1][1] = seqList[i][1]

      // 更新最後接收到的確認號
      if !lastAckChanged {
       lastAck = mergedSeqList[len(mergedSeqList)-1][1]
      }
     } else {
      lastAckChanged = true

      // 數據包 Seq 不是連續的,有中間數據包還未收到
      mergedSeqList = append(mergedSeqList, seqList[i])
     }
    }

    for _, seq := range mergedSeqList {
     ackPacket := Packet{
      // 因爲這個示例中
      // 服務端不主動發送數據
      // 所以 Seq 固定爲 1
      Seq:  1,
      Ack:  lastAck,
      SAck: fmt.Sprintf("%d-%d", seq[0], seq[1]),
      Data: "",
      Flag: FlagTypeAck,
     }

     ackData := encode(&ackPacket)
     conn.WriteToUDP(ackData, clientAddr)
    }

    // 更新最後發送 Ack 的時間
    lastAckTime = time.Now()

    // 重置區間 Seq
    seqList = seqList[:0]
   }

   // 短暫休眠,避免佔用過多 CPU 資源
   time.Sleep(100 * time.Millisecond)
  }
 }()

 for {
  _, clientAddr, err = conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   continue
  }

  // 解析接收到的數據包
  recvPacket := decode(buffer[:])

  fmt.Printf("client -> server %s\n", serialization(&recvPacket))

  // 記錄歷史區間 Seq
  seqRecord = append(seqRecord, [2]int{
   recvPacket.Seq,
   recvPacket.Seq + len(recvPacket.Data),
  })

  // 這裏假設重傳的數據包 100% 接收成功
  // 服務端直接返回確認 Ack 報文
  // 簡化對重傳數據包的再次 Ack 的實現機制
  if recvPacket.Retransmit {
   // 排序合併後的區間
   sort.Slice(seqRecord, func(i, j int) bool {
    return seqRecord[i][0] < seqRecord[j][0] && seqRecord[i][1] < seqRecord[j][1]
   })
   // 合併重複區間
   // 合併重複區間
   uniqueIndex := 0
   for i := 1; i < len(seqRecord); i++ {
    if seqRecord[i][0] == seqRecord[uniqueIndex][1] {
     seqRecord[uniqueIndex][1] = seqRecord[i][1]
    } else {
     uniqueIndex++
    }
   }
   seqRecord = seqRecord[:uniqueIndex+1]

   // 更新已經接收到連續區間最大 Ack
   lastAck = seqRecord[0][1]

   recvPacket.SAck = fmt.Sprintf("%d-%d", recvPacket.Seq, recvPacket.Seq+len(recvPacket.Data))
   recvPacket.Ack = lastAck

   recvPacket.Seq = 1
   recvPacket.Flag = FlagTypeAck
   conn.WriteToUDP(encode(&recvPacket), clientAddr)
   continue
  }

  // 記錄接收到的區間 Seq
  seqList = append(seqList, [2]int{
   recvPacket.Seq,
   recvPacket.Seq + len(recvPacket.Data),
  })
 }
}

// 客戶端程序
func startClient() {
 conn, err := net.DialUDP("udp", nil, &serverAddr)
 if err != nil {
  fmt.Println("Error connecting:", err)
  return
 }
 defer conn.Close()

 // 記錄客戶端已經發送過的數據包 Seq 列表
 sentPackets := []*Packet{}
 // 記錄客戶端已經接收到的數據包 Seq 列表
 receivedPackets := []*Packet{}

 var wg sync.WaitGroup
 wg.Add(1)

 // 這裏啓動一個新的 goroutine
 // 1. 完成超時重傳
 // 2. 完成接收 Ack 操作
 go func() {
  defer wg.Done()

  // 超時退出
  timeout := time.NewTimer(1 * time.Second)
  defer timeout.Stop()

  // 超時重傳定時器
  // 硬編碼爲 300 毫秒
  ticket := time.NewTicker(300 * time.Millisecond)
  defer ticket.Stop()

  for {
   select {
   case <-timeout.C:
    return
   case <-ticket.C:
    // 發送的數據包已經被接收方全部確認
    // 無需重傳
    if len(sentPackets) == len(receivedPackets) {
     continue
    }

    // 通過區間差集算法
    // 同時考慮 選擇性確認 的情況
    lostPackets := []*Packet{}
    receivedAckList := [][2]int{}
    for _, val := range receivedPackets {
     ackBlock := strings.Split(val.SAck, "-")
     start, _ := strconv.ParseInt(ackBlock[0], 10, 64)
     end, _ := strconv.ParseInt(ackBlock[1], 10, 64)
     receivedAckList = append(receivedAckList, [2]int{
      int(start),
      int(end),
     })
    }

    // 排序合併後的區間
    sort.Slice(receivedAckList, func(i, j int) bool {
     return receivedAckList[i][0] < receivedAckList[j][0] && receivedAckList[i][1] < receivedAckList[j][1]
    })
    // 合併重複區間
    uniqueIndex := 0
    for i := 1; i < len(receivedAckList); i++ {
     if receivedAckList[i][0] == receivedAckList[uniqueIndex][1] {
      receivedAckList[uniqueIndex][1] = receivedAckList[i][1]
     } else {
      uniqueIndex++
     }
    }
    receivedAckList = receivedAckList[:uniqueIndex+1]

    // 計算丟失的數據包
    curRecvIndex := 0
    for i, val := range sentPackets {
     if curRecvIndex >= len(receivedPackets) {
      lostPackets = append(lostPackets, val)
      continue
     }
     if val.Seq > receivedAckList[curRecvIndex][1] {
      curRecvIndex++
      lostPackets = append(lostPackets, sentPackets[i-1])
     }
    }

    for _, val := range lostPackets {
     // 構建 1 個 UDP 數據包
     packet := Packet{
      Seq:        val.Seq,
      Ack:        1,
      Data:       "Hello Server",
      Flag:       FlagTypeData,
      Retransmit: true,
     }

     data := encode(&packet)
     conn.Write(data)
    }
   default:
    // 接收 Ack 包
    buffer := make([]byte, 32)

    conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
    _, _, err := conn.ReadFromUDP(buffer)
    if err != nil {
     continue
    }

    recvAckPacket := decode(buffer[:])
    fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))

    // 更新接收到的數據包 Seq
    receivedPackets = append(receivedPackets, &recvAckPacket)
   }
  }
 }()

 //  客戶端 Seq 值從 1 開始
 curSeq := 1

 // 連續發送 5 個 UDP 數據包
 for i := 0; i < 5; i++ {
  // 構建 1 個 UDP 數據包
  packet := Packet{
   Seq:  curSeq,
   Ack:  1,
   Data: "Hello Server",
   Flag: FlagTypeData,
  }

  // 更新發送過的數據包 Seq
  sentPackets = append(sentPackets, &packet)

  // 第 4 個數據包模擬丟包
  if i != 3 {
   data := encode(&packet)
   conn.Write(data)
  }

  // 更新下次發送數據包的 Seq 值
  curSeq += len(packet.Data)
 }

 // 等待 Ack 報文接收完成
 wg.Wait()
}

// Packet 數據包編碼
// 使用字符串拼接作爲簡單實現
func encode(p *Packet) []byte {
 return []byte(fmt.Sprintf("%d|%d|%q|%q|%d|%t", p.Seq, p.Ack, p.SAck, p.Data, p.Flag, p.Retransmit))
}

// Packet 數據包解碼
func decode(data []byte) Packet {
 var p Packet
 _, _ = fmt.Sscanf(string(data)"%d|%d|%q|%q|%d|%t"&p.Seq, &p.Ack, &p.SAck, &p.Data, &p.Flag, &p.Retransmit)
 return p
}

// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
 var sb strings.Builder

 if p.Retransmit {
  sb.WriteString("[TCP Retransmit] ")
 }

 if p.Flag == FlagTypeData {
  // 無需任何標誌位渲染
  // 輸出佔位符美化終端顯示
  if !p.Retransmit {
   sb.WriteString("     ")
  }
 } else if p.Flag == FlagTypeAck {
  sb.WriteString("[ACK]")
 } else {
  sb.WriteString("[Unknown]")
 }

 sb.WriteString(" Seq=")
 sb.WriteString(strconv.Itoa(p.Seq))

 if p.Flag == FlagTypeAck {
  sb.WriteString(" Ack=")
  sb.WriteString(strconv.Itoa(p.Ack))

  if len(p.SAck) > 0 {
   sb.WriteString(" SAck=")
   sb.WriteString(p.SAck)
  }
 }

 sb.WriteString(" Len=")
 sb.WriteString(strconv.Itoa(len(p.Data)))

 if p.Flag == FlagTypeData {
  sb.WriteString(" Data=")
  sb.WriteString(p.Data)
 }

 return sb.String()
}

運行程序的輸出如下:

通過輸出結果可以看到,客戶端連續發送了 5 個 UDP 數據包,其中第 4 個包模擬丟包 (服務端接收不到),但是因爲服務端啓動了選擇性確認,所以最終發送給客戶端的 Ack 報文有 2 個:

客戶端根據這兩個信息,就可以判斷出丟包的具體數據包,也就是 Seq 在 37 號到 49 號之間的數據包,具體來說,也就是下面這個數據包:

client -> server       Seq=37 Len=12 Data=Hello Server

客戶端在超時計時器觸發後,通過對比 已經收到的數據包 Ack已經發送的數據包 Seq 集合,計算出還未接受到的數據包,也就是丟包數據,然後重新發送,通過輸出的結果,可以看到對應數據包中的 [TCP Retransmit] 標識信息。

2. 快速重傳

快速重傳機制依賴於重複確認(Duplicate Acknowledgments, Dup ACK)來檢測數據包丟失,當接收方接收到一個亂序 (不連續) 的數據包時,會重新發送對最後一個按序 (連續) 到達的數據包的 Ack, 發送方收到一定數量 (3 個) 的重複 Ack 之後,認爲數據包 (可能已經) 丟失,並立即重傳該數據包。

實現方面,只需要通過在 超時重傳 的代碼基礎上,對服務端程序略加修改,通過程序打亂接收到數據包的順序,來模擬亂序到達,然後對於亂序的數據包,發送對應的 Dup ACK 響應報文即可。

最後修改後的代碼如下所示。

// V5 版本

// 其他重複代碼省略
// ...

const (
 FlagTypeInvalid FlagType = iota
 FlagTypeData             // 數據包
 FlagTypeAck              // 確認包
 FlagTypeDupAck           // 快速重傳包
)


// 服務端程序
func startServer() {
 conn, err := net.ListenUDP("udp"&serverAddr)
 if err != nil {
  fmt.Println("Error starting server:", err)
  return
 }
 defer conn.Close()

 buffer := make([]byte, 32)

 // 延遲 200 毫秒發送 ACK
 const ackDelay = 200 * time.Millisecond

 var (
  // 延遲 Ack
  lastAck int

  // 記錄接收到的區間 Seq
  // [0]: 區間起始 Seq
  // [1]: 區間結束 Seq, Seq + Data.Len()
  seqList = [][2]int{}

  // 記錄歷史接收到的所有區間 Seq
  seqRecord = [][2]int{}

  // 最後發送 Ack 報文的時間
  lastAckTime = time.Now()
  // 客戶端的 UDP 地址
  clientAddr *net.UDPAddr
 )

 // 因爲 conn.ReadFromUDP 方法是阻塞接收操作
 // 所以這裏啓動一個新的 goroutine
 // 來完成延遲 Ack 操作
 go func() {
  for {
   // 超過延遲時間,發送 Ack 確認包
   if time.Since(lastAckTime) >= ackDelay && len(seqList) > 0 {
    // 超過延遲時間,發送 Ack 確認包
    // 構造 Ack 包併發送
    lastAck = seqList[0][1]
    lastAckChanged := false

    // 程序模擬數據包亂序
    // 模擬除了第 1 個數據包之外
    // 其他的所有數據包都發生了亂序
    for i, j := 1, len(seqList)-1; i < j; i, j = i+1, j-1 {
     seqList[i], seqList[j] = seqList[j], seqList[i]
    }

    // 根據亂序數據包發送快速重傳報文
    for _, val := range seqList {
     if val[0] > lastAck {
      ackPacket := Packet{
       // 因爲這個示例中
       // 服務端不主動發送數據
       // 所以 Seq 固定爲 1
       Seq:  1,
       Ack:  lastAck,
       SAck: "",
       Data: "",
       Flag: FlagTypeDupAck,
      }

      ackData := encode(&ackPacket)
      conn.WriteToUDP(ackData, clientAddr)
     } else {
      lastAck = val[1]
     }
    }

    // 排序合併後的區間
    sort.Slice(seqList, func(i, j int) bool {
     return seqList[i][0] < seqList[j][0] && seqList[i][1] < seqList[j][1]
    })

    // 因爲丟包,可能存在多個區間 Ack 確認包
    // 所以需要分開單獨發送
    // 根據 Seq 合併區間
    mergedSeqList := [][2]int{
     seqList[0],
    }

    for i := 1; i < len(seqList); i++ {
     // 數據包 Seq 是連續的,直接合並兩個區間
     if seqList[i][0] == mergedSeqList[len(mergedSeqList)-1][1] {
      mergedSeqList[len(mergedSeqList)-1][1] = seqList[i][1]

      // 更新最後接收到的確認號
      if !lastAckChanged {
       lastAck = mergedSeqList[len(mergedSeqList)-1][1]
      }
     } else {
      lastAckChanged = true

      // 數據包 Seq 不是連續的,有中間數據包還未收到
      mergedSeqList = append(mergedSeqList, seqList[i])
     }
    }

    for _, seq := range mergedSeqList {
     ackPacket := Packet{
      // 因爲這個示例中
      // 服務端不主動發送數據
      // 所以 Seq 固定爲 1
      Seq:  1,
      Ack:  lastAck,
      SAck: fmt.Sprintf("%d-%d", seq[0], seq[1]),
      Data: "",
      Flag: FlagTypeAck,
     }

     ackData := encode(&ackPacket)
     conn.WriteToUDP(ackData, clientAddr)
    }

    // 更新最後發送 Ack 的時間
    lastAckTime = time.Now()

    // 重置區間 Seq
    seqList = seqList[:0]
   }

   // 短暫休眠,避免佔用過多 CPU 資源
   time.Sleep(100 * time.Millisecond)
  }
 }()

 for {
  _, clientAddr, err = conn.ReadFromUDP(buffer)
  if err != nil {
   fmt.Println("Error reading:", err)
   continue
  }

  // 解析接收到的數據包
  recvPacket := decode(buffer[:])

  fmt.Printf("client -> server %s\n", serialization(&recvPacket))

  // 記錄歷史區間 Seq
  seqRecord = append(seqRecord, [2]int{
   recvPacket.Seq,
   recvPacket.Seq + len(recvPacket.Data),
  })

  // 這裏假設重傳的數據包 100% 接收成功
  // 服務端直接返回確認 Ack 報文
  // 簡化對重傳數據包的再次 Ack 的實現機制
  if recvPacket.Retransmit {
   // 排序合併後的區間
   sort.Slice(seqRecord, func(i, j int) bool {
    return seqRecord[i][0] < seqRecord[j][0] && seqRecord[i][1] < seqRecord[j][1]
   })
   // 合併重複區間
   // 合併重複區間
   uniqueIndex := 0
   for i := 1; i < len(seqRecord); i++ {
    if seqRecord[i][0] == seqRecord[uniqueIndex][1] {
     seqRecord[uniqueIndex][1] = seqRecord[i][1]
    } else {
     uniqueIndex++
    }
   }
   seqRecord = seqRecord[:uniqueIndex+1]

   // 更新已經接收到連續區間最大 Ack
   lastAck = seqRecord[0][1]

   recvPacket.SAck = fmt.Sprintf("%d-%d", recvPacket.Seq, recvPacket.Seq+len(recvPacket.Data))
   recvPacket.Ack = lastAck

   recvPacket.Seq = 1
   recvPacket.Flag = FlagTypeAck
   conn.WriteToUDP(encode(&recvPacket), clientAddr)
   continue
  }

  // 記錄接收到的區間 Seq
  seqList = append(seqList, [2]int{
   recvPacket.Seq,
   recvPacket.Seq + len(recvPacket.Data),
  })
 }
}

// 客戶端程序
func startClient() {
 conn, err := net.DialUDP("udp", nil, &serverAddr)
 if err != nil {
  fmt.Println("Error connecting:", err)
  return
 }
 defer conn.Close()

 // 記錄客戶端已經發送過的數據包 Seq 列表
 sentPackets := []*Packet{}
 // 記錄客戶端已經接收到的數據包 Seq 列表
 receivedPackets := []*Packet{}

 var wg sync.WaitGroup
 wg.Add(1)

 // 這裏啓動一個新的 goroutine
 // 1. 完成超時重傳
 // 2. 完成接收 Ack 操作
 go func() {
  defer wg.Done()

  // 超時退出
  timeout := time.NewTimer(1 * time.Second)
  defer timeout.Stop()

  // 超時重傳定時器
  // 硬編碼爲 300 毫秒
  ticket := time.NewTicker(300 * time.Millisecond)
  defer ticket.Stop()

  for {
   select {
   case <-timeout.C:
    return
   case <-ticket.C:
    // 發送的數據包已經被接收方全部確認
    // 無需重傳
    if len(sentPackets) == len(receivedPackets) {
     continue
    }

    // 通過區間差集算法
    // 同時考慮 選擇性確認 的情況
    lostPackets := []*Packet{}
    receivedAckList := [][2]int{}
    for _, val := range receivedPackets {
     ackBlock := strings.Split(val.SAck, "-")
     if len(ackBlock) < 2 {
      continue
     }
     start, _ := strconv.ParseInt(ackBlock[0], 10, 64)
     end, _ := strconv.ParseInt(ackBlock[1], 10, 64)
     receivedAckList = append(receivedAckList, [2]int{
      int(start),
      int(end),
     })
    }

    // 排序合併後的區間
    sort.Slice(receivedAckList, func(i, j int) bool {
     return receivedAckList[i][0] < receivedAckList[j][0] && receivedAckList[i][1] < receivedAckList[j][1]
    })
    // 合併重複區間
    uniqueIndex := 0
    for i := 1; i < len(receivedAckList); i++ {
     if receivedAckList[i][0] == receivedAckList[uniqueIndex][1] {
      receivedAckList[uniqueIndex][1] = receivedAckList[i][1]
     } else {
      uniqueIndex++
     }
    }
    receivedAckList = receivedAckList[:uniqueIndex+1]

    // 計算丟失的數據包
    curRecvIndex := 0
    for i, val := range sentPackets {
     if curRecvIndex >= len(receivedPackets) {
      lostPackets = append(lostPackets, val)
      continue
     }
     if val.Seq > receivedAckList[curRecvIndex][1] {
      curRecvIndex++
      lostPackets = append(lostPackets, sentPackets[i-1])
     }
    }

    for _, val := range lostPackets {
     // 構建 1 個 UDP 數據包
     packet := Packet{
      Seq:        val.Seq,
      Ack:        1,
      Data:       "Hello Server",
      Flag:       FlagTypeData,
      Retransmit: true,
     }

     data := encode(&packet)
     conn.Write(data)
    }
   default:
    // 接收 Ack 包
    buffer := make([]byte, 32)

    conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
    _, _, err := conn.ReadFromUDP(buffer)
    if err != nil {
     continue
    }

    recvAckPacket := decode(buffer[:])
    fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))

    // 更新接收到的數據包 Seq
    receivedPackets = append(receivedPackets, &recvAckPacket)
   }
  }
 }()

 //  客戶端 Seq 值從 1 開始
 curSeq := 1

 // 連續發送 5 個 UDP 數據包
 for i := 0; i <= 5; i++ {
  // 構建 1 個 UDP 數據包
  packet := Packet{
   Seq:  curSeq,
   Ack:  1,
   Data: "Hello Server",
   Flag: FlagTypeData,
  }

  // 更新發送過的數據包 Seq
  sentPackets = append(sentPackets, &packet)

  // 第 4 個數據包模擬丟包
  if i != 3 {
   data := encode(&packet)
   conn.Write(data)
  }

  // 更新下次發送數據包的 Seq 值
  curSeq += len(packet.Data)
 }

 // 等待 Ack 報文接收完成
 wg.Wait()
}


// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
 var sb strings.Builder

 if p.Retransmit {
  sb.WriteString("[TCP Retransmit] ")
 }

 if p.Flag == FlagTypeData {
  // 無需任何標誌位渲染
  // 輸出佔位符美化終端顯示
  if !p.Retransmit {
   sb.WriteString("     ")
  }
 } else if p.Flag == FlagTypeAck {
  sb.WriteString("[ACK]")
 } else if p.Flag == FlagTypeDupAck {
  sb.WriteString("[TCP Dup ACK]")
 } else {
  sb.WriteString("[Unknown]")
 }

 sb.WriteString(" Seq=")
 sb.WriteString(strconv.Itoa(p.Seq))

 if p.Flag == FlagTypeAck || p.Flag == FlagTypeDupAck {
  sb.WriteString(" Ack=")
  sb.WriteString(strconv.Itoa(p.Ack))

  if len(p.SAck) > 0 {
   sb.WriteString(" SAck=")
   sb.WriteString(p.SAck)
  }
 }

 sb.WriteString(" Len=")
 sb.WriteString(strconv.Itoa(len(p.Data)))

 if p.Flag == FlagTypeData {
  sb.WriteString(" Data=")
  sb.WriteString(p.Data)
 }

 return sb.String()
}


// 其他重複代碼省略
// ...

運行程序的輸出如下:

通過輸出結果可以看到,客戶端連續發送了 6 個 UDP 數據包,其中第 4 個包模擬丟包 (服務端接收不到),但是因爲服務端啓動了選擇性確認,所以最終發送給客戶端的 Ack 報文有 2 個:

客戶端根據這兩個信息,就可以判斷出丟包的具體數據包,也就是 Seq 在 37 號到 49 號之間的數據包,具體來說,也就是下面這個數據包:

client -> server       Seq=37 Len=12 Data=Hello Server

客戶端在超時計時器觸發後,通過對比 已經收到的數據包 Ack已經發送的數據包 Seq 集合,計算出還未接受到的數據包,也就是丟包數據,然後重新發送,通過輸出的結果,可以看到對應數據包中的 [TCP Retransmit] 標識信息。

此外,通過在服務端模擬接收到的數據包亂序,服務端向客戶端發送了快速重傳 Dup ACK 報文,當然,上述代碼實現的是一個純演示版本。

3. 選擇性重傳

在前文中 超時重傳 代碼實現時已經順帶實現了,這裏不再贅述。


小結

本文通過僞代碼實現,演示了使用 UDP 來實現 TCP 中的確認與重傳機制,文中整體的所有代碼實現非常粗糙簡陋以及高度耦合 (可以直接運行,但只是爲了演示效果),而且沒有考慮任何併發安全、錯誤處理、性能優化等工程問題,但是本文主要的目的在於說明設計思路,僞代碼可以輔助理解實現細節,能到達這個目標就足夠了。

大多數有過網絡編程經驗的開發者,或多或少會產生過一個執念: 通過 UDP 來實現和 TCP 一樣的可靠傳輸保證 (RUDP),但這樣也就失去了創造 UDP 本身的意思,退一步說,即使真的實現了,充其量也就是和 TCP 性能持平 (畢竟 TCP 處於內核態沒有上下文切換成本,RUDP 處於用戶態有上下文切換成本),沒有任何技術價值。此外還要考慮網絡鏈路中的 UDP 流量服務質量 (包括運營商限制、防火牆丟包等)。不過好在,今天我們有了新的選擇: QUIC, a multiplexed transport over UDP[1]

參考資料

[1]

QUIC, a multiplexed transport over UDP: https://www.chromium.org/quic/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/xe32fVUSQU-QIpZPtBszeQ