不一樣的網絡協議 -------KCP 協議詳解

1、kcp 的協議特點

1.1、RTO 不翻倍

RTO(Retransmission TimeOut),重傳超時時間。tcp x 2,kcp x 1.5,提高傳輸速度

1.2、選擇重傳

TCP 丟包時會全部重傳從該包開始以後的數據,而 KCP 選擇性重傳,只重傳真正丟失的數據包。

1.3、快速重傳

tcp 重傳模式

超時重傳:超過規定的時間 RTO 則重傳

快速重傳:收到三個冗餘 ACK,不去等待 RTO ,直接重傳

這裏指的是收到 fastresend 個失序報文後,不等待超時,直接重傳,減少丟包等待時間。

1.4、非延遲 ACK

tcp 爲充分利用帶寬,延遲發送 ACK,RTT 時間較大,延長了丟包時的判斷過程。而 kcp 的 ACK 是否延遲發送可以調節。

1.5、ACK + UNA

ARQ (自動重傳請求,Automatic Repeat-reQuest)模型響應有兩種方式

UNA:此編號前所有包已收到,tcp

ACK:該編號包已收到

只用 UNA 將導致全部重傳,只用 ACK 則丟失成本太高,以往協議都是二選其一。而 kcp 協議中,除去單獨的 ACK 包(精確)外,所有包都有 UNA 信息。

1.6、非退讓流控

KCP 正常模式同 TCP 一樣使用公平退讓法則,即發送窗口大小由:發送緩存大小、接收端剩餘接收緩存大小、丟包退讓、慢啓動這四要素決定。但傳送及時性要求很高的小數據時,可選擇僅用前兩項來控制發送頻率。以犧牲部分公平性及帶寬利用率之代價,換取了流暢傳輸的效果。

KCP 實時性好,但帶寬利用率較低,因爲非退讓流控,不斷嘗試發送數據,有效包不多,每個包應答,佔用一定的帶寬

2.kcp 實現


UDP 收到的報文通過 kcp_input 傳遞給 KCP,KCP 會對數據進行解包,重新封裝成應用層用戶數據,應用層通過 kcp_recv 獲取。應用層通過 kcp_send 發送數據,KCP 會把用戶數據拆分 kcp 報文,通過 kcp_output,以 UDP(send) 的方式發送。

2.1、kcp 數據結構

kcp 報文結構:

conv:會話編號,通信雙方必須一致。cmd:報文類型 IKCP_CMD_ACK 確認命令 IKCP_CMD_PUSH 數據推送命令 IKCP_CMD_WASK 接收窗口詢問大小命令 IKCP_CMD_WINS 接收窗口大小告知命令 wnd: 己方可用接收窗口大小,接收窗口大小 - 接收隊列大小 frg:segmen t 分片。0,最後一個分片。3 2 1 0sn:segment 報文的序列號。ts:發送時間戳,用於計算 RTO 和 RTTuna:待接收的序列號,其實確認號,表示該序列號之前的所有報文都收到了,可以刪除 len:數據長度,DATA 的長度 DATA: 用戶數據

kcp 使用的 Segment 定義如下

struct IKCPSEG
{
    struct IQUEUEHEAD node;	// 用來串接多個 KCP segment,即前向後向指針
    IUINT32 conv;   // 會話編號
    IUINT32 cmd;    // 報文類型
    IUINT32 frg;    // 分片   
    IUINT32 wnd;    // 可用接收窗口大小(接收窗口大小-接收隊列大小)
    IUINT32 ts;     // 發送時刻的時間戳
    IUINT32 sn;     // 分片 segment 的序號
    IUINT32 una;    // 待接收消息序號
    IUINT32 len;    // 數據長度
    IUINT32 resendts;	// 下次超時重傳該報文的時間戳
    IUINT32 rto;	    // 重傳超時時間
    //發送端在發送過程中攜帶着RTO,該發送端會啓動一個定時器,進行定時,如果超過RTO就會重傳
    IUINT32 fastack;    // 收到ack時該分片被跳過的次數,用於快速重傳
    IUINT32 xmit;       // 記錄了該報文被傳輸了幾次
    char data[1];		// 實際傳輸的數據 payload
};

每一個 KCP 用戶都需要調用 ikcp_create 創建一個 kcp 控制塊 ikcpcb。ikcpcb 結構用來實現整個 KCP 協議。

