将Cargo.toml
文件修改为如下所示:
[package]
name = "server"
version = "0.1.0"
edition = "2022"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
warp = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
tokio-stream = "0.1"
pretty_env_logger = "0.4"
// #![deny(warnings)]
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use futures_util::{SinkExt, StreamExt, TryFutureExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use warp::Filter;
/// 我们的全局唯一用户 ID 计数器。
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/// 我们当前连接用户的状态。
///
/// - 键是他们的 id
/// - 值是 `warp::ws::Message` 的发送者
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
// 跟踪所有连接的用户,key是usize,value
// 是一个 websocket 发送者。
let users = Users::default();
// 把我们的“状态”变成一个新的过滤器...
//注释掉了
//let users = warp::any().map(move || users.clone());
// GET /chat -> websocket 升级
let chat = warp::path("chat")
// `ws()` 过滤器将准备 Websocket 握手...
.and(warp::ws())
.and(with_users(users.clone()))
.map(|ws: warp::ws::Ws, users| {
// 如果握手成功,这将调用我们的函数。
ws.on_upgrade(move |socket| user_connected(socket, users))
});
let files = warp::fs::dir("static");
let routes = chat.or(files);
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
//包一下,让它可以克隆
fn with_users(users: Users) -> impl Filter<Extract = (Users,), Error = Infallible> + Clone {
warp::any().map(move || users.clone())
}
async fn user_connected(ws: WebSocket, users: Users) {
// 使用计数器为该用户分配一个新的唯一 ID。
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
eprintln!("new chat user: {}", my_id);
// 将套接字拆分为消息的发送者和接收者。
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
// 使用无界通道来处理消息的缓冲和刷新
// 到 websocket...
let (tx, rx) = mpsc::unbounded_channel();
let mut rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(async move {
while let Some(message) = rx.next().await {
user_ws_tx
.send(message)
.unwrap_or_else(|e| {
eprintln!("websocket send error: {}", e);
})
.await;
}
});
// 将发件人保存在我们的已连接用户列表中。
users.write().await.insert(my_id, tx.clone());
// 返回一个 `Future`,它基本上是一个状态机管理
// 这个特定用户的连接。
// 用户每次发送消息,广播给
// 所有其他用户...
while let Some(result) = user_ws_rx.next().await {
let msg = match result {
Ok(msg) => {
msg
}
Err(e) => {
eprintln!("websocket error(uid={}): {}", my_id, e);
break;
}
};
//因为我不需要把用户每次发送消息,广播给所有其他用户,先注释掉了
user_message(my_id, msg, &users).await;
}
// 只要用户停留,user_ws_rx 流将继续处理
// 连接的。 一旦他们断开连接,然后...
user_disconnected(my_id, &users).await;
}
async fn user_message(my_id: usize, msg: Message, users: &Users) {
// 跳过任何非文本消息...
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{}>: {}", my_id, msg);
// 来自该用户的新消息,将其发送给其他所有人(相同的 uid 除外)...
for (&uid, tx) in users.read().await.iter() {
if my_id != uid {
if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
// tx 断开连接,我们的 `user_disconnected` 代码
// 应该发生在另一个任务中,仅此而已
// 在这里做。
}
}
}
}
async fn user_disconnected(my_id: usize, users: &Users) {
eprintln!("good bye user: {}", my_id);
// 流关闭,所以从用户列表中删除
users.write().await.remove(&my_id);
}
最后
以上就是英俊小蜜蜂最近收集整理的关于Rust之Websocket 服务端【Warp】的全部内容,更多相关Rust之Websocket内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复