diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..351b8ea --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "commentTranslate.hover.enabled": true +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index df3b174..734efcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,6 +484,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.8.0" @@ -889,6 +903,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -906,7 +926,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1296,7 +1316,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1935,11 +1955,11 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.7.35", + "zerocopy", ] [[package]] @@ -2053,7 +2073,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", - "zerocopy 0.8.23", + "zerocopy", ] [[package]] @@ -2396,18 +2416,18 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "serde" -version = "1.0.218" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.218" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -2464,6 +2484,7 @@ dependencies = [ "axum", "axum-extra", "chrono", + "dashmap", "domain", "error-stack", "futures", @@ -2639,7 +2660,7 @@ dependencies = [ "futures-intrusive", "futures-io", "futures-util", - "hashbrown", + "hashbrown 0.15.2", "hashlink", "indexmap", "log", @@ -2855,9 +2876,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.99" +version = "2.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" dependencies = [ "proc-macro2", "quote", @@ -3924,34 +3945,13 @@ dependencies = [ "synstructure", ] -[[package]] -name = "zerocopy" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" -dependencies = [ - "byteorder", - "zerocopy-derive 0.7.35", -] - [[package]] name = "zerocopy" version = "0.8.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" dependencies = [ - "zerocopy-derive 0.8.23", -] - -[[package]] -name = "zerocopy-derive" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "zerocopy-derive", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d900ff1..99824a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,4 +63,5 @@ deadpool-redis = "0.20.0" chrono-tz = "0.10.0" inventory = "0.3.17" oauth2 = "5.0.0" -futures = "0.3" \ No newline at end of file +futures = "0.3" +dashmap = "6.1.0" \ No newline at end of file diff --git a/server/Cargo.toml b/server/Cargo.toml index b4ed77f..c8860dd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -22,6 +22,7 @@ moka = { workspace = true, features = ["future", "logging"] } lazy_static = { workspace = true } tokio-cron-scheduler = { workspace = true } tower = { workspace = true } +dashmap = { workspace = true } library = { path = "../library" } domain = { path = "../domain" } diff --git a/server/src/service/websocket_service.rs b/server/src/service/websocket_service.rs index 34ad600..141c84f 100644 --- a/server/src/service/websocket_service.rs +++ b/server/src/service/websocket_service.rs @@ -2,25 +2,37 @@ use std::net::SocketAddr; use std::ops::ControlFlow; use axum::extract::ws::{Message, WebSocket}; +use dashmap::DashMap; use futures::stream::{SplitSink, SplitStream}; -use futures::{SinkExt, StreamExt}; +use futures::StreamExt; +use lazy_static::lazy_static; use library::context::Context; +lazy_static!{ + static ref WS_POOL: DashMap> = DashMap::>::new(); +} + /// Actual websocket statemachine (one will be spawned per connection) pub async fn handle_socket(socket: WebSocket, _who: SocketAddr, context: Context) { let account = context.get_account().unwrap(); tracing::info!("`{:?}` at {:?} connected, user is {:?}", account, _who, account.username); let (sender, receiver) = socket.split(); - tokio::spawn(write(sender)); - tokio::spawn(read(receiver)); + // tokio::spawn(write(sender)); + tokio::spawn(read(account.id.clone(), receiver)); + + WS_POOL.insert(account.id.clone(), sender); } -async fn read(mut receiver: SplitStream) { +async fn read(account_id: String, mut receiver: SplitStream) { loop { match receiver.next().await { Some(Ok(msg)) => { - process_message(msg); + if let ControlFlow::Break(_) = process_message(msg) { + // 收到关闭通知,移除该连接,并跳出循环 + WS_POOL.remove(&account_id); + break; + } }, Some(Err(err)) => { tracing::error!("读取消息失败 {:?}", err); @@ -30,20 +42,6 @@ async fn read(mut receiver: SplitStream) { } } -async fn write(mut sender: SplitSink) { - loop { - match sender.send(Message::text("haha")).await { - Ok(_) => { - // tracing::info!("发送成功"); - }, - Err(err) => { - tracing::error!("发送失败 {:?}", err); - }, - } - tokio::time::sleep(std::time::Duration::from_millis(2000)).await; - } -} - fn process_message(msg: Message) -> ControlFlow<(), ()> { match msg { Message::Text(t) => { @@ -51,10 +49,7 @@ fn process_message(msg: Message) -> ControlFlow<(), ()> { } Message::Close(c) => { if let Some(cf) = c { - tracing::info!( - "收到关闭通知 code {}, reason `{}`", - cf.code, cf.reason - ); + tracing::info!("收到关闭通知 code {}, reason `{}`", cf.code, cf.reason); } else { tracing::info!(">>> somehow sent close message without CloseFrame"); } @@ -69,6 +64,21 @@ fn process_message(msg: Message) -> ControlFlow<(), ()> { ControlFlow::Continue(()) } +// async fn write(mut sender: SplitSink) { +// loop { +// match sender.send(Message::text("haha")).await { +// Ok(_) => { +// // tracing::info!("发送成功"); +// }, +// Err(err) => { +// tracing::error!("发送失败 {:?}", err); +// }, +// } +// tokio::time::sleep(std::time::Duration::from_millis(2000)).await; +// } +// } + + // fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> { // match msg { // Message::Text(t) => {