struct IKCPCB
{
    IUINT32 conv;   // 標識會話
    IUINT32 mtu;    // 最大傳輸單元,默認數據爲1400,最小爲50
    IUINT32 mss;    // 最大分片大小,不大於mtu
    IUINT32 state;  // 連接狀態(0xffffffff表示斷開連接)
    
    IUINT32 snd_una;    // 第一個未確認的包
    IUINT32 snd_nxt;    // 下一個待分配包的序號
    IUINT32 rcv_nxt;    // 待接收消息序號.爲了保證包的順序,接收方會維護一個接收窗口,接收窗口有一個起始序號rcv_nxt 以及尾序號rcv_nxt + rcv_wnd(接收窗口大小)
 
    IUINT32 ts_recent;  
    IUINT32 ts_lastack;
    IUINT32 ssthresh;       // 擁塞窗口的閾值
    
    IINT32  rx_rttval;      // RTT的變化量,代表連接的抖動情況
    IINT32  rx_srtt;        // smoothed round trip time,平滑後的RTT;
    IINT32  rx_rto;         // 收ACK接收延遲計算出來的重傳超時時間
    IINT32  rx_minrto;      // 最小重傳超時時間
 
    IUINT32 snd_wnd;        // 發送窗口大小
    IUINT32 rcv_wnd;        // 接收窗口大小,本質上而言如果接收端一直不去讀取數據則rcv_queue就會滿(達到rcv_wnd)
    IUINT32 rmt_wnd;        // 遠端接收窗口大小
    IUINT32 cwnd;           // 擁塞窗口大小, 動態變化
    IUINT32 probe;          // 探查變量, IKCP_ASK_TELL表示告知遠端窗口大小。IKCP_ASK_SEND表示請求遠端告知窗口大小;
 
    IUINT32 current;
    IUINT32 interval;       // 內部flush刷新間隔,對系統循環效率有非常重要影響, 間隔小了cpu佔用率高, 間隔大了響應慢
    IUINT32 ts_flush;       // 下次flush刷新的時間戳
    IUINT32 xmit;           // 發送segment的次數, 當segment的xmit增加時,xmit增加(重傳除外)
 
    IUINT32 nrcv_buf;       // 接收緩存中的消息數量
    IUINT32 nsnd_buf;       // 發送緩存中的消息數量
 
    IUINT32 nrcv_que;       // 接收隊列中消息數量
    IUINT32 nsnd_que;       // 發送隊列中消息數量
 
    IUINT32 nodelay;        // 是否啓動無延遲模式。無延遲模式rtomin將設置爲0,擁塞控制不啓動;
    IUINT32 updated;         //是否調用過update函數的標識;
 
    IUINT32 ts_probe;       // 下次探查窗口的時間戳;
    IUINT32 probe_wait;     // 探查窗口需要等待的時間;
 
    IUINT32 dead_link;      // 最大重傳次數,被認爲連接中斷;
    IUINT32 incr;           // 可發送的最大數據量;
 
    struct IQUEUEHEAD snd_queue;    //發送消息的隊列 
    struct IQUEUEHEAD rcv_queue;    //接收消息的隊列, 確認過用戶可讀取
    struct IQUEUEHEAD snd_buf;      //發送消息的緩存
    struct IQUEUEHEAD rcv_buf;      //接收消息的緩存
 
    IUINT32 *acklist;   //待發送的ack的列表 當收到一個數據報文時,將其對應的 ACK 報文的 sn 號以及時間戳 ts 
                        //同時加入到acklist 中,即形成如 [sn1, ts1, sn2, ts2 …] 的列表
    IUINT32 ackcount;   // 記錄 acklist 中存放的 ACK 報文的數量 
    IUINT32 ackblock;   // acklist 數組的可用長度,當 acklist 的容量不足時,需要進行擴容
 
    void *user;     // 指針,可以任意放置代表用戶的數據,也可以設置程序中需要傳遞的變量;
    char *buffer;   // 存儲字節流信息 
 
    int fastresend; // 觸發快速重傳的重複ACK個數;
    int fastlimit;
 
    int nocwnd;     // 取消擁塞控制
    int stream;     // 是否採用流傳輸模式
 
    int logmask;    // 日誌的類型,如IKCP_LOG_IN_DATA,方便調試
 
    int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);//發送消息的回調函數
    void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);  // 寫日誌的回調函數
};

