websocket添加链接缓存

This commit is contained in:
李运家 2025-03-11 18:44:55 +08:00
parent 7a6ac2f09d
commit f802c243b6
5 changed files with 74 additions and 59 deletions

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"commentTranslate.hover.enabled": true
}

70
Cargo.lock generated
View File

@ -484,6 +484,20 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.8.0" version = "2.8.0"
@ -889,6 +903,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.15.2" version = "0.15.2"
@ -906,7 +926,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [ dependencies = [
"hashbrown", "hashbrown 0.15.2",
] ]
[[package]] [[package]]
@ -1296,7 +1316,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown", "hashbrown 0.15.2",
] ]
[[package]] [[package]]
@ -1935,11 +1955,11 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.20" version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [ dependencies = [
"zerocopy 0.7.35", "zerocopy",
] ]
[[package]] [[package]]
@ -2053,7 +2073,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [ dependencies = [
"rand_chacha 0.9.0", "rand_chacha 0.9.0",
"rand_core 0.9.3", "rand_core 0.9.3",
"zerocopy 0.8.23", "zerocopy",
] ]
[[package]] [[package]]
@ -2396,18 +2416,18 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.218" version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.218" version = "1.0.219"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2464,6 +2484,7 @@ dependencies = [
"axum", "axum",
"axum-extra", "axum-extra",
"chrono", "chrono",
"dashmap",
"domain", "domain",
"error-stack", "error-stack",
"futures", "futures",
@ -2639,7 +2660,7 @@ dependencies = [
"futures-intrusive", "futures-intrusive",
"futures-io", "futures-io",
"futures-util", "futures-util",
"hashbrown", "hashbrown 0.15.2",
"hashlink", "hashlink",
"indexmap", "indexmap",
"log", "log",
@ -2855,9 +2876,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.99" version = "2.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3924,34 +3945,13 @@ dependencies = [
"synstructure", "synstructure",
] ]
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive 0.7.35",
]
[[package]] [[package]]
name = "zerocopy" name = "zerocopy"
version = "0.8.23" version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6"
dependencies = [ dependencies = [
"zerocopy-derive 0.8.23", "zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
] ]
[[package]] [[package]]

View File

@ -64,3 +64,4 @@ chrono-tz = "0.10.0"
inventory = "0.3.17" inventory = "0.3.17"
oauth2 = "5.0.0" oauth2 = "5.0.0"
futures = "0.3" futures = "0.3"
dashmap = "6.1.0"

View File

@ -22,6 +22,7 @@ moka = { workspace = true, features = ["future", "logging"] }
lazy_static = { workspace = true } lazy_static = { workspace = true }
tokio-cron-scheduler = { workspace = true } tokio-cron-scheduler = { workspace = true }
tower = { workspace = true } tower = { workspace = true }
dashmap = { workspace = true }
library = { path = "../library" } library = { path = "../library" }
domain = { path = "../domain" } domain = { path = "../domain" }

View File

@ -2,25 +2,37 @@ use std::net::SocketAddr;
use std::ops::ControlFlow; use std::ops::ControlFlow;
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{Message, WebSocket};
use dashmap::DashMap;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt}; use futures::StreamExt;
use lazy_static::lazy_static;
use library::context::Context; use library::context::Context;
lazy_static!{
static ref WS_POOL: DashMap<String, SplitSink<WebSocket, Message>> = DashMap::<String, SplitSink<WebSocket, axum::extract::ws::Message>>::new();
}
/// Actual websocket statemachine (one will be spawned per connection) /// Actual websocket statemachine (one will be spawned per connection)
pub async fn handle_socket(socket: WebSocket, _who: SocketAddr, context: Context) { pub async fn handle_socket(socket: WebSocket, _who: SocketAddr, context: Context) {
let account = context.get_account().unwrap(); let account = context.get_account().unwrap();
tracing::info!("`{:?}` at {:?} connected, user is {:?}", account, _who, account.username); tracing::info!("`{:?}` at {:?} connected, user is {:?}", account, _who, account.username);
let (sender, receiver) = socket.split(); let (sender, receiver) = socket.split();
tokio::spawn(write(sender)); // tokio::spawn(write(sender));
tokio::spawn(read(receiver)); tokio::spawn(read(account.id.clone(), receiver));
WS_POOL.insert(account.id.clone(), sender);
} }
async fn read(mut receiver: SplitStream<WebSocket>) { async fn read(account_id: String, mut receiver: SplitStream<WebSocket>) {
loop { loop {
match receiver.next().await { match receiver.next().await {
Some(Ok(msg)) => { Some(Ok(msg)) => {
process_message(msg); if let ControlFlow::Break(_) = process_message(msg) {
// 收到关闭通知,移除该连接,并跳出循环
WS_POOL.remove(&account_id);
break;
}
}, },
Some(Err(err)) => { Some(Err(err)) => {
tracing::error!("读取消息失败 {:?}", err); tracing::error!("读取消息失败 {:?}", err);
@ -30,20 +42,6 @@ async fn read(mut receiver: SplitStream<WebSocket>) {
} }
} }
async fn write(mut sender: SplitSink<WebSocket, Message>) {
loop {
match sender.send(Message::text("haha")).await {
Ok(_) => {
// tracing::info!("发送成功");
},
Err(err) => {
tracing::error!("发送失败 {:?}", err);
},
}
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
}
}
fn process_message(msg: Message) -> ControlFlow<(), ()> { fn process_message(msg: Message) -> ControlFlow<(), ()> {
match msg { match msg {
Message::Text(t) => { Message::Text(t) => {
@ -51,10 +49,7 @@ fn process_message(msg: Message) -> ControlFlow<(), ()> {
} }
Message::Close(c) => { Message::Close(c) => {
if let Some(cf) = c { if let Some(cf) = c {
tracing::info!( tracing::info!("收到关闭通知 code {}, reason `{}`", cf.code, cf.reason);
"收到关闭通知 code {}, reason `{}`",
cf.code, cf.reason
);
} else { } else {
tracing::info!(">>> somehow sent close message without CloseFrame"); tracing::info!(">>> somehow sent close message without CloseFrame");
} }
@ -69,6 +64,21 @@ fn process_message(msg: Message) -> ControlFlow<(), ()> {
ControlFlow::Continue(()) ControlFlow::Continue(())
} }
// async fn write(mut sender: SplitSink<WebSocket, Message>) {
// loop {
// match sender.send(Message::text("haha")).await {
// Ok(_) => {
// // tracing::info!("发送成功");
// },
// Err(err) => {
// tracing::error!("发送失败 {:?}", err);
// },
// }
// tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
// }
// }
// fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> { // fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
// match msg { // match msg {
// Message::Text(t) => { // Message::Text(t) => {