diff --git a/server/src/service/websocket_service.rs b/server/src/service/websocket_service.rs index 2d42d93..2e0de31 100644 --- a/server/src/service/websocket_service.rs +++ b/server/src/service/websocket_service.rs @@ -9,7 +9,7 @@ use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use lazy_static::lazy_static; use library::context::Context; -use library::model::response::ResResult; +use library::model::response::{ResErr, ResResult}; lazy_static!{ static ref WS_POOL: DashMap>>> = DashMap::>>>::new(); @@ -24,15 +24,17 @@ pub async fn handle_socket(socket: WebSocket, _who: SocketAddr, context: Context // tokio::spawn(write(sender)); tokio::spawn(read(account.id.clone(), receiver)); + // 测试消息发送 tokio::spawn(send()); WS_POOL.insert(account.id.clone(), Arc::new(Mutex::new(sender))); } +/// 测试消息发送 async fn send() { loop { tokio::time::sleep(std::time::Duration::from_millis(2000)).await; - send_message(Message::Text("哈哈".into())).await; + let _ = broadcast_message(Message::Text("哈哈".into())).await; } } @@ -76,14 +78,26 @@ fn process_message(msg: Message) -> ControlFlow<(), ()> { ControlFlow::Continue(()) } -pub async fn send_message(msg: Message) -> ResResult<()> { +/// 发送广播消息 +pub async fn broadcast_message(msg: Message) { for ele in WS_POOL.iter() { let mut sender = ele.value().lock().await; if let Err(err) = sender.send(msg.clone()).await { tracing::error!("Failed to send message: {:?}", err); } } - Ok(()) +} + +/// 发送定向消息 +pub async fn send_message(msg: Message, accoudIds: Vec) { + for ele in WS_POOL.iter() { + let mut sender = ele.value().lock().await; + if accoudIds.contains(&ele.key().to_string()) { + if let Err(err) = sender.send(msg.clone()).await { + tracing::error!("Failed to send message: {:?}", err); + } + } + } } // async fn write(mut sender: SplitSink) {