2.2、kcp 報文發送

KCP 中,數據發送流程分爲:

ikcp_send

ikcp_send

ikcp_send 的功能:把用戶發送的數據根據 MSS 分片成 KCP 的數據包格式,插入待發送隊列

分片方式

流模式:檢測每個發送隊列⾥的分片是否達到 MSS,沒有達到則用新的數據填充分片。

消息模式:將用戶數據的每個分片設置 sn 和 frag,將分片後的數據存入發送隊列,接收方通過 sn 和 frag 解包。即使⼀個分片的數據量可能不能達到 MSS,也會作爲⼀個包發送出去。

int ikcp_send(ikcpcb *kcp, const char *buffer, int len) 
{
    // 1、如果KCP開啓流模式
	if (kcp->stream != 0) {		
		if (!iqueue_is_empty(&kcp->snd_queue)) {
			// 取出 snd_queue 中的最後一個報文,將其填充到 mss 的長度,設置frg爲0
			IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
            
			// 舊分片內數據長度小於mss
			if (old->len < kcp->mss) {
				int capacity = kcp->mss - old->len; // 還能容納的數據長度
				int extend = (len < capacity)? len : capacity; // 需要填充的長度
				seg = ikcp_segment_new(kcp, old->len + extend); // 新建segment
				assert(seg);
				if (seg == NULL) {
					return -2;
				}
                 // 新分片添加到發送隊列尾部
				iqueue_add_tail(&seg->node, &kcp->snd_queue);   
                 // 拷貝舊分片的數據到新分片
				memcpy(seg->data, old->data, old->len); 
				// 將buffer中的數據也拷貝到新分片
				if (buffer) {
					memcpy(seg->data + old->len, buffer, extend);
					buffer += extend; // buffer指向剩餘數據的開頭
				}
				seg->len = old->len + extend;
				seg->frg = 0;
				len -= extend; // 更新len爲剩餘數據長度
				iqueue_del_init(&old->node); // 刪除old
				ikcp_segment_delete(kcp, old);
			}
		}
		if (len <= 0) {
			return 0;
		}
	}
    
    // 2、計算數據需要分成多少段報文
    if (len <= (int)kcp->mss) count = 1; // mss 1376 + head 24 = mtu 1400
    else count = (len + kcp->mss - 1) / kcp->mss;
	if (count >= (int)IKCP_WND_RCV) return -2;  // 超過對方的初始接收窗口
	if (count == 0) count = 1;  
    
    // fragment
    // 3、將數據全部新建 segment 插入發送隊列尾部,隊列計數遞增, frag 遞減
    for (i = 0; i < count; i++) {
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;
        seg = ikcp_segment_new(kcp, size);
        assert(seg);
        if (seg == NULL) {
            return -2;
        }
        if (buffer && len > 0) { // 仍有待發送的數據
            memcpy(seg->data, buffer, size);
        }
        seg->len = size;
        // 分片編號,逆序。流模式情況下分片編號不用填寫
        seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue); // 加入到 snd_queue 中
        kcp->nsnd_que++;
       
        if (buffer) {
		      buffer += size;
        }
        len -= size;
    }
}

應用層調用 ikcp_send 之後將用戶數據置入 snd_queue 中,當 KCP 調用 ikcp_flush 時纔將數據從 snd_queue 中 移入到 snd_buf 中,然後調用 kcp->output() 發送。

檢查 kcp->update 是否更新,未更新直接返回。kcp->update 由 ikcp_update 更新,上層應用需要每隔一段時間(10-100ms)調用 ikcp_update 來驅動 KCP 發送數據;

// 'ikcp_update' haven't been called. 
if (kcp->updated == 0) return;

準備將 acklist 中記錄的 ACK 報文發送出去,即從 acklist 中填充 ACK 報文的 sn 和 ts 字段;

// flush acknowledges
// 逐一獲取 acklist 中的 sn 和 ts,編碼成 segment,以流的方式湊夠 MTU 發送
count = kcp->ackcount;		// 需要應答的分片數量
for (i = 0; i < count; i++) {
    size = (int)(ptr - buffer);
    // 超過 MTU 大小直接發送
    if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
        ikcp_output(kcp, buffer, size);
        ptr = buffer; // 新建分片
    }
    ikcp_ack_get(kcp, i, &seg.sn, &seg.ts); // 應答包
    ptr = ikcp_encode_seg(ptr, &seg);       // 編碼segment協議頭
}
 
