如何實現可靠的 UDP ?
概述
UDP 是無連接的,盡最大可能交付 (網絡 IP 層),不提供流量控制,擁塞控制,面向報文(對於應用程序的數據不合並也不拆分,直接添加 UDP 首部),支持一對一、一對多、多對一和多對多 (也就是多播和廣播) 通信。
TCP 和 UDP 區別總結
TCP 實現可靠傳輸層的核心有三點:
-
確認與重傳 (已經可以滿足 “可靠性”,但是可能存在性能問題)
-
滑動窗口 (也就是流量控制,爲了提高吞吐量,充分利用鏈路帶寬,避免發送方發的太慢)
-
擁塞控制 (防止網絡鏈路過載造成丟包,避免發送方發的太快)
如果希望使用 UDP 來實現 TCP 的可靠傳輸,顯然 最直接的方法就是在應用層實現 確認與重傳。本文沿着這個思路,看看利用 UDP 來實現確認與重傳機制時的設計思路和實現僞代碼。
除了確認和重傳外,TCP 校驗和也是 TCP 實現可靠傳輸的手段之一,但這不是本文的重點,所以這裏一筆帶過。
爲了節省篇幅,本文將可靠傳輸的模型實現簡化爲:
發送方實現重傳機制,接收方實現確認機制。
數據結構
爲了簡化代碼和便於理解,下文中將每個 UDP 數據包封裝一個對象,字段及含義如下所示。
// 數據包標誌位類型
type FlagType uint8
const (
FlagTypeInvalid FlagType = iota
FlagTypeData // 數據包
FlagTypeAck // 確認包
)
// 定義數據包的結構體
type Packet struct {
Seq int // 序列號
Ack int // 確認號
Data string // 數據內容
Flag FlagType // 標誌位
}
確認機制
確認機制由接收方實現,在本文中也就是服務端程序。
1. 單個數據包確認
所謂單個確認,也就是常見的 Reply 形式,發送方 (客戶端) 向接收方發送一個 UDP 數據包,對於每個接收到的 UDP 數據包,接收方 (服務端) 都向發送方發送一個確認 ACK 數據包。
單個數據包確認示例圖
實現思路非常簡單:
-
服務端程序使用 UDP 監聽指定端口
-
客戶端向服務端發送 UDP 數據包
-
服務端收到客戶端的 UDP 數據包之後,向客戶端發送 ACK 數據包
-
客戶端收到服務端的 ACK 數據包之後,更新 Seq 值
最終的程序代碼及其對應的註釋如下。
// V1 版本
package main
import (
"fmt"
"net"
"strconv"
"strings"
"time"
)
// 數據包標誌位類型
type FlagType uint8
const (
FlagTypeInvalid FlagType = iota
FlagTypeData // 數據包
FlagTypeAck // 確認包
)
// 定義數據包的結構體
type Packet struct {
Seq int // 序列號
Ack int // 確認號
Data string // 數據內容
Flag FlagType // 標誌位
}
var (
// 服務端地址
serverAddr = net.UDPAddr{
Port: 8080,
IP: net.ParseIP("127.0.0.1"),
}
)
func main() {
go startServer()
// 等待服務端程序啓動
time.Sleep(200 * time.Millisecond)
startClient()
}
// 服務端程序
func startServer() {
conn, err := net.ListenUDP("udp", &serverAddr)
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer conn.Close()
buffer := make([]byte, 1024)
for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
continue
}
// 解析接收到的數據包
recvPacket := decode(buffer[:n])
fmt.Printf("client -> server %s\n", serialization(&recvPacket))
// 構造 Ack 包併發送
ackPacket := Packet{
// 因爲這個示例中
// 服務端不主動發送數據
// 所以 Seq 固定爲 1
Seq: 1,
Ack: recvPacket.Seq + len(recvPacket.Data),
Data: "",
Flag: FlagTypeAck,
}
ackData := encode(&ackPacket)
conn.WriteToUDP(ackData, clientAddr)
}
}
// 客戶端程序
func startClient() {
conn, err := net.DialUDP("udp", nil, &serverAddr)
if err != nil {
fmt.Println("Error connecting:", err)
return
}
defer conn.Close()
// 發送一個數據包
packet := Packet{
Seq: 1, // 客戶端 Seq 值從 1 開始
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
}
// 發送 5 個 UDP 數據包
for i := 0; i < 5; i++ {
data := encode(&packet)
conn.Write(data)
// 接收 Ack 包
buffer := make([]byte, 1024)
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
return
}
recvAckPacket := decode(buffer[:n])
fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
// 更新下次發送數據包的 Seq 值
packet.Seq = recvAckPacket.Ack
}
}
// Packet 數據包編碼
// 使用字符串拼接作爲簡單實現
func encode(p *Packet) []byte {
return []byte(fmt.Sprintf("%d|%d|%q|%d", p.Seq, p.Ack, p.Data, p.Flag))
}
// Packet 數據包解碼
func decode(data []byte) Packet {
var p Packet
_, _ = fmt.Sscanf(string(data), "%d|%d|%q|%d", &p.Seq, &p.Ack, &p.Data, &p.Flag)
return p
}
// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
var sb strings.Builder
if p.Flag == FlagTypeData {
// 無需任何標誌位渲染
// 輸出佔位符美化終端顯示
sb.WriteString(" ")
} else if p.Flag == FlagTypeAck {
sb.WriteString("[ACK]")
} else {
sb.WriteString("[Unknown]")
}
sb.WriteString(" Seq=")
sb.WriteString(strconv.Itoa(p.Seq))
if p.Flag == FlagTypeAck {
sb.WriteString(" Ack=")
sb.WriteString(strconv.Itoa(p.Ack))
}
sb.WriteString(" Len=")
sb.WriteString(strconv.Itoa(len(p.Data)))
if p.Flag == FlagTypeData {
sb.WriteString(" Data=")
sb.WriteString(p.Data)
}
return sb.String()
}
運行程序的輸出如下:
通過輸出結果可以看到,單個數據包的確認機制實現,已經可以正常工作了。
2. 延遲確認
服務端在不發送數據的情況下,每收到一個 UDP 數據包,就發送 Ack 報文,導致了低效的數據傳輸和浪費網絡帶寬,也就是所謂的 “糊塗窗口綜合症”。
既然服務端沒有什麼數據要發送給客戶端,那麼就可以延遲一段時間再發送 Ack 報文, 如果在延遲期間,又接收到了新的數據,就可以將多個 Ack 報文合併到一個數據包裏面發送了。
延遲確認示例圖
當然,要重構的主要服務端程序和客戶端程序的代碼,修改後的代碼如下所示。
// V2 版本
// 其他重複代碼省略
// ...
// 服務端程序
func startServer() {
conn, err := net.ListenUDP("udp", &serverAddr)
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer conn.Close()
buffer := make([]byte, 1024)
// 延遲 200 毫秒發送 ACK
const ackDelay = 200 * time.Millisecond
var (
// 延遲 Ack
lastAck int
// 最後發送 Ack 報文的時間
lastAckTime = time.Now()
// 客戶端的 UDP 地址
clientAddr *net.UDPAddr
)
// 因爲 conn.ReadFromUDP 方法是阻塞接收操作
// 所以這裏啓動一個新的 goroutine
// 來完成延遲 Ack 操作
go func() {
for {
// 超過延遲時間,發送 Ack 確認包
if time.Since(lastAckTime) >= ackDelay {
// 超過延遲時間,發送 Ack 確認包
// 構造 Ack 包併發送
ackPacket := Packet{
// 因爲這個示例中
// 服務端不主動發送數據
// 所以 Seq 固定爲 1
Seq: 1,
Ack: lastAck,
Data: "",
Flag: FlagTypeAck,
}
ackData := encode(&ackPacket)
conn.WriteToUDP(ackData, clientAddr)
// 更新最後發送 Ack 的時間
lastAckTime = time.Now()
}
// 短暫休眠,避免佔用過多 CPU 資源
time.Sleep(100 * time.Millisecond)
}
}()
for {
_, clientAddr, err = conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
continue
}
// 解析接收到的數據包
recvPacket := decode(buffer[:])
fmt.Printf("client -> server %s\n", serialization(&recvPacket))
// 更新最後接收到的確認號
lastAck = recvPacket.Seq + len(recvPacket.Data)
}
}
// 客戶端程序
func startClient() {
conn, err := net.DialUDP("udp", nil, &serverAddr)
if err != nil {
fmt.Println("Error connecting:", err)
return
}
defer conn.Close()
// 構建一個 UDP 數據包
packet := Packet{
Seq: 1, // 客戶端 Seq 值從 1 開始
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
}
var wg sync.WaitGroup
wg.Add(1)
// 這裏啓動一個新的 goroutine
// 來完成接收 Ack 操作
go func() {
defer wg.Done()
// 接收 Ack 包
buffer := make([]byte, 1024)
_, _, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
return
}
recvAckPacket := decode(buffer[:])
fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
}()
// 連續發送 5 個 UDP 數據包
for i := 0; i < 5; i++ {
data := encode(&packet)
conn.Write(data)
// 更新下次發送數據包的 Seq 值
packet.Seq += len(packet.Data)
}
// 等待 Ack 報文接收完成
wg.Wait()
}
// 其他重複代碼省略
// ...
運行程序的輸出如下:
通過輸出結果可以看到,客戶端連續發送了 5 個 UDP 數據包,但是因爲服務端啓動了延遲確認,最終發送給客戶端的 Ack 報文只有 1 個。
3. 選擇性確認
在選擇性重傳中,接收方通過 SAck 向發送方應答已經收到的非連續數據包,發送方可以作爲依據,來重傳接收方沒有收到 (可能已經丟失) 的數據包。
如圖所示,SAck = 1361 ~ 2721
表示這個區間的數據包已經收到了。
代碼實現也很簡單,就是接收方記錄已經接收到的數據包 Seq,並定時將每個區間 Seq 的最大值作爲 Ack 報文響應給發送方。
因爲我們的僞代碼只考慮接收方實現選擇性確認,所以只需要在剛纔的代碼基礎上,對服務端和客戶端代碼稍加調整即可。
-
在數據包 Packet 對象中新增一個 SAck 字段
-
修改 Packet 對象的編碼、解碼、渲染方法
-
接收方 (服務端) 實現選擇性確認 Ack
-
客戶端實現模擬丟包和超時自動退出
選擇性確認示例圖
最後修改後的代碼如下所示。
// V3 版本
package main
import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
)
// 數據包標誌位類型
type FlagType uint8
const (
FlagTypeInvalid FlagType = iota
FlagTypeData // 數據包
FlagTypeAck // 確認包
)
// 定義數據包的結構體
type Packet struct {
Seq int // 序列號
Ack int // 確認號
SAck string // SAck 區間
Data string // 數據內容
Flag FlagType // 標誌位
}
var (
// 服務端地址
serverAddr = net.UDPAddr{
Port: 8080,
IP: net.ParseIP("127.0.0.1"),
}
)
func main() {
go startServer()
// 等待服務端程序啓動
time.Sleep(200 * time.Millisecond)
startClient()
}
// 服務端程序
func startServer() {
conn, err := net.ListenUDP("udp", &serverAddr)
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer conn.Close()
buffer := make([]byte, 32)
// 延遲 200 毫秒發送 ACK
const ackDelay = 200 * time.Millisecond
var (
// 延遲 Ack
lastAck int
// 記錄接收到的區間 Seq
// [0]: 區間起始 Seq
// [1]: 區間結束 Seq, Seq + Data.Len()
seqList = [][2]int{}
// 最後發送 Ack 報文的時間
lastAckTime = time.Now()
// 客戶端的 UDP 地址
clientAddr *net.UDPAddr
)
// 因爲 conn.ReadFromUDP 方法是阻塞接收操作
// 所以這裏啓動一個新的 goroutine
// 來完成延遲 Ack 操作
go func() {
for {
// 超過延遲時間,發送 Ack 確認包
if time.Since(lastAckTime) >= ackDelay && len(seqList) > 0 {
// 超過延遲時間,發送 Ack 確認包
// 構造 Ack 包併發送
lastAck = seqList[0][1]
lastAckChanged := false
// 因爲丟包,可能存在多個區間 Ack 確認包
// 所以需要分開單獨發送
// 根據 Seq 合併區間
mergedSeqList := [][2]int{
seqList[0],
}
for i := 1; i < len(seqList); i++ {
// 數據包 Seq 是連續的,直接合並兩個區間
if seqList[i][0] == mergedSeqList[len(mergedSeqList)-1][1] {
mergedSeqList[len(mergedSeqList)-1][1] = seqList[i][1]
// 更新最後接收到的確認號
if !lastAckChanged {
lastAck = mergedSeqList[len(mergedSeqList)-1][1]
}
} else {
lastAckChanged = true
// 數據包 Seq 不是連續的,有中間數據包還未收到
mergedSeqList = append(mergedSeqList, seqList[i])
}
}
for _, seq := range mergedSeqList {
ackPacket := Packet{
// 因爲這個示例中
// 服務端不主動發送數據
// 所以 Seq 固定爲 1
Seq: 1,
Ack: lastAck,
SAck: fmt.Sprintf("%d-%d", seq[0], seq[1]),
Data: "",
Flag: FlagTypeAck,
}
ackData := encode(&ackPacket)
conn.WriteToUDP(ackData, clientAddr)
}
// 更新最後發送 Ack 的時間
lastAckTime = time.Now()
// 重置區間 Seq
seqList = seqList[:0]
}
// 短暫休眠,避免佔用過多 CPU 資源
time.Sleep(100 * time.Millisecond)
}
}()
for {
_, clientAddr, err = conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
continue
}
// 解析接收到的數據包
recvPacket := decode(buffer[:])
fmt.Printf("client -> server %s\n", serialization(&recvPacket))
// 記錄接收到的區間 Seq
seqList = append(seqList, [2]int{
recvPacket.Seq,
recvPacket.Seq + len(recvPacket.Data),
})
}
}
// 客戶端程序
func startClient() {
conn, err := net.DialUDP("udp", nil, &serverAddr)
if err != nil {
fmt.Println("Error connecting:", err)
return
}
defer conn.Close()
var wg sync.WaitGroup
wg.Add(1)
// 這裏啓動一個新的 goroutine
// 來完成接收 Ack 操作
go func() {
defer wg.Done()
t := time.NewTimer(1 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
return
default:
// 接收 Ack 包
buffer := make([]byte, 32)
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
_, _, err := conn.ReadFromUDP(buffer)
if err != nil {
continue
}
recvAckPacket := decode(buffer[:])
fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
}
}
}()
// 構建一個 UDP 數據包
packet := Packet{
Seq: 1, // 客戶端 Seq 值從 1 開始
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
}
// 連續發送 5 個 UDP 數據包
for i := 0; i < 5; i++ {
// 第 4 個數據包模擬丟包
if i != 3 {
data := encode(&packet)
conn.Write(data)
}
// 更新下次發送數據包的 Seq 值
packet.Seq += len(packet.Data)
}
// 等待 Ack 報文接收完成
wg.Wait()
}
// Packet 數據包編碼
// 使用字符串拼接作爲簡單實現
func encode(p *Packet) []byte {
return []byte(fmt.Sprintf("%d|%d|%q|%q|%d", p.Seq, p.Ack, p.SAck, p.Data, p.Flag))
}
// Packet 數據包解碼
func decode(data []byte) Packet {
var p Packet
_, _ = fmt.Sscanf(string(data), "%d|%d|%q|%q|%d", &p.Seq, &p.Ack, &p.SAck, &p.Data, &p.Flag)
return p
}
// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
var sb strings.Builder
if p.Flag == FlagTypeData {
// 無需任何標誌位渲染
// 輸出佔位符美化終端顯示
sb.WriteString(" ")
} else if p.Flag == FlagTypeAck {
sb.WriteString("[ACK]")
} else {
sb.WriteString("[Unknown]")
}
sb.WriteString(" Seq=")
sb.WriteString(strconv.Itoa(p.Seq))
if p.Flag == FlagTypeAck {
sb.WriteString(" Ack=")
sb.WriteString(strconv.Itoa(p.Ack))
if len(p.SAck) > 0 {
sb.WriteString(" SAck=")
sb.WriteString(p.SAck)
}
}
sb.WriteString(" Len=")
sb.WriteString(strconv.Itoa(len(p.Data)))
if p.Flag == FlagTypeData {
sb.WriteString(" Data=")
sb.WriteString(p.Data)
}
return sb.String()
}
運行程序的輸出如下:
通過輸出結果可以看到,客戶端連續發送了 5 個 UDP 數據包,其中第 4 個包模擬丟包 (服務端接收不到),但是因爲服務端啓動了選擇性確認,所以最終發送給客戶端的 Ack 報文有 2 個:
-
Ack=37: 表示 Seq 在 37 號之前數據包已經全部接收完成
-
SAck=49-61: 表示 Seq 在 49 號到 61 號之間的數據包已經全部接收完成
客戶端根據這兩個信息,就可以判斷出丟包的具體數據包,也就是 Seq 在 37 號到 49 號之間的數據包,具體來說,也就是下面這個數據包:
client -> server Seq=37 Len=12 Data=Hello Server
小結
使用 UDP 實現可靠性傳輸中的 確認機制
,接收方 (服務端) 已經完成了,接下來就是發送方 (客戶端) 要實現的 重傳機制
。有了前文的基礎後,客戶端部分代碼實現起來應該也很快,繼續 Coding :-)
重傳機制
重傳機制由發送方實現,在本文中也就是客戶端端程序。
1. 超時重傳
爲了簡化實現,本文不計算數據包往返的 RTT, RTO (超時重傳時間) 直接採用 1 個暴力的硬編碼值: 300 毫秒。
此外,因爲前文中接收方 (服務端) 已經實現了選擇性確認,所以這裏將 超時重傳 + 選擇性重傳
一起實現。
最後修改後的代碼如下所示。
// V4 版本
package main
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
)
// 數據包標誌位類型
type FlagType uint8
const (
FlagTypeInvalid FlagType = iota
FlagTypeData // 數據包
FlagTypeAck // 確認包
)
// 定義數據包的結構體
type Packet struct {
Seq int // 序列號
Ack int // 確認號
SAck string // SAck 區間
Data string // 數據內容
Flag FlagType // 標誌位
Retransmit bool // 重傳標誌位
}
var (
// 服務端地址
serverAddr = net.UDPAddr{
Port: 8080,
IP: net.ParseIP("127.0.0.1"),
}
)
func main() {
go startServer()
// 等待服務端程序啓動
time.Sleep(200 * time.Millisecond)
startClient()
}
// 服務端程序
func startServer() {
conn, err := net.ListenUDP("udp", &serverAddr)
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer conn.Close()
buffer := make([]byte, 32)
// 延遲 200 毫秒發送 ACK
const ackDelay = 200 * time.Millisecond
var (
// 延遲 Ack
lastAck int
// 記錄接收到的區間 Seq
// [0]: 區間起始 Seq
// [1]: 區間結束 Seq, Seq + Data.Len()
seqList = [][2]int{}
// 記錄歷史接收到的所有區間 Seq
seqRecord = [][2]int{}
// 最後發送 Ack 報文的時間
lastAckTime = time.Now()
// 客戶端的 UDP 地址
clientAddr *net.UDPAddr
)
// 因爲 conn.ReadFromUDP 方法是阻塞接收操作
// 所以這裏啓動一個新的 goroutine
// 來完成延遲 Ack 操作
go func() {
for {
// 超過延遲時間,發送 Ack 確認包
if time.Since(lastAckTime) >= ackDelay && len(seqList) > 0 {
// 超過延遲時間,發送 Ack 確認包
// 構造 Ack 包併發送
lastAck = seqList[0][1]
lastAckChanged := false
// 因爲丟包,可能存在多個區間 Ack 確認包
// 所以需要分開單獨發送
// 根據 Seq 合併區間
mergedSeqList := [][2]int{
seqList[0],
}
for i := 1; i < len(seqList); i++ {
// 數據包 Seq 是連續的,直接合並兩個區間
if seqList[i][0] == mergedSeqList[len(mergedSeqList)-1][1] {
mergedSeqList[len(mergedSeqList)-1][1] = seqList[i][1]
// 更新最後接收到的確認號
if !lastAckChanged {
lastAck = mergedSeqList[len(mergedSeqList)-1][1]
}
} else {
lastAckChanged = true
// 數據包 Seq 不是連續的,有中間數據包還未收到
mergedSeqList = append(mergedSeqList, seqList[i])
}
}
for _, seq := range mergedSeqList {
ackPacket := Packet{
// 因爲這個示例中
// 服務端不主動發送數據
// 所以 Seq 固定爲 1
Seq: 1,
Ack: lastAck,
SAck: fmt.Sprintf("%d-%d", seq[0], seq[1]),
Data: "",
Flag: FlagTypeAck,
}
ackData := encode(&ackPacket)
conn.WriteToUDP(ackData, clientAddr)
}
// 更新最後發送 Ack 的時間
lastAckTime = time.Now()
// 重置區間 Seq
seqList = seqList[:0]
}
// 短暫休眠,避免佔用過多 CPU 資源
time.Sleep(100 * time.Millisecond)
}
}()
for {
_, clientAddr, err = conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
continue
}
// 解析接收到的數據包
recvPacket := decode(buffer[:])
fmt.Printf("client -> server %s\n", serialization(&recvPacket))
// 記錄歷史區間 Seq
seqRecord = append(seqRecord, [2]int{
recvPacket.Seq,
recvPacket.Seq + len(recvPacket.Data),
})
// 這裏假設重傳的數據包 100% 接收成功
// 服務端直接返回確認 Ack 報文
// 簡化對重傳數據包的再次 Ack 的實現機制
if recvPacket.Retransmit {
// 排序合併後的區間
sort.Slice(seqRecord, func(i, j int) bool {
return seqRecord[i][0] < seqRecord[j][0] && seqRecord[i][1] < seqRecord[j][1]
})
// 合併重複區間
// 合併重複區間
uniqueIndex := 0
for i := 1; i < len(seqRecord); i++ {
if seqRecord[i][0] == seqRecord[uniqueIndex][1] {
seqRecord[uniqueIndex][1] = seqRecord[i][1]
} else {
uniqueIndex++
}
}
seqRecord = seqRecord[:uniqueIndex+1]
// 更新已經接收到連續區間最大 Ack
lastAck = seqRecord[0][1]
recvPacket.SAck = fmt.Sprintf("%d-%d", recvPacket.Seq, recvPacket.Seq+len(recvPacket.Data))
recvPacket.Ack = lastAck
recvPacket.Seq = 1
recvPacket.Flag = FlagTypeAck
conn.WriteToUDP(encode(&recvPacket), clientAddr)
continue
}
// 記錄接收到的區間 Seq
seqList = append(seqList, [2]int{
recvPacket.Seq,
recvPacket.Seq + len(recvPacket.Data),
})
}
}
// 客戶端程序
func startClient() {
conn, err := net.DialUDP("udp", nil, &serverAddr)
if err != nil {
fmt.Println("Error connecting:", err)
return
}
defer conn.Close()
// 記錄客戶端已經發送過的數據包 Seq 列表
sentPackets := []*Packet{}
// 記錄客戶端已經接收到的數據包 Seq 列表
receivedPackets := []*Packet{}
var wg sync.WaitGroup
wg.Add(1)
// 這裏啓動一個新的 goroutine
// 1. 完成超時重傳
// 2. 完成接收 Ack 操作
go func() {
defer wg.Done()
// 超時退出
timeout := time.NewTimer(1 * time.Second)
defer timeout.Stop()
// 超時重傳定時器
// 硬編碼爲 300 毫秒
ticket := time.NewTicker(300 * time.Millisecond)
defer ticket.Stop()
for {
select {
case <-timeout.C:
return
case <-ticket.C:
// 發送的數據包已經被接收方全部確認
// 無需重傳
if len(sentPackets) == len(receivedPackets) {
continue
}
// 通過區間差集算法
// 同時考慮 選擇性確認 的情況
lostPackets := []*Packet{}
receivedAckList := [][2]int{}
for _, val := range receivedPackets {
ackBlock := strings.Split(val.SAck, "-")
start, _ := strconv.ParseInt(ackBlock[0], 10, 64)
end, _ := strconv.ParseInt(ackBlock[1], 10, 64)
receivedAckList = append(receivedAckList, [2]int{
int(start),
int(end),
})
}
// 排序合併後的區間
sort.Slice(receivedAckList, func(i, j int) bool {
return receivedAckList[i][0] < receivedAckList[j][0] && receivedAckList[i][1] < receivedAckList[j][1]
})
// 合併重複區間
uniqueIndex := 0
for i := 1; i < len(receivedAckList); i++ {
if receivedAckList[i][0] == receivedAckList[uniqueIndex][1] {
receivedAckList[uniqueIndex][1] = receivedAckList[i][1]
} else {
uniqueIndex++
}
}
receivedAckList = receivedAckList[:uniqueIndex+1]
// 計算丟失的數據包
curRecvIndex := 0
for i, val := range sentPackets {
if curRecvIndex >= len(receivedPackets) {
lostPackets = append(lostPackets, val)
continue
}
if val.Seq > receivedAckList[curRecvIndex][1] {
curRecvIndex++
lostPackets = append(lostPackets, sentPackets[i-1])
}
}
for _, val := range lostPackets {
// 構建 1 個 UDP 數據包
packet := Packet{
Seq: val.Seq,
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
Retransmit: true,
}
data := encode(&packet)
conn.Write(data)
}
default:
// 接收 Ack 包
buffer := make([]byte, 32)
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
_, _, err := conn.ReadFromUDP(buffer)
if err != nil {
continue
}
recvAckPacket := decode(buffer[:])
fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
// 更新接收到的數據包 Seq
receivedPackets = append(receivedPackets, &recvAckPacket)
}
}
}()
// 客戶端 Seq 值從 1 開始
curSeq := 1
// 連續發送 5 個 UDP 數據包
for i := 0; i < 5; i++ {
// 構建 1 個 UDP 數據包
packet := Packet{
Seq: curSeq,
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
}
// 更新發送過的數據包 Seq
sentPackets = append(sentPackets, &packet)
// 第 4 個數據包模擬丟包
if i != 3 {
data := encode(&packet)
conn.Write(data)
}
// 更新下次發送數據包的 Seq 值
curSeq += len(packet.Data)
}
// 等待 Ack 報文接收完成
wg.Wait()
}
// Packet 數據包編碼
// 使用字符串拼接作爲簡單實現
func encode(p *Packet) []byte {
return []byte(fmt.Sprintf("%d|%d|%q|%q|%d|%t", p.Seq, p.Ack, p.SAck, p.Data, p.Flag, p.Retransmit))
}
// Packet 數據包解碼
func decode(data []byte) Packet {
var p Packet
_, _ = fmt.Sscanf(string(data), "%d|%d|%q|%q|%d|%t", &p.Seq, &p.Ack, &p.SAck, &p.Data, &p.Flag, &p.Retransmit)
return p
}
// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
var sb strings.Builder
if p.Retransmit {
sb.WriteString("[TCP Retransmit] ")
}
if p.Flag == FlagTypeData {
// 無需任何標誌位渲染
// 輸出佔位符美化終端顯示
if !p.Retransmit {
sb.WriteString(" ")
}
} else if p.Flag == FlagTypeAck {
sb.WriteString("[ACK]")
} else {
sb.WriteString("[Unknown]")
}
sb.WriteString(" Seq=")
sb.WriteString(strconv.Itoa(p.Seq))
if p.Flag == FlagTypeAck {
sb.WriteString(" Ack=")
sb.WriteString(strconv.Itoa(p.Ack))
if len(p.SAck) > 0 {
sb.WriteString(" SAck=")
sb.WriteString(p.SAck)
}
}
sb.WriteString(" Len=")
sb.WriteString(strconv.Itoa(len(p.Data)))
if p.Flag == FlagTypeData {
sb.WriteString(" Data=")
sb.WriteString(p.Data)
}
return sb.String()
}
運行程序的輸出如下:
通過輸出結果可以看到,客戶端連續發送了 5 個 UDP 數據包,其中第 4 個包模擬丟包 (服務端接收不到),但是因爲服務端啓動了選擇性確認,所以最終發送給客戶端的 Ack 報文有 2 個:
-
Ack=37: 表示 Seq 在 37 號之前數據包已經全部接收完成
-
SAck=49-61: 表示 Seq 在 49 號到 61 號之間的數據包已經全部接收完成
客戶端根據這兩個信息,就可以判斷出丟包的具體數據包,也就是 Seq 在 37 號到 49 號之間的數據包,具體來說,也就是下面這個數據包:
client -> server Seq=37 Len=12 Data=Hello Server
客戶端在超時計時器觸發後,通過對比 已經收到的數據包 Ack
和 已經發送的數據包 Seq
集合,計算出還未接受到的數據包,也就是丟包數據,然後重新發送,通過輸出的結果,可以看到對應數據包中的 [TCP Retransmit]
標識信息。
2. 快速重傳
快速重傳機制依賴於重複確認(Duplicate Acknowledgments, Dup ACK)來檢測數據包丟失,當接收方接收到一個亂序 (不連續) 的數據包時,會重新發送對最後一個按序 (連續) 到達的數據包的 Ack, 發送方收到一定數量 (3 個) 的重複 Ack 之後,認爲數據包 (可能已經) 丟失,並立即重傳該數據包。
實現方面,只需要通過在 超時重傳
的代碼基礎上,對服務端程序略加修改,通過程序打亂接收到數據包的順序,來模擬亂序到達,然後對於亂序的數據包,發送對應的 Dup ACK
響應報文即可。
最後修改後的代碼如下所示。
// V5 版本
// 其他重複代碼省略
// ...
const (
FlagTypeInvalid FlagType = iota
FlagTypeData // 數據包
FlagTypeAck // 確認包
FlagTypeDupAck // 快速重傳包
)
// 服務端程序
func startServer() {
conn, err := net.ListenUDP("udp", &serverAddr)
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer conn.Close()
buffer := make([]byte, 32)
// 延遲 200 毫秒發送 ACK
const ackDelay = 200 * time.Millisecond
var (
// 延遲 Ack
lastAck int
// 記錄接收到的區間 Seq
// [0]: 區間起始 Seq
// [1]: 區間結束 Seq, Seq + Data.Len()
seqList = [][2]int{}
// 記錄歷史接收到的所有區間 Seq
seqRecord = [][2]int{}
// 最後發送 Ack 報文的時間
lastAckTime = time.Now()
// 客戶端的 UDP 地址
clientAddr *net.UDPAddr
)
// 因爲 conn.ReadFromUDP 方法是阻塞接收操作
// 所以這裏啓動一個新的 goroutine
// 來完成延遲 Ack 操作
go func() {
for {
// 超過延遲時間,發送 Ack 確認包
if time.Since(lastAckTime) >= ackDelay && len(seqList) > 0 {
// 超過延遲時間,發送 Ack 確認包
// 構造 Ack 包併發送
lastAck = seqList[0][1]
lastAckChanged := false
// 程序模擬數據包亂序
// 模擬除了第 1 個數據包之外
// 其他的所有數據包都發生了亂序
for i, j := 1, len(seqList)-1; i < j; i, j = i+1, j-1 {
seqList[i], seqList[j] = seqList[j], seqList[i]
}
// 根據亂序數據包發送快速重傳報文
for _, val := range seqList {
if val[0] > lastAck {
ackPacket := Packet{
// 因爲這個示例中
// 服務端不主動發送數據
// 所以 Seq 固定爲 1
Seq: 1,
Ack: lastAck,
SAck: "",
Data: "",
Flag: FlagTypeDupAck,
}
ackData := encode(&ackPacket)
conn.WriteToUDP(ackData, clientAddr)
} else {
lastAck = val[1]
}
}
// 排序合併後的區間
sort.Slice(seqList, func(i, j int) bool {
return seqList[i][0] < seqList[j][0] && seqList[i][1] < seqList[j][1]
})
// 因爲丟包,可能存在多個區間 Ack 確認包
// 所以需要分開單獨發送
// 根據 Seq 合併區間
mergedSeqList := [][2]int{
seqList[0],
}
for i := 1; i < len(seqList); i++ {
// 數據包 Seq 是連續的,直接合並兩個區間
if seqList[i][0] == mergedSeqList[len(mergedSeqList)-1][1] {
mergedSeqList[len(mergedSeqList)-1][1] = seqList[i][1]
// 更新最後接收到的確認號
if !lastAckChanged {
lastAck = mergedSeqList[len(mergedSeqList)-1][1]
}
} else {
lastAckChanged = true
// 數據包 Seq 不是連續的,有中間數據包還未收到
mergedSeqList = append(mergedSeqList, seqList[i])
}
}
for _, seq := range mergedSeqList {
ackPacket := Packet{
// 因爲這個示例中
// 服務端不主動發送數據
// 所以 Seq 固定爲 1
Seq: 1,
Ack: lastAck,
SAck: fmt.Sprintf("%d-%d", seq[0], seq[1]),
Data: "",
Flag: FlagTypeAck,
}
ackData := encode(&ackPacket)
conn.WriteToUDP(ackData, clientAddr)
}
// 更新最後發送 Ack 的時間
lastAckTime = time.Now()
// 重置區間 Seq
seqList = seqList[:0]
}
// 短暫休眠,避免佔用過多 CPU 資源
time.Sleep(100 * time.Millisecond)
}
}()
for {
_, clientAddr, err = conn.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error reading:", err)
continue
}
// 解析接收到的數據包
recvPacket := decode(buffer[:])
fmt.Printf("client -> server %s\n", serialization(&recvPacket))
// 記錄歷史區間 Seq
seqRecord = append(seqRecord, [2]int{
recvPacket.Seq,
recvPacket.Seq + len(recvPacket.Data),
})
// 這裏假設重傳的數據包 100% 接收成功
// 服務端直接返回確認 Ack 報文
// 簡化對重傳數據包的再次 Ack 的實現機制
if recvPacket.Retransmit {
// 排序合併後的區間
sort.Slice(seqRecord, func(i, j int) bool {
return seqRecord[i][0] < seqRecord[j][0] && seqRecord[i][1] < seqRecord[j][1]
})
// 合併重複區間
// 合併重複區間
uniqueIndex := 0
for i := 1; i < len(seqRecord); i++ {
if seqRecord[i][0] == seqRecord[uniqueIndex][1] {
seqRecord[uniqueIndex][1] = seqRecord[i][1]
} else {
uniqueIndex++
}
}
seqRecord = seqRecord[:uniqueIndex+1]
// 更新已經接收到連續區間最大 Ack
lastAck = seqRecord[0][1]
recvPacket.SAck = fmt.Sprintf("%d-%d", recvPacket.Seq, recvPacket.Seq+len(recvPacket.Data))
recvPacket.Ack = lastAck
recvPacket.Seq = 1
recvPacket.Flag = FlagTypeAck
conn.WriteToUDP(encode(&recvPacket), clientAddr)
continue
}
// 記錄接收到的區間 Seq
seqList = append(seqList, [2]int{
recvPacket.Seq,
recvPacket.Seq + len(recvPacket.Data),
})
}
}
// 客戶端程序
func startClient() {
conn, err := net.DialUDP("udp", nil, &serverAddr)
if err != nil {
fmt.Println("Error connecting:", err)
return
}
defer conn.Close()
// 記錄客戶端已經發送過的數據包 Seq 列表
sentPackets := []*Packet{}
// 記錄客戶端已經接收到的數據包 Seq 列表
receivedPackets := []*Packet{}
var wg sync.WaitGroup
wg.Add(1)
// 這裏啓動一個新的 goroutine
// 1. 完成超時重傳
// 2. 完成接收 Ack 操作
go func() {
defer wg.Done()
// 超時退出
timeout := time.NewTimer(1 * time.Second)
defer timeout.Stop()
// 超時重傳定時器
// 硬編碼爲 300 毫秒
ticket := time.NewTicker(300 * time.Millisecond)
defer ticket.Stop()
for {
select {
case <-timeout.C:
return
case <-ticket.C:
// 發送的數據包已經被接收方全部確認
// 無需重傳
if len(sentPackets) == len(receivedPackets) {
continue
}
// 通過區間差集算法
// 同時考慮 選擇性確認 的情況
lostPackets := []*Packet{}
receivedAckList := [][2]int{}
for _, val := range receivedPackets {
ackBlock := strings.Split(val.SAck, "-")
if len(ackBlock) < 2 {
continue
}
start, _ := strconv.ParseInt(ackBlock[0], 10, 64)
end, _ := strconv.ParseInt(ackBlock[1], 10, 64)
receivedAckList = append(receivedAckList, [2]int{
int(start),
int(end),
})
}
// 排序合併後的區間
sort.Slice(receivedAckList, func(i, j int) bool {
return receivedAckList[i][0] < receivedAckList[j][0] && receivedAckList[i][1] < receivedAckList[j][1]
})
// 合併重複區間
uniqueIndex := 0
for i := 1; i < len(receivedAckList); i++ {
if receivedAckList[i][0] == receivedAckList[uniqueIndex][1] {
receivedAckList[uniqueIndex][1] = receivedAckList[i][1]
} else {
uniqueIndex++
}
}
receivedAckList = receivedAckList[:uniqueIndex+1]
// 計算丟失的數據包
curRecvIndex := 0
for i, val := range sentPackets {
if curRecvIndex >= len(receivedPackets) {
lostPackets = append(lostPackets, val)
continue
}
if val.Seq > receivedAckList[curRecvIndex][1] {
curRecvIndex++
lostPackets = append(lostPackets, sentPackets[i-1])
}
}
for _, val := range lostPackets {
// 構建 1 個 UDP 數據包
packet := Packet{
Seq: val.Seq,
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
Retransmit: true,
}
data := encode(&packet)
conn.Write(data)
}
default:
// 接收 Ack 包
buffer := make([]byte, 32)
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
_, _, err := conn.ReadFromUDP(buffer)
if err != nil {
continue
}
recvAckPacket := decode(buffer[:])
fmt.Printf("server -> client %s\n", serialization(&recvAckPacket))
// 更新接收到的數據包 Seq
receivedPackets = append(receivedPackets, &recvAckPacket)
}
}
}()
// 客戶端 Seq 值從 1 開始
curSeq := 1
// 連續發送 5 個 UDP 數據包
for i := 0; i <= 5; i++ {
// 構建 1 個 UDP 數據包
packet := Packet{
Seq: curSeq,
Ack: 1,
Data: "Hello Server",
Flag: FlagTypeData,
}
// 更新發送過的數據包 Seq
sentPackets = append(sentPackets, &packet)
// 第 4 個數據包模擬丟包
if i != 3 {
data := encode(&packet)
conn.Write(data)
}
// 更新下次發送數據包的 Seq 值
curSeq += len(packet.Data)
}
// 等待 Ack 報文接收完成
wg.Wait()
}
// 格式化數據包顯示
// 模擬 WireShark 的輸出格式
func serialization(p *Packet) string {
var sb strings.Builder
if p.Retransmit {
sb.WriteString("[TCP Retransmit] ")
}
if p.Flag == FlagTypeData {
// 無需任何標誌位渲染
// 輸出佔位符美化終端顯示
if !p.Retransmit {
sb.WriteString(" ")
}
} else if p.Flag == FlagTypeAck {
sb.WriteString("[ACK]")
} else if p.Flag == FlagTypeDupAck {
sb.WriteString("[TCP Dup ACK]")
} else {
sb.WriteString("[Unknown]")
}
sb.WriteString(" Seq=")
sb.WriteString(strconv.Itoa(p.Seq))
if p.Flag == FlagTypeAck || p.Flag == FlagTypeDupAck {
sb.WriteString(" Ack=")
sb.WriteString(strconv.Itoa(p.Ack))
if len(p.SAck) > 0 {
sb.WriteString(" SAck=")
sb.WriteString(p.SAck)
}
}
sb.WriteString(" Len=")
sb.WriteString(strconv.Itoa(len(p.Data)))
if p.Flag == FlagTypeData {
sb.WriteString(" Data=")
sb.WriteString(p.Data)
}
return sb.String()
}
// 其他重複代碼省略
// ...
運行程序的輸出如下:
通過輸出結果可以看到,客戶端連續發送了 6 個 UDP 數據包,其中第 4 個包模擬丟包 (服務端接收不到),但是因爲服務端啓動了選擇性確認,所以最終發送給客戶端的 Ack 報文有 2 個:
-
Ack=37: 表示 Seq 在 37 號之前數據包已經全部接收完成
-
SAck=49-73: 表示 Seq 在 49 號到 72 號之間的數據包已經全部接收完成
客戶端根據這兩個信息,就可以判斷出丟包的具體數據包,也就是 Seq 在 37 號到 49 號之間的數據包,具體來說,也就是下面這個數據包:
client -> server Seq=37 Len=12 Data=Hello Server
客戶端在超時計時器觸發後,通過對比 已經收到的數據包 Ack
和 已經發送的數據包 Seq
集合,計算出還未接受到的數據包,也就是丟包數據,然後重新發送,通過輸出的結果,可以看到對應數據包中的 [TCP Retransmit]
標識信息。
此外,通過在服務端模擬接收到的數據包亂序,服務端向客戶端發送了快速重傳 Dup ACK
報文,當然,上述代碼實現的是一個純演示版本。
3. 選擇性重傳
在前文中 超時重傳
代碼實現時已經順帶實現了,這裏不再贅述。
小結
本文通過僞代碼實現,演示了使用 UDP 來實現 TCP 中的確認與重傳機制,文中整體的所有代碼實現非常粗糙簡陋以及高度耦合 (可以直接運行,但只是爲了演示效果),而且沒有考慮任何併發安全、錯誤處理、性能優化等工程問題,但是本文主要的目的在於說明設計思路,僞代碼可以輔助理解實現細節,能到達這個目標就足夠了。
大多數有過網絡編程經驗的開發者,或多或少會產生過一個執念: 通過 UDP 來實現和 TCP 一樣的可靠傳輸保證 (RUDP),但這樣也就失去了創造 UDP 本身的意思,退一步說,即使真的實現了,充其量也就是和 TCP 性能持平 (畢竟 TCP 處於內核態沒有上下文切換成本,RUDP 處於用戶態有上下文切換成本),沒有任何技術價值。此外還要考慮網絡鏈路中的 UDP 流量服務質量 (包括運營商限制、防火牆丟包等)。不過好在,今天我們有了新的選擇: QUIC, a multiplexed transport over UDP[1]
參考資料
[1]
QUIC, a multiplexed transport over UDP: https://www.chromium.org/quic/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/xe32fVUSQU-QIpZPtBszeQ