diff --git a/server/src/service/websocket_service.rs b/server/src/service/websocket_service.rs index 141c84f..111e2bf 100644 --- a/server/src/service/websocket_service.rs +++ b/server/src/service/websocket_service.rs @@ -1,15 +1,19 @@ +use std::cell::RefCell; use std::net::SocketAddr; use std::ops::ControlFlow; +use std::sync::Arc; use axum::extract::ws::{Message, WebSocket}; use dashmap::DashMap; +use futures::lock::Mutex; use futures::stream::{SplitSink, SplitStream}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use lazy_static::lazy_static; use library::context::Context; +use library::model::response::ResResult; lazy_static!{ - static ref WS_POOL: DashMap> = DashMap::>::new(); + static ref WS_POOL: DashMap>>> = DashMap::>>>::new(); } /// Actual websocket statemachine (one will be spawned per connection) @@ -21,7 +25,16 @@ pub async fn handle_socket(socket: WebSocket, _who: SocketAddr, context: Context // tokio::spawn(write(sender)); tokio::spawn(read(account.id.clone(), receiver)); - WS_POOL.insert(account.id.clone(), sender); + tokio::spawn(send()); + + WS_POOL.insert(account.id.clone(), Arc::new(Mutex::new(sender))); +} + +async fn send() { + loop { + tokio::time::sleep(std::time::Duration::from_millis(2000)).await; + send_message(Message::Text("哈哈".into())).await; + } } async fn read(account_id: String, mut receiver: SplitStream) { @@ -64,6 +77,16 @@ fn process_message(msg: Message) -> ControlFlow<(), ()> { ControlFlow::Continue(()) } +pub async fn send_message(msg: Message) -> ResResult<()> { + for ele in WS_POOL.iter() { + let mut sender = ele.value().lock().await; + if let Err(err) = sender.send(msg.clone()).await { + tracing::error!("Failed to send message: {:?}", err); + } + } + Ok(()) +} + // async fn write(mut sender: SplitSink) { // loop { // match sender.send(Message::text("haha")).await {