kcp->ackcount = 0;

檢查當前是否需要對遠端窗口進行探測。由於 KCP 流量控制依賴於遠端通知其可接受窗口的大小,一旦遠端接受窗口 kcp->rmt_wnd 爲 0,那麼本地將不會再向遠端發送數據,因此就沒有機會從遠端接受 ACK 報文,從而沒有機會更新遠端窗口大小。在這種情況下,KCP 需要發送窗口探測報文到遠端,待遠端回覆窗口大小後,後續傳輸可以繼續:

// probe window size (if remote window size equals zero)
// 1、遠端窗口大小爲0,需要發送窗口探測報文
if (kcp->rmt_wnd == 0) {
    // 初始化探測間隔和下一次探測時間
    if (kcp->probe_wait == 0) { 
        kcp->probe_wait = IKCP_PROBE_INIT;  // 默認7秒探測
        kcp->ts_probe = kcp->current + kcp->probe_wait; // 下一次探測時間
    }	
    else {
        //遠端窗口爲0,發送過探測請求,但是已經超過下次探測的時間  
        // 檢測是否到了探測時間
        if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { 
            // 更新探測間隔probe_wait
            if (kcp->probe_wait < IKCP_PROBE_INIT) 
                kcp->probe_wait = IKCP_PROBE_INIT;
            kcp->probe_wait += kcp->probe_wait / 2;
            if (kcp->probe_wait > IKCP_PROBE_LIMIT)
                kcp->probe_wait = IKCP_PROBE_LIMIT;
            // 更新下次探測時間ts_probe
            kcp->ts_probe = kcp->current + kcp->probe_wait;
            // 更新探測變量probe爲IKCP_ASK_SEND,發送探測消息 
            kcp->probe |= IKCP_ASK_SEND;
        }
    }
}	
// 2、遠端窗口正常,則不需要發送窗口探測  
else {
    kcp->ts_probe = 0;	// 更新下次探測時間爲0
    kcp->probe_wait = 0; // 更新探測窗口等待時間爲0
}

將窗口探測報文和窗口回覆報文發送出去

// flush window probing commands
if (kcp->probe & IKCP_ASK_SEND) {
    seg.cmd = IKCP_CMD_WASK;	// 窗口探測[詢問對方窗口size]
    size = (int)(ptr - buffer);
    if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
        ikcp_output(kcp, buffer, size);
        ptr = buffer;
    }
    ptr = ikcp_encode_seg(ptr, &seg);
}
 
// flush window probing commands
if (kcp->probe & IKCP_ASK_TELL) {
    seg.cmd = IKCP_CMD_WINS;	// 窗口告知[告訴對方我方窗口size]
    size = (int)(ptr - buffer);
    if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
        ikcp_output(kcp, buffer, size);
        ptr = buffer;
    }
    ptr = ikcp_encode_seg(ptr, &seg);
}
 
kcp->probe = 0;	//清空標識

計算本次發送可用的窗口大小,這裏 KCP 採用了可以配置的策略,正常情況下,KCP 的窗口大小由發送窗口 snd_wnd,遠端接收窗口 rmt_wnd 以及根據流控計算得到的 kcp->cwnd 共同決定;但是當開啓了 nocwnd 模式時,窗口大小僅由前兩者決定;

// calculate window size 
// 若沒有流控,取發送窗口和遠端接收窗口最小值
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);      
// 若存在流控,則取當前擁塞窗口、發送窗口和遠端接收窗口三者最小值
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

將緩存在 snd_queue 中的數據移到 snd_buf 中等待發送

// move data from snd_queue to snd_buf
// 從snd_queue移動到snd_buf的數量不能超出對方的接收能力,發送符合擁塞範圍的分片
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
    IKCPSEG *newseg;
    if (iqueue_is_empty(&kcp->snd_queue)) break;
 
    newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
	
    iqueue_del(&newseg->node);
    iqueue_add_tail(&newseg->node, &kcp->snd_buf); // 添加到發送緩存
    kcp->nsnd_que--;
    kcp->nsnd_buf++;
    //設置數據分片的屬性
    newseg->conv = kcp->conv;
    newseg->cmd = IKCP_CMD_PUSH;
    newseg->wnd = seg.wnd;	// 告知對方當前的接收窗口
    newseg->ts = current;	// 當前時間
    newseg->sn = kcp->snd_nxt++;	// 序號
    newseg->una = kcp->rcv_nxt;		// 告訴對方可以發送的下一個包序號
    newseg->resendts = current;		// 當前發送的時間
    newseg->rto = kcp->rx_rto;		// 超時重傳的時間
    newseg->fastack = 0;			// 是否快速重傳
    newseg->xmit = 0;				// 重傳次數
}

