不一樣的網絡協議 -------KCP 協議詳解
1、kcp 的協議特點
1.1、RTO 不翻倍
RTO(Retransmission TimeOut),重傳超時時間。tcp x 2,kcp x 1.5,提高傳輸速度
1.2、選擇重傳
TCP 丟包時會全部重傳從該包開始以後的數據,而 KCP 選擇性重傳,只重傳真正丟失的數據包。
1.3、快速重傳
tcp 重傳模式
超時重傳:超過規定的時間 RTO 則重傳
快速重傳:收到三個冗餘 ACK,不去等待 RTO ,直接重傳
這裏指的是收到 fastresend 個失序報文後,不等待超時,直接重傳,減少丟包等待時間。
1.4、非延遲 ACK
tcp 爲充分利用帶寬,延遲發送 ACK,RTT 時間較大,延長了丟包時的判斷過程。而 kcp 的 ACK 是否延遲發送可以調節。
1.5、ACK + UNA
ARQ (自動重傳請求,Automatic Repeat-reQuest)模型響應有兩種方式
UNA:此編號前所有包已收到,tcp
ACK:該編號包已收到
只用 UNA 將導致全部重傳,只用 ACK 則丟失成本太高,以往協議都是二選其一。而 kcp 協議中,除去單獨的 ACK 包(精確)外,所有包都有 UNA 信息。
1.6、非退讓流控
KCP 正常模式同 TCP 一樣使用公平退讓法則,即發送窗口大小由:發送緩存大小、接收端剩餘接收緩存大小、丟包退讓、慢啓動這四要素決定。但傳送及時性要求很高的小數據時,可選擇僅用前兩項來控制發送頻率。以犧牲部分公平性及帶寬利用率之代價,換取了流暢傳輸的效果。
KCP 實時性好,但帶寬利用率較低,因爲非退讓流控,不斷嘗試發送數據,有效包不多,每個包應答,佔用一定的帶寬
2.kcp 實現
UDP 收到的報文通過 kcp_input 傳遞給 KCP,KCP 會對數據進行解包,重新封裝成應用層用戶數據,應用層通過 kcp_recv 獲取。應用層通過 kcp_send 發送數據,KCP 會把用戶數據拆分 kcp 報文,通過 kcp_output,以 UDP(send) 的方式發送。
2.1、kcp 數據結構
kcp 報文結構:
conv:會話編號,通信雙方必須一致。cmd:報文類型 IKCP_CMD_ACK 確認命令 IKCP_CMD_PUSH 數據推送命令 IKCP_CMD_WASK 接收窗口詢問大小命令 IKCP_CMD_WINS 接收窗口大小告知命令 wnd: 己方可用接收窗口大小,接收窗口大小 - 接收隊列大小 frg:segmen t 分片。0,最後一個分片。3 2 1 0sn:segment 報文的序列號。ts:發送時間戳,用於計算 RTO 和 RTTuna:待接收的序列號,其實確認號,表示該序列號之前的所有報文都收到了,可以刪除 len:數據長度,DATA 的長度 DATA: 用戶數據
kcp 使用的 Segment 定義如下
struct IKCPSEG
{
struct IQUEUEHEAD node; // 用來串接多個 KCP segment,即前向後向指針
IUINT32 conv; // 會話編號
IUINT32 cmd; // 報文類型
IUINT32 frg; // 分片
IUINT32 wnd; // 可用接收窗口大小(接收窗口大小-接收隊列大小)
IUINT32 ts; // 發送時刻的時間戳
IUINT32 sn; // 分片 segment 的序號
IUINT32 una; // 待接收消息序號
IUINT32 len; // 數據長度
IUINT32 resendts; // 下次超時重傳該報文的時間戳
IUINT32 rto; // 重傳超時時間
//發送端在發送過程中攜帶着RTO,該發送端會啓動一個定時器,進行定時,如果超過RTO就會重傳
IUINT32 fastack; // 收到ack時該分片被跳過的次數,用於快速重傳
IUINT32 xmit; // 記錄了該報文被傳輸了幾次
char data[1]; // 實際傳輸的數據 payload
};
每一個 KCP 用戶都需要調用 ikcp_create 創建一個 kcp 控制塊 ikcpcb。ikcpcb 結構用來實現整個 KCP 協議。
struct IKCPCB
{
IUINT32 conv; // 標識會話
IUINT32 mtu; // 最大傳輸單元,默認數據爲1400,最小爲50
IUINT32 mss; // 最大分片大小,不大於mtu
IUINT32 state; // 連接狀態(0xffffffff表示斷開連接)
IUINT32 snd_una; // 第一個未確認的包
IUINT32 snd_nxt; // 下一個待分配包的序號
IUINT32 rcv_nxt; // 待接收消息序號.爲了保證包的順序,接收方會維護一個接收窗口,接收窗口有一個起始序號rcv_nxt 以及尾序號rcv_nxt + rcv_wnd(接收窗口大小)
IUINT32 ts_recent;
IUINT32 ts_lastack;
IUINT32 ssthresh; // 擁塞窗口的閾值
IINT32 rx_rttval; // RTT的變化量,代表連接的抖動情況
IINT32 rx_srtt; // smoothed round trip time,平滑後的RTT;
IINT32 rx_rto; // 收ACK接收延遲計算出來的重傳超時時間
IINT32 rx_minrto; // 最小重傳超時時間
IUINT32 snd_wnd; // 發送窗口大小
IUINT32 rcv_wnd; // 接收窗口大小,本質上而言如果接收端一直不去讀取數據則rcv_queue就會滿(達到rcv_wnd)
IUINT32 rmt_wnd; // 遠端接收窗口大小
IUINT32 cwnd; // 擁塞窗口大小, 動態變化
IUINT32 probe; // 探查變量, IKCP_ASK_TELL表示告知遠端窗口大小。IKCP_ASK_SEND表示請求遠端告知窗口大小;
IUINT32 current;
IUINT32 interval; // 內部flush刷新間隔,對系統循環效率有非常重要影響, 間隔小了cpu佔用率高, 間隔大了響應慢
IUINT32 ts_flush; // 下次flush刷新的時間戳
IUINT32 xmit; // 發送segment的次數, 當segment的xmit增加時,xmit增加(重傳除外)
IUINT32 nrcv_buf; // 接收緩存中的消息數量
IUINT32 nsnd_buf; // 發送緩存中的消息數量
IUINT32 nrcv_que; // 接收隊列中消息數量
IUINT32 nsnd_que; // 發送隊列中消息數量
IUINT32 nodelay; // 是否啓動無延遲模式。無延遲模式rtomin將設置爲0,擁塞控制不啓動;
IUINT32 updated; //是否調用過update函數的標識;
IUINT32 ts_probe; // 下次探查窗口的時間戳;
IUINT32 probe_wait; // 探查窗口需要等待的時間;
IUINT32 dead_link; // 最大重傳次數,被認爲連接中斷;
IUINT32 incr; // 可發送的最大數據量;
struct IQUEUEHEAD snd_queue; //發送消息的隊列
struct IQUEUEHEAD rcv_queue; //接收消息的隊列, 確認過用戶可讀取
struct IQUEUEHEAD snd_buf; //發送消息的緩存
struct IQUEUEHEAD rcv_buf; //接收消息的緩存
IUINT32 *acklist; //待發送的ack的列表 當收到一個數據報文時,將其對應的 ACK 報文的 sn 號以及時間戳 ts
//同時加入到acklist 中,即形成如 [sn1, ts1, sn2, ts2 …] 的列表
IUINT32 ackcount; // 記錄 acklist 中存放的 ACK 報文的數量
IUINT32 ackblock; // acklist 數組的可用長度,當 acklist 的容量不足時,需要進行擴容
void *user; // 指針,可以任意放置代表用戶的數據,也可以設置程序中需要傳遞的變量;
char *buffer; // 存儲字節流信息
int fastresend; // 觸發快速重傳的重複ACK個數;
int fastlimit;
int nocwnd; // 取消擁塞控制
int stream; // 是否採用流傳輸模式
int logmask; // 日誌的類型,如IKCP_LOG_IN_DATA,方便調試
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);//發送消息的回調函數
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); // 寫日誌的回調函數
};
2.2、kcp 報文發送
KCP 中,數據發送流程分爲:
-
上層應用調用 ikcp_send 將數據寫入 snd_queue
-
下層函數 ikcp_flush 決定將多少數據從 snd_queue 移動到 snd_buf,進行發送
ikcp_send
ikcp_send
ikcp_send 的功能:把用戶發送的數據根據 MSS 分片成 KCP 的數據包格式,插入待發送隊列
分片方式
流模式:檢測每個發送隊列⾥的分片是否達到 MSS,沒有達到則用新的數據填充分片。
消息模式:將用戶數據的每個分片設置 sn 和 frag,將分片後的數據存入發送隊列,接收方通過 sn 和 frag 解包。即使⼀個分片的數據量可能不能達到 MSS,也會作爲⼀個包發送出去。
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
// 1、如果KCP開啓流模式
if (kcp->stream != 0) {
if (!iqueue_is_empty(&kcp->snd_queue)) {
// 取出 snd_queue 中的最後一個報文,將其填充到 mss 的長度,設置frg爲0
IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
// 舊分片內數據長度小於mss
if (old->len < kcp->mss) {
int capacity = kcp->mss - old->len; // 還能容納的數據長度
int extend = (len < capacity)? len : capacity; // 需要填充的長度
seg = ikcp_segment_new(kcp, old->len + extend); // 新建segment
assert(seg);
if (seg == NULL) {
return -2;
}
// 新分片添加到發送隊列尾部
iqueue_add_tail(&seg->node, &kcp->snd_queue);
// 拷貝舊分片的數據到新分片
memcpy(seg->data, old->data, old->len);
// 將buffer中的數據也拷貝到新分片
if (buffer) {
memcpy(seg->data + old->len, buffer, extend);
buffer += extend; // buffer指向剩餘數據的開頭
}
seg->len = old->len + extend;
seg->frg = 0;
len -= extend; // 更新len爲剩餘數據長度
iqueue_del_init(&old->node); // 刪除old
ikcp_segment_delete(kcp, old);
}
}
if (len <= 0) {
return 0;
}
}
// 2、計算數據需要分成多少段報文
if (len <= (int)kcp->mss) count = 1; // mss 1376 + head 24 = mtu 1400
else count = (len + kcp->mss - 1) / kcp->mss;
if (count >= (int)IKCP_WND_RCV) return -2; // 超過對方的初始接收窗口
if (count == 0) count = 1;
// fragment
// 3、將數據全部新建 segment 插入發送隊列尾部,隊列計數遞增, frag 遞減
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) { // 仍有待發送的數據
memcpy(seg->data, buffer, size);
}
seg->len = size;
// 分片編號,逆序。流模式情況下分片編號不用填寫
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue); // 加入到 snd_queue 中
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
}
}
應用層調用 ikcp_send 之後將用戶數據置入 snd_queue 中,當 KCP 調用 ikcp_flush 時纔將數據從 snd_queue 中 移入到 snd_buf 中,然後調用 kcp->output() 發送。
檢查 kcp->update 是否更新,未更新直接返回。kcp->update 由 ikcp_update 更新,上層應用需要每隔一段時間(10-100ms)調用 ikcp_update 來驅動 KCP 發送數據;
// 'ikcp_update' haven't been called.
if (kcp->updated == 0) return;
準備將 acklist 中記錄的 ACK 報文發送出去,即從 acklist 中填充 ACK 報文的 sn 和 ts 字段;
// flush acknowledges
// 逐一獲取 acklist 中的 sn 和 ts,編碼成 segment,以流的方式湊夠 MTU 發送
count = kcp->ackcount; // 需要應答的分片數量
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
// 超過 MTU 大小直接發送
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer; // 新建分片
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts); // 應答包
ptr = ikcp_encode_seg(ptr, &seg); // 編碼segment協議頭
}
kcp->ackcount = 0;
檢查當前是否需要對遠端窗口進行探測。由於 KCP 流量控制依賴於遠端通知其可接受窗口的大小,一旦遠端接受窗口 kcp->rmt_wnd 爲 0,那麼本地將不會再向遠端發送數據,因此就沒有機會從遠端接受 ACK 報文,從而沒有機會更新遠端窗口大小。在這種情況下,KCP 需要發送窗口探測報文到遠端,待遠端回覆窗口大小後,後續傳輸可以繼續:
// probe window size (if remote window size equals zero)
// 1、遠端窗口大小爲0,需要發送窗口探測報文
if (kcp->rmt_wnd == 0) {
// 初始化探測間隔和下一次探測時間
if (kcp->probe_wait == 0) {
kcp->probe_wait = IKCP_PROBE_INIT; // 默認7秒探測
kcp->ts_probe = kcp->current + kcp->probe_wait; // 下一次探測時間
}
else {
//遠端窗口爲0,發送過探測請求,但是已經超過下次探測的時間
// 檢測是否到了探測時間
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
// 更新探測間隔probe_wait
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
// 更新下次探測時間ts_probe
kcp->ts_probe = kcp->current + kcp->probe_wait;
// 更新探測變量probe爲IKCP_ASK_SEND,發送探測消息
kcp->probe |= IKCP_ASK_SEND;
}
}
}
// 2、遠端窗口正常,則不需要發送窗口探測
else {
kcp->ts_probe = 0; // 更新下次探測時間爲0
kcp->probe_wait = 0; // 更新探測窗口等待時間爲0
}
將窗口探測報文和窗口回覆報文發送出去
// flush window probing commands
if (kcp->probe & IKCP_ASK_SEND) {
seg.cmd = IKCP_CMD_WASK; // 窗口探測[詢問對方窗口size]
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
// flush window probing commands
if (kcp->probe & IKCP_ASK_TELL) {
seg.cmd = IKCP_CMD_WINS; // 窗口告知[告訴對方我方窗口size]
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->probe = 0; //清空標識
計算本次發送可用的窗口大小,這裏 KCP 採用了可以配置的策略,正常情況下,KCP 的窗口大小由發送窗口 snd_wnd,遠端接收窗口 rmt_wnd 以及根據流控計算得到的 kcp->cwnd 共同決定;但是當開啓了 nocwnd 模式時,窗口大小僅由前兩者決定;
// calculate window size
// 若沒有流控,取發送窗口和遠端接收窗口最小值
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
// 若存在流控,則取當前擁塞窗口、發送窗口和遠端接收窗口三者最小值
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
將緩存在 snd_queue 中的數據移到 snd_buf 中等待發送
// move data from snd_queue to snd_buf
// 從snd_queue移動到snd_buf的數量不能超出對方的接收能力,發送符合擁塞範圍的分片
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf); // 添加到發送緩存
kcp->nsnd_que--;
kcp->nsnd_buf++;
//設置數據分片的屬性
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd; // 告知對方當前的接收窗口
newseg->ts = current; // 當前時間
newseg->sn = kcp->snd_nxt++; // 序號
newseg->una = kcp->rcv_nxt; // 告訴對方可以發送的下一個包序號
newseg->resendts = current; // 當前發送的時間
newseg->rto = kcp->rx_rto; // 超時重傳的時間
newseg->fastack = 0; // 是否快速重傳
newseg->xmit = 0; // 重傳次數
}
在發送數據之前,先設置快重傳的次數和重傳間隔;KCP 允許設置快重傳的次數,即 fastresend 參數。例如設置 fastresend 爲 2,並且發送端發送了 1,2,3,4,5 幾個包,收到遠端的 ACK: 1, 3, 4, 5,當收到 ACK3 時,KCP 知道 2 被跳過 1 次,收到 ACK4 時,知道 2 被 “跳過” 了 2 次,此時可以認爲 2 號丟失,不用等超時,直接重傳 2 號包;每個報文的 fastack 記錄了該報文被跳過了幾次,由函數 ikcp_parse_fastack 更新。於此同時,KCP 也允許設置 nodelay 參數,當激活該參數時,每個報文的超時重傳時間將由 x2 變爲 x1.5,即加快報文重傳:
// calculate resent
// 是否設置快重傳次數
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
// 是否開啓nodelay
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
將 snd_buf 中的數據發送出去
// flush data segments
// 發送snd buf的分片,只要數據還在snd_buf 說明對方還沒有應答
// 1、新的報文,正常發送
// 2、超時重傳
// 3、快速重傳(如果有)
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
// 1、如果該報文是第一次傳輸,那麼直接發送
if (segment->xmit == 0) {
needsend = 1;
segment->xmit++; // 分片發送次數 + 1
segment->rto = kcp->rx_rto; // 超時時間間隔
segment->resendts = current + segment->rto + rtomin; // 下一次要發送的時間
}
// 2、當前時間達到了該報文的重傳時間,但並沒有新的ack到達,出現丟包, 重傳
else if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
segment->xmit++;
kcp->xmit++;
// 根據 nodelay 參數更新重傳時間
if (kcp->nodelay == 0) {
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
} else {
IINT32 step = (kcp->nodelay < 2)? ((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2; //報文超時等待時間更新,控制RTO=1.5
}
segment->resendts = current + segment->rto; //下一次發送的時間
lost = 1; // 丟包,反應到擁塞控制策略去了
}
// 3、該報文的的被跳過次數超過設置的快速重傳次數,需要重傳
else if (segment->fastack >= resent) {
if ((int)segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0; // 重置該分片被跳過的次數
segment->resendts = current + segment->rto;
change++; // 標識快速重傳的發生
}
}
// 需要發送數據
if (needsend) {
int need;
segment->ts = current;
segment->wnd = seg.wnd; // 己方可用接收窗口大小
segment->una = kcp->rcv_nxt; // 待接收的下一個包序號
size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;
// 小包封裝成大包發送
if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
// 把segment封裝成線性buffer發送 頭部+數據
ptr = ikcp_encode_seg(ptr, segment);
if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}
if (segment->xmit >= kcp->dead_link) {
kcp->state = (IUINT32)-1;
}
}
}
// flash remain segments
size = (int)(ptr - buffer); // 剩餘的數據
// 最終只要有數據要發送,一定發出去
if (size > 0) {
ikcp_output(kcp, buffer, size);
}
根據設置的 lost 和 change 更新窗口大小;注意 快重傳和丟包時的窗口更新算法不一致,這一點類似於 TCP 協議的擁塞控制和快恢復算法
// update ssthresh
//如果發生了快速重傳,擁塞窗口閾值降低爲當前未確認包數量的一半或最小值
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = kcp->ssthresh + resent; // 動態調整擁塞控制窗口
kcp->incr = kcp->cwnd * kcp->mss;
}
// 如果發生了丟包,閾值減半, cwd 窗口保留爲 1
if (lost) {
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1; // 動態調整擁塞控制窗口
kcp->incr = kcp->mss;
}
if (kcp->cwnd < 1) {
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
2.3、kcp 報文接收
ikcp_recv
應用層接收函數爲 ikcp_recv,主要做三件事
讀取組好包的數據 rcv_queue -> 用戶 buffer
將接收緩存 rcv_buf 的分片轉移到接收隊列 rcv_queue
如果有接收空間則將 kcp->probe |= IKCP_ASK_TELL ; 以在 update 的時候告知對方可以發送數據了。
首先檢測一下本次接收數據之後,是否需要進行窗口恢復。在前面的內容中解釋過,KCP 協議在遠端窗口爲 0 的時候將會停止發送數據,此時如果遠端調用 ikcp_recv 將數據從 rcv_queue 中移動到應用層 buffer 中之後,表明其可以再次接受數據,爲了能夠恢復數據的發送,遠端可以主動發送 IKCP_ASK_TELL 來告知窗口大小;
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1; // 標記可以開始窗口恢復
開始將 rcv_queue 中的數據根據分片編號 frg merge 起來,然後拷貝到用戶的 buffer 中。
// merge fragment
// 將屬於同一個消息的各分片重組完整數據,並刪除rcv_queue中segment,nrcv_que減少
// 經過 ikcp_send 發送的數據會進行分片,分片編號爲倒序序號,因此frg爲0的數據包標記着完整接收到了一次 send 發送過來的數據
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;
if (buffer) {
memcpy(buffer, seg->data, seg->len); // 把queue的數據就放入用戶buffer
buffer += seg->len;
}
len += seg->len;
fragment = seg->frg;
if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}
if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg); // 刪除節點
kcp->nrcv_que--; // nrcv_que接收隊列-1
}
// frg = 0,完整的數據接收到, 本次數據接收結束
if (fragment == 0) //
break;
}
下一步將 rcv_buf 中的數據轉移到 rcv_queue 中,這個過程根據報文的 sn 編號來確保轉移到 rcv_queue 中的數據一定是按序的:
// move available data from rcv_buf -> rcv_queue
// 將 rcv_buf 中的數據轉移到 rev_queue
// 根據報文的sn來確保轉移到 rcv_queue 中的數據一定是按序的
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
// 1、根據 sn 確保數據是按序轉移到 rcv_queue 中
// 2、接收隊列nrcv_que < 接收窗口rcv_wnd;
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++; // 接收隊列 有多少個分片 + 1
kcp->rcv_nxt++; // 接收序號 + 1
} else {
break;
}
}
最後進行窗口恢復。此時如果 recover 標記爲 1,表明在此次接收之前,可用接收窗口爲 0,如果經過本次接收之後,可用窗口大於 0,將主動發送 IKCP_ASK_TELL 數據包來通知對方已可以接收數據:
// fast recover
// nrcv_que小於rcv_wnd, 說明接收端有空間繼續接收數據了
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
}
kcp_input
ikcp_recv 僅爲上層調用的接口,KCP 協議需要從底層接受數據到 rcv_buf 中,這是通過函數 ikcp_input 實現。ikcp_input 中的所有功能都在一個外層的循環中實現:
首先將接收到的數據包進行解碼,並進行基本的數據包長度和類型校驗;KCP 協議只會接收到前文中所介紹的四種數據包;
調用 ikcp_parse_una 來確定已經發送的數據包有哪些被對方接收到。KCP 中所有的報文類型均帶有 una 信息。發送端發送的數據都會緩存在 snd_buf 中,直到接收到對方確認信息之後纔會刪除。當接收到 una 信息後,表明 sn 小於 una 的數據包都已經被對方接收到,因此可以直接從 snd_buf 中刪除。同時調用 ikcp_shrink_buf 來更新 KCP 控制塊的 snd_una 數值。
// 刪除小於snd_buf中小於una的segment
ikcp_parse_una(kcp, una);
// 更新snd_una爲snd_buf中seg->sn或kcp->snd_nxt ,更新下一個待應答的序號
ikcp_shrink_buf(kcp);
處理 IKCP_CMD_ACK 報文
if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) { // 根據應答判斷rtt
//更新rx_srtt,rx_rttval,計算kcp->rx_rto
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
//遍歷snd_buf中(snd_una, snd_nxt),將sn相等的刪除,直到大於sn
ikcp_parse_ack(kcp, sn); // 將已經ack的分片刪除
ikcp_shrink_buf(kcp); // 更新控制塊的 snd_una
if (flag == 0) {
flag = 1; //快速重傳標記
maxack = sn; // 記錄最大的 ACK 編號
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
maxack = sn; // 記錄最大的 ACK 編號
latest_ts = ts;
}
}
處理 IKCP_CMD_PUSH 報文
else if (cmd == IKCP_CMD_PUSH) { //接收到具體的數據包
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
// 對該報文的確認 ACK 報文放入 ack 列表中
ikcp_ack_push(kcp, sn, ts);
// 判斷接收的數據分片編號是否符合要求,即:在接收窗口(滑動窗口)範圍之內
if (_itimediff(sn, kcp->rcv_nxt) >= 0) { // 是要接受起始的序號
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
// 將該報文插入到 rcv_buf 鏈表中
ikcp_parse_data(kcp, seg);
}
}
}
對於接收到的 IKCP_CMD_WASK 報文,直接標記下次將發送窗口通知報文;而對於報文 IKCP_CMD_WINS 無需做任何特殊操作;
else if (cmd == IKCP_CMD_WASK) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
// 如果是探測包,添加相應的標識位
kcp->probe |= IKCP_ASK_TELL;
}
else if (cmd == IKCP_CMD_WINS) {
// do nothing,如果是 tell me 遠端窗口大小,什麼都不做
}
據記錄的最大的 ACK 編號 maxack 來更新 snd_buf 中的報文的 fastack,這個過程在介紹 ikcp_flush 中提到過,對於 fastack 大於設置的 resend 參數時,將立馬進行快重傳;
最後,根據接收到報文的 una 和 KCP 控制塊的 una 參數進行流控;
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/9aXglFdjfNJwXUHpfz4-pQ