用 Node-js 手寫 WebSocket 協議
我們知道,http 是一問一答的模式,客戶端向服務器發送 http 請求,服務器返回 http 響應。
這種模式對資源、數據的加載足夠用,但是需要數據推送的場景就不合適了。
有同學說,http2 不是有 server push 麼?
那只是推資源用的:
比如瀏覽器請求了 html,服務端可以連帶把 css 一起推送給瀏覽器。瀏覽器可以決定接不接收。
對於即時通訊等實時性要求高的場景,就需要用 websocket 了。
websocket 嚴格來說和 http 沒什麼關係,是另外一種協議格式。但是需要一次從 http 到 websocekt 的切換過程。
切換過程詳細來說是這樣的:
請求的時候帶上這幾個 header:
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Key: Ia3dQjfWrAug/6qm7mTZOg==
前兩個很容易理解,就是升級到 websocket 協議的意思。
第三個 header 是保證安全用的一個 key。
服務端返回這樣的 header:
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: JkE58n3uIigYDMvC+KsBbGZsp1A=
和請求 header 類似,Sec-WebSocket-Accept 是對請求帶過來的 Sec-WebSocket-Key 處理之後的結果。
加入這個 header 的校驗是爲了確定對方一定是有 WebSocket 能力的,不然萬一建立了連接對方卻一直沒消息,那不就白等了麼。
那 Sec-WebSocket-Key 經過什麼處理能得到 Sec-WebSocket-Accept 呢?
我用 node 實現了一下,是這樣的:
const crypto = require('crypto');
function hashKey(key) {
const sha1 = crypto.createHash('sha1');
sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
return sha1.digest('base64');
}
也就是用客戶端傳過來的 key,加上一個固定的字符串,經過 sha1 加密之後,轉成 base64 的結果。
這個字符串 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 是固定的,不信你搜搜看:
隨便找個有 websocket 的網站,比如知乎就有:
過濾出 ws 類型的請求,看看這幾個 header,是不是就是前面說的那些。
這個 Sec-WebSocket-Key 是 wk60yiym2FEwCAMVZE3FgQ==
而響應的 Sec-WebSocket-Accept 是 XRfPnS+8xl11QWZherej/dkHPHM=
我們算算看:
是不是一毛一樣!
這就是 websocket 升級協議時候的 Sec-WebSocket-Key 對應的 Sec-WebSocket-Accept 的計算過程。
這一步之後就換到 websocket 的協議了,那是一個全新的協議:
勾選 message 這一欄可以看到傳輸的消息,可以是文本、可以是二進制:
全新的協議?那具體是什麼樣的協議呢?
這樣的:
大家習慣的 http 協議是 key:value 的 header 帶個 body 的:
它是文本協議,每個 header 都是容易理解的字符。
這樣好懂是好懂,但是傳輸佔的空間太大了。
而 websocket 是二進制協議,一個字節可以用來存儲很多信息:
比如協議的第一個字節,就存儲了 FIN(結束標誌)、opcode(內容類型是 binary 還是 text) 等信息。
第二個字節存儲了 mask(是否有加密),payload(數據長度)。
僅僅兩個字節,存儲了多少信息呀!
這就是二進制協議比文本協議好的地方。
我們看到的 weboscket 的 message 的收發,其實底層都是拼成這樣的格式。
只是瀏覽器幫我們解析了這種格式的協議數據。
這就是 weboscket 的全部流程了。
其實還是挺清晰的,一個切換協議的過程,然後是二進制的 weboscket 協議的收發。
那我們就用 Node.js 自己實現一個 websocket 服務器吧!
定義個 MyWebsocket 的 class:
const { EventEmitter } = require('events');
const http = require('http');
class MyWebsocket extends EventEmitter {
constructor(options) {
super(options);
const server = http.createServer();
server.listen(options.port || 8080);
server.on('upgrade', (req, socket) => {
});
}
}
繼承 EventEmitter 是爲了可以用 emit 發送一些事件,外界可以通過 on 監聽這個事件來處理。
我們在構造函數里創建了一個 http 服務,當 ungrade 事件發生,也就是收到了 Connection: upgrade 的 header 的時候,返回切換協議的 header。
返回的 header 前面已經見過了,就是要對 sec-websocket-key 做下處理。
server.on('upgrade', (req, socket) => {
this.socket = socket;
socket.setKeepAlive(true);
const resHeaders = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
'',
''
].join('\r\n');
socket.write(resHeaders);
socket.on('data', (data) => {
console.log(data)
});
socket.on('close', (error) => {
this.emit('close');
});
});
我們拿到 socket,返回上面的 header,其中 key 做的處理就是前面聊過的算法:
function hashKey(key) {
const sha1 = crypto.createHash('sha1');
sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
return sha1.digest('base64');
}
就這麼簡單,就已經完成協議切換了。
不信我們試試看。
引入我們實現的 ws 服務器,跑起來:
const MyWebSocket = require('./ws');
const ws = new MyWebSocket({ port: 8080 });
ws.on('data', (data) => {
console.log('receive data:' + data);
});
ws.on('close', (code, reason) => {
console.log('close:', code, reason);
});
然後新建這樣一個 html:
<!DOCTYPE HTML>
<html>
<body>
<script>
const ws = new WebSocket("ws://localhost:8080");
ws.onopen = function () {
ws.send("發送數據");
setTimeout(() => {
ws.send("發送數據2");
}, 3000)
};
ws.onmessage = function (evt) {
console.log(evt)
};
ws.onclose = function () {
};
</script>
</body>
</html>
用瀏覽器的 WebSocket api 建立連接,發送消息。
用 npx http-server . 起個靜態服務。
然後瀏覽器訪問這個 html:
這時打開 devtools 你就會發現協議切換成功了:
這 3 個 header 還有 101 狀態碼都是我們返回的。
message 裏也可以看到發送的消息:
再去服務端看看,也收到了這個消息:
只不過是 Buffer 的,也就是二進制的。
接下來只要按照協議格式解析這個 Buffer,並且生成響應格式的協議數據 Buffer 返回就可以收發 websocket 數據了。
這一部分還是比較麻煩的,我們一點點來看。
我們需要第一個字節的後四位,也就是 opcode。
這樣寫:
const byte1 = bufferData.readUInt8(0);
let opcode = byte1 & 0x0f;
讀取 8 位無符號整數的內容,也就是一個字節的內容。參數是偏移的字節,這裏是 0。
通過位運算取出後四位,這就是 opcode 了。
然後再處理第二個字節:
第一位是 mask 標誌位,後 7 位是 payload 長度。
可以這樣取:
const byte2 = bufferData.readUInt8(1);
const str2 = byte2.toString(2);
const MASK = str2[0];
let payloadLength = parseInt(str2.substring(1), 2);
還是用 buffer.readUInt8 讀取一個字節的內容。
先轉成二進制字符串,這時第一位就是 mask,然後再截取後 7 位的子串,parseInt 成數字,這就是 payload 長度了。
這樣前兩個字節的協議內容就解析完了。
有同學可能問了,後面咋還有倆 payload 長度呢?
這是因爲數據不一定有多長,可能需要 16 位存長度,可能需要 32 位。
於是 websocket 協議就規定了如果那個 7 位的內容不超過 125,那它就是 payload 長度。
如果 7 位的內容是 126,那就不用它了,用後面的 16 位的內容作爲 payload 長度。
如果 7 位的內容是 127,也不用它了,用後面那個 64 位的內容作爲 payload 長度。
其實還是容易理解的,就是 3 個 if else。
用代碼寫出來就是這樣的:
let payloadLength = parseInt(str2.substring(1), 2);
let curByteIndex = 2;
if (payloadLength === 126) {
payloadLength = bufferData.readUInt16BE(2);
curByteIndex += 2;
} else if (payloadLength === 127) {
payloadLength = bufferData.readBigUInt64BE(2);
curByteIndex += 8;
}
這裏的 curByteIndex 是存儲當前處理到第幾個字節的。
如果是 126,那就從第 3 個字節開始,讀取 2 個字節也就是 16 位的長度,用 buffer.readUInt16BE 方法。
如果是 127,那就從第 3 個字節開始,讀取 8 個字節也就是 64 位的長度,用 buffer.readBigUInt64BE 方法。
這樣就拿到了 payload 的長度,然後再用這個長度去截取內容就好了。
但在讀取數據之前,還有個 mask 要處理,這個是用來給內容解密的:
讀 4 個字節,就是 mask key。
再後面的就可以根據 payload 長度讀出來。
let realData = null;
if (MASK) {
const maskKey = bufferData.slice(curByteIndex, curByteIndex + 4);
curByteIndex += 4;
const payloadData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);
realData = handleMask(maskKey, payloadData);
} else {
realData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);;
}
然後用 mask key 來解密數據。
這個算法也是固定的,用每個字節的 mask key 和數據的每一位做按位異或就好了:
function handleMask(maskBytes, data) {
const payload = Buffer.alloc(data.length);
for (let i = 0; i < data.length; i++) {
payload[i] = maskBytes[i % 4] ^ data[i];
}
return payload;
}
這樣,我們就拿到了最終的數據!
但是傳給處理程序之前,還要根據類型來處理下,因爲內容分幾種類型,也就是 opcode 有幾種值:
const OPCODES = {
CONTINUE: 0,
TEXT: 1, // 文本
BINARY: 2, // 二進制
CLOSE: 8,
PING: 9,
PONG: 10,
};
我們只處理文本和二進制就好了:
handleRealData(opcode, realDataBuffer) {
switch (opcode) {
case OPCODES.TEXT:
this.emit('data', realDataBuffer.toString('utf8'));
break;
case OPCODES.BINARY:
this.emit('data', realDataBuffer);
break;
default:
this.emit('close');
break;
}
}
文本就轉成 utf-8 的字符串,二進制數據就直接用 buffer 的數據。
這樣,處理程序裏就能拿到解析後的數據。
我們來試一下:
之前我們已經能拿到 weboscket 協議內容的 buffer 了:
而現在我們能正確解析出其中的數據:
至此,我們 websocket 協議的解析成功了!
這樣的協議格式的數據叫做 frame,也就是幀:
解析可以了,接下來我們再實現數據的發送。
發送也是構造一樣的 frame 格式。
定義這樣一個 send 方法:
send(data) {
let opcode;
let buffer;
if (Buffer.isBuffer(data)) {
opcode = OPCODES.BINARY;
buffer = data;
} else if (typeof data === 'string') {
opcode = OPCODES.TEXT;
buffer = Buffer.from(data, 'utf8');
} else {
console.error('暫不支持發送的數據類型')
}
this.doSend(opcode, buffer);
}
doSend(opcode, bufferDatafer) {
this.socket.write(encodeMessage(opcode, bufferDatafer));
}
根據發送的是文本還是二進制數據來對內容作處理。
然後構造 websocket 的 frame:
function encodeMessage(opcode, payload) {
//payload.length < 126
let bufferData = Buffer.alloc(payload.length + 2 + 0);;
let byte1 = parseInt('10000000', 2) | opcode; // 設置 FIN 爲 1
let byte2 = payload.length;
bufferData.writeUInt8(byte1, 0);
bufferData.writeUInt8(byte2, 1);
payload.copy(bufferData, 2);
return bufferData;
}
我們只處理數據長度小於 125 的情況。
第一個字節是 opcode,我們把第一位置 1 ,通過按位或的方式。
服務端給客戶端回消息不需要 mask,所以第二個字節就是 payload 長度。
分別把這前兩個字節的數據寫到 buffer 裏,指定不同的 offset:
bufferData.writeUInt8(byte1, 0);
bufferData.writeUInt8(byte2, 1);
之後把 payload 數據放在後面:
payload.copy(bufferData, 2);
這樣一個 websocket 的 frame 就構造完了。
我們試一下:
收到客戶端消息後,每兩秒回一個消息。
收發消息都成功了!
就這樣,我們自己實現了一個 websocket 服務器,實現了 websocket 協議的解析和生成!
完整代碼如下:
MyWebSocket:
//ws.js
const { EventEmitter } = require('events');
const http = require('http');
const crypto = require('crypto');
function hashKey(key) {
const sha1 = crypto.createHash('sha1');
sha1.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
return sha1.digest('base64');
}
function handleMask(maskBytes, data) {
const payload = Buffer.alloc(data.length);
for (let i = 0; i < data.length; i++) {
payload[i] = maskBytes[i % 4] ^ data[i];
}
return payload;
}
const OPCODES = {
CONTINUE: 0,
TEXT: 1,
BINARY: 2,
CLOSE: 8,
PING: 9,
PONG: 10,
};
function encodeMessage(opcode, payload) {
//payload.length < 126
let bufferData = Buffer.alloc(payload.length + 2 + 0);;
let byte1 = parseInt('10000000', 2) | opcode; // 設置 FIN 爲 1
let byte2 = payload.length;
bufferData.writeUInt8(byte1, 0);
bufferData.writeUInt8(byte2, 1);
payload.copy(bufferData, 2);
return bufferData;
}
class MyWebsocket extends EventEmitter {
constructor(options) {
super(options);
const server = http.createServer();
server.listen(options.port || 8080);
server.on('upgrade', (req, socket) => {
this.socket = socket;
socket.setKeepAlive(true);
const resHeaders = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Accept: ' + hashKey(req.headers['sec-websocket-key']),
'',
''
].join('\r\n');
socket.write(resHeaders);
socket.on('data', (data) => {
this.processData(data);
// console.log(data);
});
socket.on('close', (error) => {
this.emit('close');
});
});
}
handleRealData(opcode, realDataBuffer) {
switch (opcode) {
case OPCODES.TEXT:
this.emit('data', realDataBuffer.toString('utf8'));
break;
case OPCODES.BINARY:
this.emit('data', realDataBuffer);
break;
default:
this.emit('close');
break;
}
}
processData(bufferData) {
const byte1 = bufferData.readUInt8(0);
let opcode = byte1 & 0x0f;
const byte2 = bufferData.readUInt8(1);
const str2 = byte2.toString(2);
const MASK = str2[0];
let curByteIndex = 2;
let payloadLength = parseInt(str2.substring(1), 2);
if (payloadLength === 126) {
payloadLength = bufferData.readUInt16BE(2);
curByteIndex += 2;
} else if (payloadLength === 127) {
payloadLength = bufferData.readBigUInt64BE(2);
curByteIndex += 8;
}
let realData = null;
if (MASK) {
const maskKey = bufferData.slice(curByteIndex, curByteIndex + 4);
curByteIndex += 4;
const payloadData = bufferData.slice(curByteIndex, curByteIndex + payloadLength);
realData = handleMask(maskKey, payloadData);
}
this.handleRealData(opcode, realData);
}
send(data) {
let opcode;
let buffer;
if (Buffer.isBuffer(data)) {
opcode = OPCODES.BINARY;
buffer = data;
} else if (typeof data === 'string') {
opcode = OPCODES.TEXT;
buffer = Buffer.from(data, 'utf8');
} else {
console.error('暫不支持發送的數據類型')
}
this.doSend(opcode, buffer);
}
doSend(opcode, bufferDatafer) {
this.socket.write(encodeMessage(opcode, bufferDatafer));
}
}
module.exports = MyWebsocket;
Index:
const MyWebSocket = require('./ws');
const ws = new MyWebSocket({ port: 8080 });
ws.on('data', (data) => {
console.log('receive data:' + data);
setInterval(() => {
ws.send(data + ' ' + Date.now());
}, 2000)
});
ws.on('close', (code, reason) => {
console.log('close:', code, reason);
});
html:
<!DOCTYPE HTML>
<html>
<body>
<script>
const ws = new WebSocket("ws://localhost:8080");
ws.onopen = function () {
ws.send("發送數據");
setTimeout(() => {
ws.send("發送數據2");
}, 3000)
};
ws.onmessage = function (evt) {
console.log(evt)
};
ws.onclose = function () {
};
</script>
</body>
</html>
總結
實時性較高的需求,我們會用 websocket 實現,比如即時通訊、遊戲等場景。
websocket 和 http 沒什麼關係,但從 http 到 websocket 需要一次切換的過程。
這個切換過程除了要帶 upgrade 的 header 外,還要帶 sec-websocket-key,服務端根據這個 key 算出結果,通過 sec-websocket-accept 返回。響應是 101 Switching Protocols 的狀態碼。
這個計算過程比較固定,就是 key + 固定的字符串 通過 sha1 加密後再 base64 的結果。
加這個機制是爲了確保對方一定是 websocket 服務器,而不是隨意返回了個 101 狀態碼。
之後就是 websocket 協議了,這是個二進制協議,我們根據格式完成了 websocket 幀的解析和生成。
這樣就是一個完整的 websocket 協議的實現了。
我們自己手寫了一個 websocket 服務,有沒有感覺對 websocket 的理解更深了呢?
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/EWE6vIvwWDJ_A6SqikV6Tg