在發送數據之前,先設置快重傳的次數和重傳間隔;KCP 允許設置快重傳的次數,即 fastresend 參數。例如設置 fastresend 爲 2,並且發送端發送了 1,2,3,4,5 幾個包,收到遠端的 ACK: 1, 3, 4, 5,當收到 ACK3 時,KCP 知道 2 被跳過 1 次,收到 ACK4 時,知道 2 被 “跳過” 了 2 次,此時可以認爲 2 號丟失,不用等超時,直接重傳 2 號包;每個報文的 fastack 記錄了該報文被跳過了幾次,由函數 ikcp_parse_fastack 更新。於此同時,KCP 也允許設置 nodelay 參數,當激活該參數時,每個報文的超時重傳時間將由 x2 變爲 x1.5,即加快報文重傳:

// calculate resent 
// 是否設置快重傳次數
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; 
// 是否開啓nodelay
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

將 snd_buf 中的數據發送出去

// flush data segments
// 發送snd buf的分片,只要數據還在snd_buf 說明對方還沒有應答
// 1、新的報文,正常發送
// 2、超時重傳
// 3、快速重傳(如果有)
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
    IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
    int needsend = 0;
    // 1、如果該報文是第一次傳輸,那麼直接發送
    if (segment->xmit == 0) {   
        needsend = 1;
        segment->xmit++; // 分片發送次數 + 1
        segment->rto = kcp->rx_rto; // 超時時間間隔
        segment->resendts = current + segment->rto + rtomin; // 下一次要發送的時間
    }
    // 2、當前時間達到了該報文的重傳時間,但並沒有新的ack到達,出現丟包, 重傳  
    else if (_itimediff(current, segment->resendts) >= 0) { 
        needsend = 1;
        segment->xmit++;
        kcp->xmit++;
        // 根據 nodelay 參數更新重傳時間
        if (kcp->nodelay == 0) {
            segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
        } else {
            IINT32 step = (kcp->nodelay < 2)? ((IINT32)(segment->rto)) : kcp->rx_rto;
            segment->rto += step / 2; //報文超時等待時間更新,控制RTO=1.5 
        }
        segment->resendts = current + segment->rto;	//下一次發送的時間
        lost = 1;	// 丟包,反應到擁塞控制策略去了
    }
    // 3、該報文的的被跳過次數超過設置的快速重傳次數,需要重傳  
    else if (segment->fastack >= resent) {  
        if ((int)segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0) {
            needsend = 1;
            segment->xmit++;
            segment->fastack = 0;  // 重置該分片被跳過的次數
            segment->resendts = current + segment->rto;
            change++;	// 標識快速重傳的發生
        }
    }
 
    // 需要發送數據
    if (needsend) {
        int need;
        segment->ts = current;
        segment->wnd = seg.wnd; // 己方可用接收窗口大小
        segment->una = kcp->rcv_nxt;   // 待接收的下一個包序號
 
        size = (int)(ptr - buffer);
        need = IKCP_OVERHEAD + segment->len;
 
        // 小包封裝成大包發送
        if (size + need > (int)kcp->mtu) {		
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
 
        // 把segment封裝成線性buffer發送 頭部+數據
        ptr = ikcp_encode_seg(ptr, segment);    
 
        if (segment->len > 0) {
            memcpy(ptr, segment->data, segment->len);
            ptr += segment->len;
        }
 
        if (segment->xmit >= kcp->dead_link) {
            kcp->state = (IUINT32)-1;
        }
    }
}
 
// flash remain segments
size = (int)(ptr - buffer);	// 剩餘的數據
// 最終只要有數據要發送,一定發出去
if (size > 0) {
    ikcp_output(kcp, buffer, size);	
}

根據設置的 lost 和 change 更新窗口大小;注意 快重傳和丟包時的窗口更新算法不一致,這一點類似於 TCP 協議的擁塞控制和快恢復算法

