使用 Rust 和 WebSocket 構建點對點網絡
在 WebSocket 基礎設施上創建點對點 (P2P) 網絡似乎是一項艱鉅的任務。在這篇文章中,我們將介紹基於 Rust 的 P2P 網絡的關鍵組件,探索每個部分如何構建無縫的 WebSocket 基礎設施。使用它們構建一個健壯且高效的 P2P 網絡,允許節點之間的實時通信。
P2P 網絡簡介
點對點網絡支持節點之間的分散通信,允許數據交換而不依賴於中心服務器。網絡中的每個參與者,或 “對等端”,可以同時充當客戶端和服務器端,促進直接連接和通信。在我們的示例中,我們使用 WebSocket,它在單個 TCP 連接上提供全雙工通信通道,以促進這種實時交互。
項目概述
我們的項目旨在演示如何在 Rust 中使用 WebSocket 建立 P2P 網絡,利用 Tokio 異步運行時的強大功能。我們將探討以下關鍵組件:
-
命令行參數解析:使用 clap 解析對等 url 和綁定地址。
-
WebSocket Actor:管理到對等端的 WebSocket 連接。
-
網絡狀態管理:維護網絡的狀態,包括連接的對等點。
-
連接處理:管理對等端 WebSocket 連接的生命週期。
-
廣播消息:定期向所有連接的對等端發送消息。
-
優雅關閉:優雅地處理中斷以關閉網絡。
項目開發
使用以下命令創建一個 Rust 新項目:
cargo new p2p-ws-example
在 Cargo.toml 文件中加入以下依賴項:
[dependencies]
clap = { version = "4.5.13", features = ["derive"] }
env_logger = "0.11.5"
futures = "0.3.30"
log = "0.4.22"
serde = "1.0.204"
tokio = { version = "1.39.2", features = ["full"] }
tokio-tungstenite = "0.23.1"
tokio-util = { version = "0.7.11", features = ["full"] }
命令行參數解析
我們將從使用 clap 定義命令行參數開始。這允許我們在啓動 P2P 節點時指定對等 url 和綁定地址。
use std::net::ToSocketAddrs;
use clap::Parser;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// 要連接的客戶端地址列表
#[arg(short, long, value_delimiter = ',', value_parser = parse_peer)]
peers: Vec<String>,
/// 綁定服務器的地址
#[arg(short, long, value_parser = parse_bind)]
bind: String,
}
/// 解析並驗證客戶端url
fn parse_peer(s: &str) -> Result<String, String> {
// 驗證以ws://或wss://開頭的URL
if s.starts_with("ws://") {
let ip_port = &s[5..];
if let Ok(_socket_addr) = ip_port.to_socket_addrs() {
return Ok(s.to_string());
}
}
Err(format!("Invalid client URL: {}", s))
}
/// 解析並驗證綁定地址
fn parse_bind(s: &str) -> Result<String, String> {
if let Ok(_socket_addr) = s.to_socket_addrs() {
return Ok(s.to_string());
}
Err(format!("Invalid bind address: {}", s))
}
-
peers:以逗號分隔的 WebSocket url 列表,以對等端的形式連接。
-
bind:連接綁定服務器的地址。
WebSocket Actor
WebSocketActor 結構體管理 WebSocket 連接。它建立到給定 URL 的連接,並處理消息的發送和接收。
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
// 定義 WebSocket actor
struct WebSocketActor {
ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl WebSocketActor {
async fn connect(url: &str) -> Option<Self> {
match connect_async(url).await {
Ok((conn, _)) => {
log::info!("Connected successfully to {}", url);
Some(WebSocketActor { ws_stream: conn })
}
Err(e) => {
log::error!("Connection to {} failed: {:?}", url, e);
None
}
}
}
}
網絡狀態管理
P2PWebsocketNetwork 結構體維護網絡的狀態,包括連接的對等點和用於消息廣播的主發送方。
use std::{
collections::HashMap,
net::{SocketAddr, ToSocketAddrs},
sync::{Arc, Mutex},
};
use clap::Parser;
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
struct P2PWebsocketNetwork {
addresses: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<P2PInnerMessage>>>>,
master: Arc<Mutex<UnboundedSender<P2PInnerMessage>>>,
}
#[derive(Debug)]
struct P2PInnerMessage {
message: Message,
tx_handler: UnboundedSender<P2PInnerMessage>,
}
連接管理
我們將使用 handle_connection 和 handle_server_connection 函數處理傳入和傳出的連接。這些函數管理 WebSocket 連接的生命週期,發送和接收消息。
async fn handle_connection(
state: Arc<P2PWebsocketNetwork>,
conn: WebSocketActor,
token: CancellationToken,
) {
// 處理連接和消息交換的邏輯
}
-
state:包含網絡信息的共享狀態 (Arc),包括已連接的對等端和消息處理程序。
-
conn:表示與對等端連接的 WebSocketActor 實例。
-
token:用於處理任務取消,以實現安全關機。
handle_connection 函數體邏輯如下
1,提取套接字地址:
// 提取套接字地址作爲客戶端列表的鍵
let addr = match conn.ws_stream.get_ref() {
MaybeTlsStream::Plain(f) => f.peer_addr().unwrap(),
_ => {
panic!("tls is not supported yet");
}
};
addr:對等端的 socket 地址。代碼從 WebSocket 流中提取對等端的地址。目前只支持非 tls (plain) 流。
2,設置消息通道:
// 這個tx應該在網絡狀態下共享
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
{
let mut list = state.addresses.lock().unwrap();
list.insert(addr, tx.clone());
}
-
tx 和 rx:用於在此函數和應用程序的其他部分之間發送和接收消息 (P2PInnerMessage) 的無界通道。
-
state.addresses:連接的對等端的地址存儲在一個共享的 HashMap 中,新對等端的地址和它的發送者 (tx) 被添加到列表中。
3,拆分 WebSocket 流:
let (mut ws_tx, mut ws_rx) = conn.ws_stream.split();
ws_tx 和 ws_rx:WebSocket 流分爲發送方 (ws_tx) 和接收方(ws_rx),以異步方式處理傳入和傳出的消息。
4,循環消息處理
loop {
tokio::select! {
Some(msg) = ws_rx.next() => {
log::debug!("Received: {:?}", msg);
match msg {
Ok(msg) => {
if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
message: msg,
tx_handler: tx.clone(),
}) {
log::error!("Failed to send message to master: {:?}", e);
}
},
Err(e) => {
log::error!("Error receiving message or connection closed: {:?}", e);
break
}
}
}
Some(msg) = rx.recv() => {
log::debug!("Sending: {:?}", msg);
if let Err(e) = ws_tx.send(msg.message).await {
log::error!("Failed to send message on socket: {:?}", e);
}
}
_ = token.cancelled() => {
log::warn!("task cancelled");
break
}
}
}
-
傳入消息 (ws_rx):循環使用 tokio::select! 等待多個異步事件。它首先檢查來自對等端的傳入消息 (ws_rx.next())。如果成功接收到消息,則通過 state.master 將其轉發給主處理程序。這可以對消息進行集中處理或路由。如果發生錯誤 (例如,連接關閉),循環中斷,有效地終止連接。
-
傳出消息 (rx):循環還檢查從應用程序的內部通道(rx.recv()) 接收到的打算髮送給對等端的消息。使用 ws_tx.send(msg.message).await 將消息發送到對等端。發送中的錯誤將被記錄。
-
取消令牌:如果取消令牌被觸發 (Token .cancelled()),循環中斷,允許任務乾淨地退出。
5,從列表中刪除對等端
{
// 從列表中刪除客戶端
let mut list = state.addresses.lock().unwrap();
list.remove(&addr);
}
一旦循環退出 (由於錯誤或取消),對等體的地址將從連接地址列表中刪除,以確保狀態準確地反映當前網絡連接。
handle_server_connection 函數的操作邏輯與上述一樣,代碼如下:
async fn handle_server_connection(
state: Arc<P2PWebsocketNetwork>,
raw_stream: TcpStream,
addr: SocketAddr,
token: CancellationToken,
) {
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
{
let mut list = state.addresses.lock().unwrap();
list.insert(addr, tx.clone());
}
log::info!("Incoming TCP connection from: {}", addr);
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
Ok(ws) => ws,
Err(e) => {
log::error!("WebSocket handshake error: {:?}", e);
return;
}
};
log::info!("WebSocket connection established: {}", addr);
let (mut ws_tx, mut ws_rx) = ws_stream.split();
loop {
tokio::select! {
Some(msg) = ws_rx.next() => {
log::debug!("Received: {:?}", msg);
match msg {
Ok(msg) => {
if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
message: msg,
tx_handler: tx.clone(),
}) {
log::error!("Failed to send message to master: {:?}", e);
}
},
Err(e) => {
log::error!("Error receiving message or connection closed: {:?}", e);
break
}
}
}
Some(msg) = rx.recv() => {
log::debug!("Sending: {:?}", msg);
if let Err(e) = ws_tx.send(msg.message).await {
log::error!("Failed to send message on socket: {:?}", e);
}
}
_ = token.cancelled() => {
log::warn!("task cancelled");
break
}
}
}
{
// 從列表中刪除客戶端
let mut list = state.addresses.lock().unwrap();
list.remove(&addr);
}
}
廣播消息
我們的廣播功能定期向所有連接的對等點發送消息,展示了 P2P 網絡傳播信息的能力。
async fn broadcast(
state: Arc<P2PWebsocketNetwork>,
tx: UnboundedSender<P2PInnerMessage>,
bind: String,
) {
log::debug!("Broadcast start");
// 廣播到已連接的客戶端
let list = state.addresses.lock().unwrap();
for (i, cl) in list.iter().enumerate() {
log::debug!("Broadcasting to {} ", cl.0);
if let Err(e) = cl.1.send(P2PInnerMessage {
message: tungstenite::protocol::Message::text(format!(
"Message to client {} from {}",
i, bind
)),
tx_handler: tx.clone(),
}) {
log::error!("Failed to send broadcast message: {:?}", e);
}
}
log::debug!("Broadcast end");
}
main 函數
#[tokio::main]
async fn main() {
let args = Args::parse();
env_logger::init();
let cancelation_token = CancellationToken::new();
let tracker = TaskTracker::new();
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
let network_state: Arc<P2PWebsocketNetwork> = Arc::new(P2PWebsocketNetwork {
addresses: Arc::new(Mutex::new(HashMap::new())),
master: Arc::new(Mutex::new(tx.clone())),
});
for url in &args.peers {
log::info!("connecting to {} ...", url);
if let Some(conn) = WebSocketActor::connect(url).await {
tracker.spawn(handle_connection(
network_state.clone(),
conn,
cancelation_token.clone(),
));
} else {
log::warn!("could not connect to server: {url}");
}
}
let listener = TcpListener::bind(&args.bind).await.expect("Failed to bind");
loop {
tokio::select! {
Ok((stream, addr)) = listener.accept() => {
tracker.spawn(handle_server_connection(
network_state.clone(),
stream, addr, cancelation_token.clone()));
}
Some(msg) = rx.recv() => {
log::debug!("consuming ->{msg:?}");
}
_ = tokio::signal::ctrl_c() => {
log::warn!("Received Ctrl+C, shutting down...");
tracker.close();
cancelation_token.cancel();
break
}
_ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
tracker.spawn(broadcast(network_state.clone(), tx.clone(), args.bind.clone()));
}
}
}
log::info!("waiting for all tasks");
tracker.wait().await;
log::debug!("tasks all are stoped");
}
在這裏 我們將通過處理 Ctrl+C 信號並相應地取消任務來確保我們的網絡可以優雅地關閉。
運行項目
在不同的終端中,使用這些參數按以下順序運行程序:
$ RUST_LOG=debug cargo run -- --bind localhost:8080 # 啓動第一個節點
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080 --bind localhost:8085 # 啓動第二個節點
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080,ws://localhost:8085 --bind localhost:8086 # 啓動第三個節點
然後,將看到從每個對等點向所有其他對等點廣播消息。這是對所有人的廣播!
總結
這篇文章使用 Rust 構建基於 WebSocket 的 P2P 網絡示例,它提供了一種強大而有效的方式來實現節點之間的實時通信。通過理解代碼的每個部分,可以擴展和定製這個示例以滿足特定需求,無論是分散的應用程序,實時數據共享還是分佈式計算任務。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/I4QCy5YvyVU5lnsx548iJA