一篇搞懂 tcp,http,socket,socket 連接池之間的關係
前言
作爲一名開發人員我們經常會聽到 HTTP 協議、TCP/IP 協議、UDP 協議、Socket、Socket 長連接、Socket 連接池等字眼,然而它們之間的關係、區別及原理並不是所有人都能理解清楚,這篇文章就從網絡協議基礎開始到 Socket 連接池,一步一步解釋他們之間的關係。
七層網絡模型
首先從網絡通信的分層模型講起:七層模型,亦稱 OSI(Open System Interconnection) 模型。自下往上分爲:物理層、數據鏈路層、網絡層、傳輸層、會話層、表示層和應用層。所有有關通信的都離不開它,下面這張圖片介紹了各層所對應的一些協議和硬件
通過上圖,我知道 IP 協議對應於網絡層,TCP、UDP 協議對應於傳輸層,而 HTTP 協議對應於應用層,OSI 並沒有 Socket,那什麼是 Socket,後面我們將結合代碼具體詳細介紹。
TCP 和 UDP 連接
關於傳輸層 TCP、UDP 協議可能我們平時遇見的會比較多,有人說 TCP 是安全的,UDP 是不安全的,UDP 傳輸比 TCP 快,那爲什麼呢,我們先從 TCP 的連接建立的過程開始分析,然後解釋 UDP 和 TCP 的區別。
TCP 的三次握手和四次分手
我們知道 TCP 建立連接需要經過三次握手,而斷開連接需要經過四次分手,那三次握手和四次分手分別做了什麼和如何進行的。
第一次握手:建立連接。客戶端發送連接請求報文段,將 SYN 位置爲 1,Sequence Number 爲 x;然後,客戶端進入 SYN_SEND 狀態,等待服務器的確認;
第二次握手:服務器收到客戶端的 SYN 報文段,需要對這個 SYN 報文段進行確認,設置 Acknowledgment Number 爲 x+1(Sequence Number+1);同時,自己自己還要發送 SYN 請求信息,將 SYN 位置爲 1,Sequence Number 爲 y;服務器端將上述所有信息放到一個報文段(即 SYN+ACK 報文段)中,一併發送給客戶端,此時服務器進入 SYN_RECV 狀態;
第三次握手:客戶端收到服務器的 SYN+ACK 報文段。然後將 Acknowledgment Number 設置爲 y+1,向服務器發送 ACK 報文段,這個報文段發送完畢以後,客戶端和服務器端都進入 ESTABLISHED 狀態,完成 TCP 三次握手。
完成了三次握手,客戶端和服務器端就可以開始傳送數據。以上就是 TCP 三次握手的總體介紹。通信結束客戶端和服務端就斷開連接,需要經過四次分手確認。
第一次分手:主機 1(可以使客戶端,也可以是服務器端),設置 Sequence Number 和 Acknowledgment Number,向主機 2 發送一個 FIN 報文段;此時,主機 1 進入 FIN_WAIT_1 狀態;這表示主機 1 沒有數據要發送給主機 2 了;
第二次分手:主機 2 收到了主機 1 發送的 FIN 報文段,向主機 1 回一個 ACK 報文段,Acknowledgment Number 爲 Sequence Number 加 1;主機 1 進入 FIN_WAIT_2 狀態;主機 2 告訴主機 1,我 “同意” 你的關閉請求;
第三次分手:主機 2 向主機 1 發送 FIN 報文段,請求關閉連接,同時主機 2 進入 LAST_ACK 狀態;
第四次分手:主機 1 收到主機 2 發送的 FIN 報文段,向主機 2 發送 ACK 報文段,然後主機 1 進入 TIME_WAIT 狀態;主機 2 收到主機 1 的 ACK 報文段以後,就關閉連接;此時,主機 1 等待 2MSL 後依然沒有收到回覆,則證明 Server 端已正常關閉,那好,主機 1 也可以關閉連接了。
可以看到一次 tcp 請求的建立及關閉至少進行 7 次通信,這還不包過數據的通信,而 UDP 不需 3 次握手和 4 次分手。
TCP 和 UDP 的區別
1、TCP 是面向鏈接的,雖然說網絡的不安全不穩定特性決定了多少次握手都不能保證連接的可靠性,但 TCP 的三次握手在最低限度上 (實際上也很大程度上保證了) 保證了連接的可靠性; 而 UDP 不是面向連接的,UDP 傳送數據前並不與對方建立連接,對接收到的數據也不發送確認信號,發送端不知道數據是否會正確接收,當然也不用重發,所以說 UDP 是無連接的、不可靠的一種數據傳輸協議。
2、也正由於 1 所說的特點,使得 UDP 的開銷更小數據傳輸速率更高,因爲不必進行收發數據的確認,所以 UDP 的實時性更好。知道了 TCP 和 UDP 的區別,就不難理解爲何採用 TCP 傳輸協議的 MSN 比採用 UDP 的 QQ 傳輸文件慢了,但並不能說 QQ 的通信是不安全的,因爲程序員可以手動對 UDP 的數據收發進行驗證,比如發送方對每個數據包進行編號然後由接收方進行驗證啊什麼的,即使是這樣,UDP 因爲在底層協議的封裝上沒有采用類似 TCP 的 “三次握手” 而實現了 TCP 所無法達到的傳輸效率。
問題
關於傳輸層我們會經常聽到一些問題
1.TCP 服務器最大併發連接數是多少?
關於 TCP 服務器最大併發連接數有一種誤解就是 “因爲端口號上限爲 65535, 所以 TCP 服務器理論上的可承載的最大併發連接數也是 65535”。首先需要理解一條 TCP 連接的組成部分:客戶端 IP、客戶端端口、服務端 IP、服務端端口。所以對於 TCP 服務端進程來說,他可以同時連接的客戶端數量並不受限於可用端口號,理論上一個服務器的一個端口能建立的連接數是全球的 IP 數 * 每臺機器的端口數。實際併發連接數受限於 linux 可打開文件數,這個數是可以配置的,可以非常大,所以實際上受限於系統性能。通過 #ulimit -n 查看服務的最大文件句柄數,通過 ulimit -n xxx 修改 xxx 是你想要能打開的數量。也可以通過修改系統參數:
#vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
2.爲什麼 TIME_WAIT 狀態還需要等 2MSL 後才能返回到 CLOSED 狀態?
這是因爲雖然雙方都同意關閉連接了,而且握手的 4 個報文也都協調和發送完畢,按理可以直接回到 CLOSED 狀態(就好比從 SYN_SEND 狀態到 ESTABLISH 狀態那樣);但是因爲我們必須要假想網絡是不可靠的,你無法保證你最後發送的 ACK 報文會一定被對方收到,因此對方處於 LAST_ACK 狀態下的 Socket 可能會因爲超時未收到 ACK 報文,而重發 FIN 報文,所以這個 TIME_WAIT 狀態的作用就是用來重發可能丟失的 ACK 報文。
3.TIME_WAIT 狀態還需要等 2MSL 後才能返回到 CLOSED 狀態會產生什麼問題
通信雙方建立 TCP 連接後,主動關閉連接的一方就會進入 TIME_WAIT 狀態,TIME_WAIT 狀態維持時間是兩個 MSL 時間長度,也就是在 1-4 分鐘,Windows 操作系統就是 4 分鐘。進入 TIME_WAIT 狀態的一般情況下是客戶端,一個 TIME_WAIT 狀態的連接就佔用了一個本地端口。一臺機器上端口號數量的上限是 65536 個,如果在同一臺機器上進行壓力測試模擬上萬的客戶請求,並且循環與服務端進行短連接通信,那麼這臺機器將產生 4000 個左右的 TIME_WAIT Socket,後續的短連接就會產生 address already in use : connect 的異常,如果使用 Nginx 作爲方向代理也需要考慮 TIME_WAIT 狀態,發現系統存在大量 TIME_WAIT 狀態的連接,通過調整內核參數解決。
vi /etc/sysctl.conf
編輯文件,加入以下內容:
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30
然後執行 /sbin/sysctl -p 讓參數生效。
net.ipv4.tcp_syncookies = 1 表示開啓 SYN Cookies。當出現 SYN 等待隊列溢出時,啓用 cookies 來處理,可防範少量 SYN 攻擊,默認爲 0,表示關閉;
net.ipv4.tcp_tw_reuse = 1 表示開啓重用。允許將 TIME-WAIT sockets 重新用於新的 TCP 連接,默認爲 0,表示關閉;
net.ipv4.tcp_tw_recycle = 1 表示開啓 TCP 連接中 TIME-WAIT sockets 的快速回收,默認爲 0,表示關閉。
net.ipv4.tcp_fin_timeout 修改系統默認的 TIMEOUT 時間
HTTP 協議
關於 TCP/IP 和 HTTP 協議的關係,網絡有一段比較容易理解的介紹:“我們在傳輸數據時,可以只使用 (傳輸層)TCP/IP 協議,但是那樣的話,如果沒有應用層,便無法識別數據內容。如果想要使傳輸的數據有意義,則必須使用到應用層協議。應用層協議有很多,比如 HTTP、FTP、TELNET 等,也可以自己定義應用層協議。
HTTP 協議即超文本傳送協議 (Hypertext Transfer Protocol),是 Web 聯網的基礎,也是手機聯網常用的協議之一,WEB 使用 HTTP 協議作應用層協議,以封裝 HTTP 文本信息,然後使用 TCP/IP 做傳輸層協議將它發到網絡上。
由於 HTTP 在每次請求結束後都會主動釋放連接,因此 HTTP 連接是一種 “短連接”,要保持客戶端程序的在線狀態,需要不斷地向服務器發起連接請求。通常 的做法是即時不需要獲得任何數據,客戶端也保持每隔一段固定的時間向服務器發送一次“保持連接” 的請求,服務器在收到該請求後對客戶端進行回覆,表明知道 客戶端“在線”。若服務器長時間無法收到客戶端的請求,則認爲客戶端“下線”,若客戶端長時間無法收到服務器的回覆,則認爲網絡已經斷開。
下面是一個簡單的 HTTP Post application/json 數據內容的請求:
POST HTTP/1.1
Host: 127.0.0.1:9017
Content-Type: application/json
Cache-Control: no-cache
{"a":"a"}
關於 Socket(套接字)
現在我們瞭解到 TCP/IP 只是一個協議棧,就像操作系統的運行機制一樣,必須要具體實現,同時還要提供對外的操作接口。就像操作系統會提供標準的編程接口,比如 Win32 編程接口一樣,TCP/IP 也必須對外提供編程接口,這就是 Socket。現在我們知道,Socket 跟 TCP/IP 並沒有必然的聯繫。Socket 編程接口在設計的時候,就希望也能適應其他的網絡協議。所以,Socket 的出現只是可以更方便的使用 TCP/IP 協議棧而已,其對 TCP/IP 進行了抽象,形成了幾個最基本的函數接口。比如 create,listen,accept,connect,read 和 write 等等。
不同語言都有對應的建立 Socket 服務端和客戶端的庫,下面舉例 Nodejs 如何創建服務端和客戶端:
服務端:
const net = require('net');
const server = net.createServer();
server.on('connection', (client) => {
client.write('Hi!\n'); // 服務端向客戶端輸出信息,使用 write() 方法
client.write('Bye!\n');
//client.end(); // 服務端結束該次會話
});
server.listen(9000);
服務監聽 9000 端口
下面使用命令行發送 http 請求和 telnet
$ curl http://127.0.0.1:9000
Bye!
$telnet 127.0.0.1 9000
Trying 192.168.1.21...
Connected to 192.168.1.21.
Escape character is '^]'.
Hi!
Bye!
Connection closed by foreign host.
注意到 curl 只處理了一次報文。
客戶端
const client = new net.Socket();
client.connect(9000, '127.0.0.1', function () {
});
client.on('data', (chunk) => {
console.log('data', chunk.toString())
//data Hi!
//Bye!
});
Socket 長連接
所謂長連接,指在一個 TCP 連接上可以連續發送多個數據包,在 TCP 連接保持期間,如果沒有數據包發送,需要雙方發檢測包以維持此連接 (心跳包),一般需要自己做在線維持。短連接是指通信雙方有數據交互時,就建立一個 TCP 連接,數據發送完成後,則斷開此 TCP 連接。比如 Http 的,只是連接、請求、關閉,過程時間較短, 服務器若是一段時間內沒有收到請求即可關閉連接。其實長連接是相對於通常的短連接而說的,也就是長時間保持客戶端與服務端的連接狀態。
通常的短連接操作步驟是:
連接→數據傳輸→關閉連接;
而長連接通常就是:
連接→數據傳輸→保持連接 (心跳)→數據傳輸→保持連接 (心跳)→……→關閉連接;
什麼時候用長連接,短連接?
長連接多用於操作頻繁,點對點的通訊,而且連接數不能太多情況,。每個 TCP 連接都需要三步握手,這需要時間,如果每個操作都是先連接,再操作的話那麼處理 速度會降低很多,所以每個操作完後都不斷開,次處理時直接發送數據包就 OK 了,不用建立 TCP 連接。例如:數據庫的連接用長連接, 如果用短連接頻繁的通信會造成 Socket 錯誤,而且頻繁的 Socket 創建也是對資源的浪費。
什麼是心跳包爲什麼需要:
心跳包就是在客戶端和服務端間定時通知對方自己狀態的一個自己定義的命令字,按照一定的時間間隔發送,類似於心跳,所以叫做心跳包。網絡中的接收和發送數據都是使用 Socket 進行實現。但是如果此套接字已經斷開(比如一方斷網了),那發送數據和接收數據的時候就一定會有問題。可是如何判斷這個套接字是否還可以使用呢?這個就需要在系統中創建心跳機制。其實 TCP 中已經爲我們實現了一個叫做心跳的機制。如果你設置了心跳,那 TCP 就會在一定的時間(比如你設置的是 3 秒鐘)內發送你設置的次數的心跳(比如說 2 次),並且此信息不會影響你自己定義的協議。也可以自己定義,所謂 “心跳” 就是定時發送一個自定義的結構體(心跳包或心跳幀),讓對方知道自己“在線”, 以確保鏈接的有效性。
實現:
服務端:
const net = require('net');
let clientList = [];
const heartbeat = 'HEARTBEAT'; // 定義心跳包內容確保和平時發送的數據不會衝突
const server = net.createServer();
server.on('connection', (client) => {
console.log('客戶端建立連接:', client.remoteAddress + ':' + client.remotePort);
clientList.push(client);
client.on('data', (chunk) => {
let content = chunk.toString();
if (content === heartbeat) {
console.log('收到客戶端發過來的一個心跳包');
} else {
console.log('收到客戶端發過來的數據:', content);
client.write('服務端的數據:' + content);
}
});
client.on('end', () => {
console.log('收到客戶端end');
clientList.splice(clientList.indexOf(client), 1);
});
client.on('error', () => {
clientList.splice(clientList.indexOf(client), 1);
})
});
server.listen(9000);
setInterval(broadcast, 10000); // 定時發送心跳包
function broadcast() {
console.log('broadcast heartbeat', clientList.length);
let cleanup = []
for (let i=0;i<clientList.length;i+=1) {
if (clientList[i].writable) { // 先檢查 sockets 是否可寫
clientList[i].write(heartbeat);
} else {
console.log('一個無效的客戶端');
cleanup.push(clientList[i]); // 如果不可寫,收集起來銷燬。銷燬之前要 Socket.destroy() 用 API 的方法銷燬。
clientList[i].destroy();
}
}
//Remove dead Nodes out of write loop to avoid trashing loop index
for (let i=0; i<cleanup.length; i+=1) {
console.log('刪除無效的客戶端:', cleanup[i].name);
clientList.splice(clientList.indexOf(cleanup[i]), 1);
}
}
服務端輸出結果:
客戶端建立連接: ::ffff:127.0.0.1:57125
broadcast heartbeat 1
收到客戶端發過來的數據: Thu, 29 Mar 2018 03:45:15 GMT
收到客戶端發過來的一個心跳包
收到客戶端發過來的數據: Thu, 29 Mar 2018 03:45:20 GMT
broadcast heartbeat 1
收到客戶端發過來的數據: Thu, 29 Mar 2018 03:45:25 GMT
收到客戶端發過來的一個心跳包
客戶端建立連接: ::ffff:127.0.0.1:57129
收到客戶端發過來的一個心跳包
收到客戶端發過來的數據: Thu, 29 Mar 2018 03:46:00 GMT
收到客戶端發過來的數據: Thu, 29 Mar 2018 03:46:04 GMT
broadcast heartbeat 2
收到客戶端發過來的數據: Thu, 29 Mar 2018 03:46:05 GMT
收到客戶端發過來的一個心跳包
客戶端代碼:
const net = require('net');
const heartbeat = 'HEARTBEAT';
const client = new net.Socket();
client.connect(9000, '127.0.0.1', () => {});
client.on('data', (chunk) => {
let content = chunk.toString();
if (content === heartbeat) {
console.log('收到心跳包:', content);
} else {
console.log('收到數據:', content);
}
});
// 定時發送數據
setInterval(() => {
console.log('發送數據', new Date().toUTCString());
client.write(new Date().toUTCString());
}, 5000);
// 定時發送心跳包
setInterval(function () {
client.write(heartbeat);
}, 10000);
客戶端輸出結果:
發送數據 Thu, 29 Mar 2018 03:46:04 GMT
收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:04 GMT
收到心跳包: HEARTBEAT
發送數據 Thu, 29 Mar 2018 03:46:09 GMT
收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:09 GMT
發送數據 Thu, 29 Mar 2018 03:46:14 GMT
收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:14 GMT
收到心跳包: HEARTBEAT
發送數據 Thu, 29 Mar 2018 03:46:19 GMT
收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:19 GMT
發送數據 Thu, 29 Mar 2018 03:46:24 GMT
收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:24 GMT
收到心跳包: HEARTBEAT
定義自己的協議
如果想要使傳輸的數據有意義,則必須使用到應用層協議比如 Http、Mqtt、Dubbo 等。基於 TCP 協議上自定義自己的應用層的協議需要解決的幾個問題:
-
心跳包格式的定義及處理
-
報文頭的定義,就是你發送數據的時候需要先發送報文頭,報文裏面能解析出你將要發送的數據長度
-
你發送數據包的格式,是 json 的還是其他序列化的方式
下面我們就一起來定義自己的協議,並編寫服務的和客戶端進行調用:
定義報文頭格式:length:000000000xxxx; xxxx 代表數據的長度,總長度 20, 舉例子不嚴謹。
數據表的格式: Json
服務端:
const net = require('net');
const server = net.createServer();
let clientList = [];
const heartBeat = 'HeartBeat'; // 定義心跳包內容確保和平時發送的數據不會衝突
const getHeader = (num) => {
return 'length:' + (Array(13).join(0) + num).slice(-13);
}
server.on('connection', (client) => {
client.name = client.remoteAddress + ':' + client.remotePort
// client.write('Hi ' + client.name + '!\n');
console.log('客戶端建立連接', client.name);
clientList.push(client)
let chunks = [];
let length = 0;
client.on('data', (chunk) => {
let content = chunk.toString();
console.log("content:", content, content.length);
if (content === heartBeat) {
console.log('收到客戶端發過來的一個心跳包');
} else {
if (content.indexOf('length:') === 0){
length = parseInt(content.substring(7,20));
console.log('length', length);
chunks =[chunk.slice(20, chunk.length)];
} else {
chunks.push(chunk);
}
let heap = Buffer.concat(chunks);
console.log('heap.length', heap.length)
if (heap.length >= length) {
try {
console.log('收到數據', JSON.parse(heap.toString()));
let data = '服務端的數據數據:' + heap.toString();;
let dataBuff = Buffer.from(JSON.stringify(data));
let header = getHeader(dataBuff.length)
client.write(header);
client.write(dataBuff);
} catch (err) {
console.log('數據解析失敗');
}
}
}
})
client.on('end', () => {
console.log('收到客戶端end');
clientList.splice(clientList.indexOf(client), 1);
});
client.on('error', () => {
clientList.splice(clientList.indexOf(client), 1);
})
});
server.listen(9000);
setInterval(broadcast, 10000); // 定時檢查客戶端 併發送心跳包
function broadcast() {
console.log('broadcast heartbeat', clientList.length);
let cleanup = []
for(var i=0;i<clientList.length;i+=1) {
if(clientList[i].writable) { // 先檢查 sockets 是否可寫
// clientList[i].write(heartBeat); // 發送心跳數據
} else {
console.log('一個無效的客戶端')
cleanup.push(clientList[i]) // 如果不可寫,收集起來銷燬。銷燬之前要 Socket.destroy() 用 API 的方法銷燬。
clientList[i].destroy();
}
}
// 刪除無效的客戶端
for(i=0; i<cleanup.length; i+=1) {
console.log('刪除無效的客戶端:', cleanup[i].name);
clientList.splice(clientList.indexOf(cleanup[i]), 1)
}
}
日誌打印:
客戶端建立連接 ::ffff:127.0.0.1:50178
content: length:0000000000031 20
length 31
heap.length 0
content: "Tue, 03 Apr 2018 06:12:37 GMT" 31
heap.length 31
收到數據 Tue, 03 Apr 2018 06:12:37 GMT
broadcast heartbeat 1
content: HeartBeat 9
收到客戶端發過來的一個心跳包
content: length:0000000000031"Tue, 03 Apr 2018 06:12:42 GMT" 51
length 31
heap.length 31
收到數據 Tue, 03 Apr 2018 06:12:42 GMT
客戶端
const net = require('net');
const client = new net.Socket();
const heartBeat = 'HeartBeat'; // 定義心跳包內容確保和平時發送的數據不會衝突
const getHeader = (num) => {
return 'length:' + (Array(13).join(0) + num).slice(-13);
}
client.connect(9000, '127.0.0.1', function () {});
let chunks = [];
let length = 0;
client.on('data', (chunk) => {
let content = chunk.toString();
console.log("content:", content, content.length);
if (content === heartBeat) {
console.log('收到服務端發過來的一個心跳包');
} else {
if (content.indexOf('length:') === 0){
length = parseInt(content.substring(7,20));
console.log('length', length);
chunks =[chunk.slice(20, chunk.length)];
} else {
chunks.push(chunk);
}
let heap = Buffer.concat(chunks);
console.log('heap.length', heap.length)
if (heap.length >= length) {
try {
console.log('收到數據', JSON.parse(heap.toString()));
} catch (err) {
console.log('數據解析失敗');
}
}
}
});
// 定時發送數據
setInterval(function () {
let data = new Date().toUTCString();
let dataBuff = Buffer.from(JSON.stringify(data));
let header =getHeader(dataBuff.length);
client.write(header);
client.write(dataBuff);
}, 5000);
// 定時發送心跳包
setInterval(function () {
client.write(heartBeat);
}, 10000);
日誌打印:
content: length:0000000000060 20
length 60
heap.length 0
content: "服務端的數據數據:\"Tue, 03 Apr 2018 06:12:37 GMT\"" 44
heap.length 60
收到數據 服務端的數據數據:"Tue, 03 Apr 2018 06:12:37 GMT"
content: length:0000000000060"服務端的數據數據:\"Tue, 03 Apr 2018 06:12:42 GMT\"" 64
length 60
heap.length 60
收到數據 服務端的數據數據:"Tue, 03 Apr 2018 06:12:42 GMT"
客戶端定時發送自定義協議數據到服務端,先發送頭數據,在發送內容數據,另外一個定時器發送心跳數據,服務端判斷是心跳數據,再判斷是不是頭數據,再是內容數據,然後解析後再發送數據給客戶端。從日誌的打印可以看出客戶端先後 writeheader 和 data 數據,服務端可能在一個 data 事件裏面接收到。
這裏可以看到一個客戶端在同一個時間內處理一個請求可以很好的工作,但是想象這麼一個場景,如果同一時間內讓同一個客戶端去多次調用服務端請求,發送多次頭數據和內容數據,服務端的 data 事件收到的數據就很難區別哪些數據是哪次請求的,比如兩次頭數據同時到達服務端,服務端就會忽略其中一次,而後面的內容數據也不一定就對應於這個頭的。所以想複用長連接並能很好的高併發處理服務端請求,就需要連接池這種方式了。
Socket 連接池
什麼是 Socket 連接池, 池的概念可以聯想到是一種資源的集合,所以 Socket 連接池,就是維護着一定數量 Socket 長連接的集合。它能自動檢測 Socket 長連接的有效性,剔除無效的連接,補充連接池的長連接的數量。從代碼層次上其實是人爲實現這種功能的類,一般一個連接池包含下面幾個屬性:
-
空閒可使用的長連接隊列
-
正在運行的通信的長連接隊列
-
等待去獲取一個空閒長連接的請求的隊列
-
無效長連接的剔除功能
-
長連接資源池的數量配置
-
長連接資源的新建功能
場景:一個請求過來,首先去資源池要求獲取一個長連接資源,如果空閒隊列裏面有長連接,就獲取到這個長連接 Socket, 並把這個 Socket 移到正在運行的長連接隊列。如果空閒隊列裏面沒有,且正在運行的隊列長度小於配置的連接池資源的數量,就新建一個長連接到正在運行的隊列去,如果正在運行的不下於配置的資源池長度,則這個請求進入到等待隊列去。當一個正在運行的 Socket 完成了請求,就從正在運行的隊列移到空閒的隊列,並觸發等待請求隊列去獲取空閒資源,如果有等待的情況。
這裏簡單介紹 Nodejs 的 Socket 連接池 generic-pool 模塊的源碼。
主要文件目錄結構
.
|————lib ------------------------- 代碼庫
| |————DefaultEvictor.js ----------
| |————Deferred.js ----------------
| |————Deque.js -------------------
| |————DequeIterator.js -----------
| |————DoublyLinkedList.js --------
| |————DoublyLinkedListIterator.js-
| |————factoryValidator.js --------
| |————Pool.js -------------------- 連接池主要代碼
| |————PoolDefaults.js ------------
| |————PooledResource.js ----------
| |————Queue.js ------------------- 隊列
| |————ResourceLoan.js ------------
| |————ResourceRequest.js ---------
| |————utils.js ------------------- 工具
|————test ------------------------- 測試目錄
|————README.md ------------------- 項目描述文件
|————.eslintrc ------------------- eslint靜態檢查配置文件
|————.eslintignore --------------- eslint靜態檢查忽略的文件
|————package.json ----------------- npm包依賴配置
下面介紹庫的使用:
初始化連接池
'use strict';
const net = require('net');
const genericPool = require('generic-pool');
function createPool(conifg) {
let options = Object.assign({
fifo: true, // 是否優先使用老的資源
priorityRange: 1, // 優先級
testOnBorrow: true, // 是否開啓獲取驗證
// acquireTimeoutMillis: 10 * 1000, // 獲取的超時時間
autostart: true, // 自動初始化和釋放調度啓用
min: 10, // 初始化連接池保持的長連接最小數量
max: 0, // 最大連接池保持的長連接數量
evictionRunIntervalMillis: 0, // 資源釋放檢驗間隔檢查 設置了下面幾個參數才起效果
numTestsPerEvictionRun: 3, // 每次釋放資源數量
softIdleTimeoutMillis: -1, // 可用的超過了最小的min 且空閒時間時間 達到釋放
idleTimeoutMillis: 30000 // 強制釋放
// maxWaitingClients: 50 // 最大等待
}, conifg.options);
const factory = {
create: function () {
return new Promise((resolve, reject) => {
let socket = new net.Socket();
socket.setKeepAlive(true);
socket.connect(conifg.port, conifg.host);
// TODO 心跳包的處理邏輯
socket.on('connect', () => {
console.log('socket_pool', conifg.host, conifg.port, 'connect' );
resolve(socket);
});
socket.on('close', (err) => { // 先end 事件再close事件
console.log('socket_pool', conifg.host, conifg.port, 'close', err);
});
socket.on('error', (err) => {
console.log('socket_pool', conifg.host, conifg.port, 'error', err);
reject(err);
});
});
},
//銷燬連接
destroy: function (socket) {
return new Promise((resolve) => {
socket.destroy(); // 不會觸發end 事件 第一次會觸發發close事件 如果有message會觸發error事件
resolve();
});
},
validate: function (socket) { //獲取資源池校驗資源有效性
return new Promise((resolve) => {
// console.log('socket.destroyed:', socket.destroyed, 'socket.readable:', socket.readable, 'socket.writable:', socket.writable);
if (socket.destroyed || !socket.readable || !socket.writable) {
return resolve(false);
} else {
return resolve(true);
}
});
}
};
const pool = genericPool.createPool(factory, options);
pool.on('factoryCreateError', (err) => { // 監聽新建長連接出錯 讓請求直接返回錯誤
const clientResourceRequest = pool._waitingClientsQueue.dequeue();
if (clientResourceRequest) {
clientResourceRequest.reject(err);
}
});
return pool;
};
let pool = createPool({
port: 9000,
host: '127.0.0.1',
options: {min: 0, max: 10}
});
使用連接池
下面連接池的使用,使用的協議是我們之前自定義的協議。
let pool = createPool({
port: 9000,
host: '127.0.0.1',
options: {min: 0, max: 10}
});
const getHeader = (num) => {
return 'length:' + (Array(13).join(0) + num).slice(-13);
}
const request = async (requestDataBuff) => {
let client;
try {
client = await pool.acquire();
} catch (e) {
console.log('acquire socket client failed: ', e);
throw e;
}
let timeout = 10000;
return new Promise((resolve, reject) => {
let chunks = [];
let length = 0;
client.setTimeout(timeout);
client.removeAllListeners('error');
client.on('error', (err) => {
client.removeAllListeners('error');
client.removeAllListeners('data');
client.removeAllListeners('timeout');
pool.destroyed(client);
reject(err);
});
client.on('timeout', () => {
client.removeAllListeners('error');
client.removeAllListeners('data');
client.removeAllListeners('timeout');
// 應該銷燬以防下一個req的data事件監聽才返回數據
pool.destroy(client);
// pool.release(client);
reject(`socket connect timeout set ${timeout}`);
});
let header = getHeader(requestDataBuff.length);
client.write(header);
client.write(requestDataBuff);
client.on('data', (chunk) => {
let content = chunk.toString();
console.log('content', content, content.length);
// TODO 過濾心跳包
if (content.indexOf('length:') === 0){
length = parseInt(content.substring(7,20));
console.log('length', length);
chunks =[chunk.slice(20, chunk.length)];
} else {
chunks.push(chunk);
}
let heap = Buffer.concat(chunks);
console.log('heap.length', heap.length);
if (heap.length >= length) {
pool.release(client);
client.removeAllListeners('error');
client.removeAllListeners('data');
client.removeAllListeners('timeout');
try {
// console.log('收到數據', JSON.parse(heap.toString()));
resolve(JSON.parse(heap.toString()));
} catch (err) {
reject(err);
console.log('數據解析失敗');
}
}
});
});
}
request(Buffer.from(JSON.stringify({a: 'a'})))
.then((data) => {
console.log('收到服務的數據',data)
}).catch(err => {
console.log(err);
});
request(Buffer.from(JSON.stringify({b: 'b'})))
.then((data) => {
console.log('收到服務的數據',data)
}).catch(err => {
console.log(err);
});
setTimeout(function () { //查看是否會複用Socket 有沒有建立新的連接
request(Buffer.from(JSON.stringify({c: 'c'})))
.then((data) => {
console.log('收到服務的數據',data)
}).catch(err => {
console.log(err);
});
request(Buffer.from(JSON.stringify({d: 'd'})))
.then((data) => {
console.log('收到服務的數據',data)
}).catch(err => {
console.log(err);
});
}, 1000)
日誌打印:
socket_pool 127.0.0.1 9000 connect
socket_pool 127.0.0.1 9000 connect
content length:0000000000040"服務端的數據數據:{\"a\":\"a\"}" 44
length 40
heap.length 40
收到服務的數據 服務端的數據數據:{"a":"a"}
content length:0000000000040"服務端的數據數據:{\"b\":\"b\"}" 44
length 40
heap.length 40
收到服務的數據 服務端的數據數據:{"b":"b"}
content length:0000000000040 20
length 40
heap.length 0
content "服務端的數據數據:{\"c\":\"c\"}" 24
heap.length 40
收到服務的數據 服務端的數據數據:{"c":"c"}
content length:0000000000040"服務端的數據數據:{\"d\":\"d\"}" 44
length 40
heap.length 40
收到服務的數據 服務端的數據數據:{"d":"d"}
這裏看到前面兩個請求都建立了新的 Socket 連接 socket_pool 127.0.0.1 9000 connect,定時器結束後重新發起兩個請求就沒有建立新的 Socket 連接了,直接從連接池裏面獲取 Socket 連接資源。
源碼分析
發現主要的代碼就位於 lib 文件夾中的 Pool.js
構造函數:
lib/Pool.js
/**
* Generate an Object pool with a specified `factory` and `config`.
*
* @param {typeof DefaultEvictor} Evictor
* @param {typeof Deque} Deque
* @param {typeof PriorityQueue} PriorityQueue
* @param {Object} factory
* Factory to be used for generating and destroying the items.
* @param {Function} factory.create
* Should create the item to be acquired,
* and call it's first callback argument with the generated item as it's argument.
* @param {Function} factory.destroy
* Should gently close any resources that the item is using.
* Called before the items is destroyed.
* @param {Function} factory.validate
* Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false
* If it should be removed from pool.
* @param {Object} options
*/
constructor(Evictor, Deque, PriorityQueue, factory, options) {
super();
factoryValidator(factory); // 檢驗我們定義的factory的有效性包含create destroy validate
this._config = new PoolOptions(options); // 連接池配置
// TODO: fix up this ugly glue-ing
this._Promise = this._config.Promise;
this._factory = factory;
this._draining = false;
this._started = false;
/**
* Holds waiting clients
* @type {PriorityQueue}
*/
this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange); // 請求的對象管管理隊列queue 初始化queue的size 1 { _size: 1, _slots: [ Queue { _list: [Object] } ] }
/**
* Collection of promises for resource creation calls made by the pool to factory.create
* @type {Set}
*/
this._factoryCreateOperations = new Set(); // 正在創建的長連接
/**
* Collection of promises for resource destruction calls made by the pool to factory.destroy
* @type {Set}
*/
this._factoryDestroyOperations = new Set(); // 正在銷燬的長連接
/**
* A queue/stack of pooledResources awaiting acquisition
* TODO: replace with LinkedList backed array
* @type {Deque}
*/
this._availableObjects = new Deque(); // 空閒的資源長連接
/**
* Collection of references for any resource that are undergoing validation before being acquired
* @type {Set}
*/
this._testOnBorrowResources = new Set(); // 正在檢驗有效性的資源
/**
* Collection of references for any resource that are undergoing validation before being returned
* @type {Set}
*/
this._testOnReturnResources = new Set();
/**
* Collection of promises for any validations currently in process
* @type {Set}
*/
this._validationOperations = new Set();// 正在校驗的中間temp
/**
* All objects associated with this pool in any state (except destroyed)
* @type {Set}
*/
this._allObjects = new Set(); // 所有的鏈接資源 是一個 PooledResource對象
/**
* Loans keyed by the borrowed resource
* @type {Map}
*/
this._resourceLoans = new Map(); // 被借用的對象的map release的時候用到
/**
* Infinitely looping iterator over available object
* @type {DequeIterator}
*/
this._evictionIterator = this._availableObjects.iterator(); // 一個迭代器
this._evictor = new Evictor();
/**
* handle for setTimeout for next eviction run
* @type {(number|null)}
*/
this._scheduledEviction = null;
// create initial resources (if factory.min > 0)
if (this._config.autostart === true) { // 初始化最小的連接數量
this.start();
}
}
可以看到包含之前說的空閒的資源隊列,正在請求的資源隊列,正在等待的請求隊列等。
下面查看 Pool.acquire 方法
lib/Pool.js
/**
* Request a new resource. The callback will be called,
* when a new resource is available, passing the resource to the callback.
* TODO: should we add a seperate "acquireWithPriority" function
*
* @param {Number} [priority=0]
* Optional. Integer between 0 and (priorityRange - 1). Specifies the priority
* of the caller if there are no available resources. Lower numbers mean higher
* priority.
*
* @returns {Promise}
*/
acquire(priority) { // 空閒資源隊列資源是有優先等級的
if (this._started === false && this._config.autostart === false) {
this.start(); // 會在this._allObjects 添加min的連接對象數
}
if (this._draining) { // 如果是在資源釋放階段就不能再請求資源了
return this._Promise.reject(
new Error("pool is draining and cannot accept work")
);
}
// 如果要設置了等待隊列的長度且要等待 如果超過了就返回資源不可獲取
// TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime
if (
this._config.maxWaitingClients !== undefined &&
this._waitingClientsQueue.length >= this._config.maxWaitingClients
) {
return this._Promise.reject(
new Error("max waitingClients count exceeded")
);
}
const resourceRequest = new ResourceRequest(
this._config.acquireTimeoutMillis, // 對象裏面的超時配置 表示等待時間 會啓動一個定時 超時了就觸發resourceRequest.promise 的reject觸發
this._Promise
);
// console.log(resourceRequest)
this._waitingClientsQueue.enqueue(resourceRequest, priority); // 請求進入等待請求隊列
this._dispense(); // 進行資源分發 最終會觸發resourceRequest.promise的resolve(client)
return resourceRequest.promise; // 返回的是一個promise對象resolve卻是在其他地方觸發
}
/**
* Attempt to resolve an outstanding resource request using an available resource from
* the pool, or creating new ones
*
* @private
*/
_dispense() {
/**
* Local variables for ease of reading/writing
* these don't (shouldn't) change across the execution of this fn
*/
const numWaitingClients = this._waitingClientsQueue.length; // 正在等待的請求的隊列長度 各個優先級的總和
console.log('numWaitingClients', numWaitingClients) // 1
// If there aren't any waiting requests then there is nothing to do
// so lets short-circuit
if (numWaitingClients < 1) {
return;
}
// max: 10, min: 4
console.log('_potentiallyAllocableResourceCount', this._potentiallyAllocableResourceCount) // 目前潛在空閒可用的連接數量
const resourceShortfall =
numWaitingClients - this._potentiallyAllocableResourceCount; // 還差幾個可用的 小於零表示不需要 大於0表示需要新建長連接的數量
console.log('spareResourceCapacity', this.spareResourceCapacity) // 距離max數量的還有幾個沒有創建
const actualNumberOfResourcesToCreate = Math.min(
this.spareResourceCapacity, // -6
resourceShortfall // 這個是 -3
); // 如果resourceShortfall>0 表示需要新建但是這新建的數量不能超過spareResourceCapacity最多可創建的
console.log('actualNumberOfResourcesToCreate', actualNumberOfResourcesToCreate) // 如果actualNumberOfResourcesToCreate >0 表示需要創建連接
for (let i = 0; actualNumberOfResourcesToCreate > i; i++) {
this._createResource(); // 新增新的長連接
}
// If we are doing test-on-borrow see how many more resources need to be moved into test
// to help satisfy waitingClients
if (this._config.testOnBorrow === true) { // 如果開啓了使用前校驗資源的有效性
// how many available resources do we need to shift into test
const desiredNumberOfResourcesToMoveIntoTest =
numWaitingClients - this._testOnBorrowResources.size;// 1
const actualNumberOfResourcesToMoveIntoTest = Math.min(
this._availableObjects.length, // 3
desiredNumberOfResourcesToMoveIntoTest // 1
);
for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) { // 需要有效性校驗的數量 至少滿足最小的waiting clinet
this._testOnBorrow(); // 資源有效校驗後再分發
}
}
// if we aren't testing-on-borrow then lets try to allocate what we can
if (this._config.testOnBorrow === false) { // 如果沒有開啓有效性校驗 就開啓有效資源的分發
const actualNumberOfResourcesToDispatch = Math.min(
this._availableObjects.length,
numWaitingClients
);
for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) { // 開始分發資源
this._dispatchResource();
}
}
}
/**
* Attempt to move an available resource to a waiting client
* @return {Boolean} [description]
*/
_dispatchResource() {
if (this._availableObjects.length < 1) {
return false;
}
const pooledResource = this._availableObjects.shift(); // 從可以資源池裏面取出一個
this._dispatchPooledResourceToNextWaitingClient(pooledResource); // 分發
return false;
}
/**
* Dispatches a pooledResource to the next waiting client (if any) else
* puts the PooledResource back on the available list
* @param {PooledResource} pooledResource [description]
* @return {Boolean} [description]
*/
_dispatchPooledResourceToNextWaitingClient(pooledResource) {
const clientResourceRequest = this._waitingClientsQueue.dequeue(); // 可能是undefined 取出一個等待的quene
console.log('clientResourceRequest.state', clientResourceRequest.state);
if (clientResourceRequest === undefined ||
clientResourceRequest.state !== Deferred.PENDING) {
console.log('沒有等待的')
// While we were away either all the waiting clients timed out
// or were somehow fulfilled. put our pooledResource back.
this._addPooledResourceToAvailableObjects(pooledResource); // 在可用的資源裏面添加一個
// TODO: do need to trigger anything before we leave?
return false;
}
// TODO clientResourceRequest 的state是否需要判斷 如果已經是resolve的狀態 已經超時回去了 這個是否有問題
const loan = new ResourceLoan(pooledResource, this._Promise);
this._resourceLoans.set(pooledResource.obj, loan); // _resourceLoans 是個map k=>value pooledResource.obj 就是socket本身
pooledResource.allocate(); // 標識資源的狀態是正在被使用
clientResourceRequest.resolve(pooledResource.obj); // acquire方法返回的promise對象的resolve在這裏執行的
return true;
}
上面的代碼就按種情況一直走下到最終獲取到長連接的資源,其他更多代碼大家可以自己去深入瞭解。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/GLQFzQuGEDQZQurv6ZUa_Q