// update ssthresh
//如果發生了快速重傳,擁塞窗口閾值降低爲當前未確認包數量的一半或最小值 
if (change) {   
    IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
    kcp->ssthresh = inflight / 2;
    if (kcp->ssthresh < IKCP_THRESH_MIN)
        kcp->ssthresh = IKCP_THRESH_MIN;
    kcp->cwnd = kcp->ssthresh + resent;	// 動態調整擁塞控制窗口
    kcp->incr = kcp->cwnd * kcp->mss;
}
// 如果發生了丟包,閾值減半, cwd 窗口保留爲 1
if (lost) {     
    kcp->ssthresh = cwnd / 2;
    if (kcp->ssthresh < IKCP_THRESH_MIN)
        kcp->ssthresh = IKCP_THRESH_MIN;
    kcp->cwnd = 1;  // 動態調整擁塞控制窗口 
    kcp->incr = kcp->mss;
}
 
if (kcp->cwnd < 1) {
    kcp->cwnd = 1;  
    kcp->incr = kcp->mss;
}

2.3、kcp 報文接收

ikcp_recv

應用層接收函數爲 ikcp_recv,主要做三件事

讀取組好包的數據 rcv_queue -> 用戶 buffer

將接收緩存 rcv_buf 的分片轉移到接收隊列 rcv_queue

如果有接收空間則將 kcp->probe |= IKCP_ASK_TELL ; 以在 update 的時候告知對方可以發送數據了。

首先檢測一下本次接收數據之後,是否需要進行窗口恢復。在前面的內容中解釋過,KCP 協議在遠端窗口爲 0 的時候將會停止發送數據,此時如果遠端調用 ikcp_recv 將數據從 rcv_queue 中移動到應用層 buffer 中之後,表明其可以再次接受數據,爲了能夠恢復數據的發送,遠端可以主動發送 IKCP_ASK_TELL 來告知窗口大小;

if (kcp->nrcv_que >= kcp->rcv_wnd)
    recover = 1;  // 標記可以開始窗口恢復

開始將 rcv_queue 中的數據根據分片編號 frg merge 起來,然後拷貝到用戶的 buffer 中。

// merge fragment   
// 將屬於同一個消息的各分片重組完整數據,並刪除rcv_queue中segment,nrcv_que減少 
// 經過 ikcp_send 發送的數據會進行分片,分片編號爲倒序序號,因此frg爲0的數據包標記着完整接收到了一次 send 發送過來的數據
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
    int fragment;
    seg = iqueue_entry(p, IKCPSEG, node);
    p = p->next;
 
    if (buffer) {
        memcpy(buffer, seg->data, seg->len); // 把queue的數據就放入用戶buffer
        buffer += seg->len;
    }
 
    len += seg->len;
    fragment = seg->frg;
 
    if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
        ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
    }
 
    if (ispeek == 0) {
        iqueue_del(&seg->node);
        ikcp_segment_delete(kcp, seg);  // 刪除節點
        kcp->nrcv_que--;    // nrcv_que接收隊列-1
    }
 
    // frg = 0,完整的數據接收到, 本次數據接收結束
    if (fragment == 0) 	// 
        break;
}

下一步將 rcv_buf 中的數據轉移到 rcv_queue 中,這個過程根據報文的 sn 編號來確保轉移到 rcv_queue 中的數據一定是按序的:

// move available data from rcv_buf -> rcv_queue
// 將 rcv_buf 中的數據轉移到 rev_queue 
// 根據報文的sn來確保轉移到 rcv_queue 中的數據一定是按序的
while (! iqueue_is_empty(&kcp->rcv_buf)) {
    seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
    // 1、根據 sn 確保數據是按序轉移到 rcv_queue 中
    // 2、接收隊列nrcv_que < 接收窗口rcv_wnd; 
    if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
        iqueue_del(&seg->node);
        kcp->nrcv_buf--;
        iqueue_add_tail(&seg->node, &kcp->rcv_queue);
        kcp->nrcv_que++;    // 接收隊列 有多少個分片 + 1
        kcp->rcv_nxt++;     // 接收序號 + 1
    } else {
        break;
    }
}

最後進行窗口恢復。此時如果 recover 標記爲 1,表明在此次接收之前,可用接收窗口爲 0,如果經過本次接收之後,可用窗口大於 0,將主動發送 IKCP_ASK_TELL 數據包來通知對方已可以接收數據:

// fast recover 
// nrcv_que小於rcv_wnd, 說明接收端有空間繼續接收數據了
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
    // ready to send back IKCP_CMD_WINS in ikcp_flush
    // tell remote my window size
    kcp->probe |= IKCP_ASK_TELL;
}

kcp_input

ikcp_recv 僅爲上層調用的接口,KCP 協議需要從底層接受數據到 rcv_buf 中,這是通過函數 ikcp_input 實現。ikcp_input 中的所有功能都在一個外層的循環中實現:

首先將接收到的數據包進行解碼,並進行基本的數據包長度和類型校驗;KCP 協議只會接收到前文中所介紹的四種數據包;

調用 ikcp_parse_una 來確定已經發送的數據包有哪些被對方接收到。KCP 中所有的報文類型均帶有 una 信息。發送端發送的數據都會緩存在 snd_buf 中,直到接收到對方確認信息之後纔會刪除。當接收到 una 信息後,表明 sn 小於 una 的數據包都已經被對方接收到,因此可以直接從 snd_buf 中刪除。同時調用 ikcp_shrink_buf 來更新 KCP 控制塊的 snd_una 數值。

// 刪除小於snd_buf中小於una的segment
ikcp_parse_una(kcp, una);       
// 更新snd_una爲snd_buf中seg->sn或kcp->snd_nxt ,更新下一個待應答的序號
ikcp_shrink_buf(kcp);

處理 IKCP_CMD_ACK 報文

if (cmd == IKCP_CMD_ACK) {
    if (_itimediff(kcp->current, ts) >= 0) { // 根據應答判斷rtt
        //更新rx_srtt,rx_rttval,計算kcp->rx_rto
        ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
    }
    //遍歷snd_buf中(snd_una, snd_nxt),將sn相等的刪除,直到大於sn  
    ikcp_parse_ack(kcp, sn);    // 將已經ack的分片刪除
    ikcp_shrink_buf(kcp);       // 更新控制塊的 snd_una
    if (flag == 0) {
        flag = 1;       //快速重傳標記
        maxack = sn;    // 記錄最大的 ACK 編號
        latest_ts = ts;
    }	else {
        if (_itimediff(sn, maxack) > 0) {
            maxack = sn;        // 記錄最大的 ACK 編號 
            latest_ts = ts;
        }
    }

處理 IKCP_CMD_PUSH 報文

else if (cmd == IKCP_CMD_PUSH) {	//接收到具體的數據包
    if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
         // 對該報文的確認 ACK 報文放入 ack 列表中
        ikcp_ack_push(kcp, sn, ts);
        //  判斷接收的數據分片編號是否符合要求,即:在接收窗口(滑動窗口)範圍之內
        if (_itimediff(sn, kcp->rcv_nxt) >= 0) {    // 是要接受起始的序號
            seg = ikcp_segment_new(kcp, len);
            seg->conv = conv;
            seg->cmd = cmd;
            seg->frg = frg;
            seg->wnd = wnd;
            seg->ts = ts;
            seg->sn = sn;
            seg->una = una;
            seg->len = len;
 
            if (len > 0) {
                memcpy(seg->data, data, len);
            }
		   
            // 將該報文插入到 rcv_buf 鏈表中
            ikcp_parse_data(kcp, seg);  
        }
    }
}

對於接收到的 IKCP_CMD_WASK 報文,直接標記下次將發送窗口通知報文;而對於報文 IKCP_CMD_WINS 無需做任何特殊操作;

else if (cmd == IKCP_CMD_WASK) {		
    // ready to send back IKCP_CMD_WINS in ikcp_flush 
    // tell remote my window size 
    // 如果是探測包,添加相應的標識位
    kcp->probe |= IKCP_ASK_TELL;        
}
else if (cmd == IKCP_CMD_WINS) {
    // do nothing,如果是 tell me 遠端窗口大小,什麼都不做
}

據記錄的最大的 ACK 編號 maxack 來更新 snd_buf 中的報文的 fastack,這個過程在介紹 ikcp_flush 中提到過,對於 fastack 大於設置的 resend 參數時,將立馬進行快重傳;

最後,根據接收到報文的 una 和 KCP 控制塊的 una 參數進行流控;

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9aXglFdjfNJwXUHpfz4-pQ