diff --git a/library/src/cache.rs b/library/src/cache.rs index 910f9a8..f02738a 100644 --- a/library/src/cache.rs +++ b/library/src/cache.rs @@ -4,7 +4,9 @@ use domain::entities::account::Account; use lazy_static::lazy_static; use library::config; use moka::{ - future::{Cache, CacheBuilder}, notification::RemovalCause, policy::EvictionPolicy + future::{Cache, CacheBuilder}, + notification::{ListenerFuture, RemovalCause}, + policy::EvictionPolicy, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -18,46 +20,57 @@ pub struct CacheAccount { pub struct ServerCache { pub cache: Cache, + pub cache_type: Arc, } -impl Default for ServerCache -where - S: std::hash::Hash + Eq + Debug + ToString + Send + Sync + 'static, - D: Debug + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, -{ - fn default() -> Self { - ServerCache { - cache: CacheBuilder::new(20480) - .name("login_cache") - .eviction_policy(EvictionPolicy::lru()) - .time_to_live(Duration::from_secs(config!().jwt.expires as u64)) - .async_eviction_listener(|key: Arc, value: D, cause| Box::pin(async move { - tracing::info!( - "内存缓存数据移除 key: {:?}, value: {:?}, cause: {:?}", - key, - value, - cause - ); - if cause != RemovalCause::Replaced { - RedisCache { - cache_type: std::any::type_name::().to_string(), - cache_key: key.to_string(), - ..Default::default() - }.remove().await; - } - })) - .build(), - } - } -} +type CacheEvictionListener = Box, D, RemovalCause) -> ListenerFuture + Send + Sync + 'static>; impl ServerCache where D: Debug + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, { + fn default(cache_type: &str) -> Self { + let cache_type = Arc::new(cache_type.to_string()); + let cache_type_raw = cache_type.clone(); + let cache_eviction_listener: CacheEvictionListener = + Box::new(move |key: Arc, value: D, cause: RemovalCause| { + Box::pin({ + let catch_type: Arc = cache_type_raw.clone(); + async move { + tracing::info!( + "内存缓存数据移除 key: {:?}, value: {:?}, cause: {:?}", + key, + value, + cause + ); + // 如果缓存数据不是过期或被替换,则从redis中同步删除缓存数据 + if cause != RemovalCause::Expired && cause != RemovalCause::Replaced { + RedisCache { + cache_type: catch_type, + cache_key: key.to_string(), + ..Default::default() + } + .remove() + .await; + } + } + }) + }); + let cache = CacheBuilder::new(20480) + .name("login_cache") + .eviction_policy(EvictionPolicy::lru()) + .time_to_live(Duration::from_secs(config!().jwt.expires as u64)) + .async_eviction_listener(cache_eviction_listener) + .build(); + ServerCache { + cache, + cache_type, + } + } + + /// 初始化缓存,从redis中读取缓存数据 pub async fn init(&self) { - let cache_type = std::any::type_name::().to_string(); - let cache_datas = RedisCache::get(cache_type).await; + let cache_datas = RedisCache::get(self.cache_type.clone()).await; for element in cache_datas { let cache_key = element.cache_key.clone(); let data_json = element.data; @@ -66,13 +79,15 @@ where } } + /// 添加内存缓存 async fn add_cache(&self, key: String, value: D) { self.cache.insert(key, value).await; } + /// 添加缓存,包括内存缓存和redis缓存 pub async fn insert(&self, key: String, value: D) { - let cache_data = RedisCache{ - cache_type: std::any::type_name::().to_string(), + let cache_data = RedisCache { + cache_type: self.cache_type.clone(), data: serde_json::to_string(&value).unwrap(), cache_key: key.to_string(), ..Default::default() @@ -81,33 +96,52 @@ where self.cache.insert(key, value).await; } + /// 从内存获取缓存 pub async fn get(&self, key: &str) -> Option { self.cache.get(key).await } + /// 从内存移除缓存 pub async fn remove(&self, key: &str) { + let cache_data = RedisCache { + cache_type: self.cache_type.clone(), + cache_key: key.to_string(), + ..Default::default() + }; + cache_data.remove().await; self.cache.remove(key).await; } } +/// Redis缓存封装 #[derive(Debug, Clone, Default)] struct RedisCache { - pub cache_type: String, + pub cache_type: Arc, pub cache_key: String, pub data: String, } impl RedisCache { + /// 向redis缓存Hash中插入数据 pub async fn insert(&self) { - match REDIS_CONN.hset(&self.cache_type, &self.cache_key, &self.data).await { - Ok(_) => {}, + match REDIS_CONN + .hset_ex( + &self.cache_type, + &self.cache_key, + &self.data, + config!().jwt.expires, + ) + .await + { + Ok(_) => {} Err(err) => { - tracing::error!("向缓存Set插入数据失败: {:?}", err); + tracing::error!("向缓存Hash插入数据失败: {:?}", err); } } } - pub async fn get(cache_type: String) -> Vec { + /// 从redis缓存Hash中获取数据 + pub async fn get(cache_type: Arc) -> Vec { let mut result = Vec::new(); match REDIS_CONN.hgetall(&cache_type).await { Ok(cache_datas) => { @@ -119,130 +153,32 @@ impl RedisCache { }; result.push(cache_data); } - }, + } Err(err) => { - tracing::error!("从缓存Set获取数据失败: {:?}", err); - }, + tracing::error!("从缓存Hash获取数据失败: {:?}", err); + } } - + result } + /// 从redis缓存Hash中删除数据 pub async fn remove(&self) { match REDIS_CONN.hdel(&self.cache_type, &self.cache_key).await { - Ok(_) => {}, + Ok(_) => { + tracing::info!("从缓存Hash删除数据成功"); + } Err(err) => { - tracing::error!("从缓存Set删除数据失败: {:?}", err); + tracing::error!("从缓存Hash删除数据失败: {:?}", err); } } } } -// impl CacheData { -// /// 向缓存数据表中插入数据 -// /// -// /// 先判断数据是否存在,若已经存在,则删除 -// /// -// /// 然后保存新的缓存数据 -// pub async fn insert(&self) { -// let mut transaction = db!().begin().await.unwrap(); -// let exist_cache_data = match sqlx::query_as!( -// CacheData, -// r#"select * from cache_data where cache_key = $1"#, -// self.cache_key -// ).fetch_all(&mut *transaction).await { -// Ok(data) => data, -// Err(err) => { -// tracing::error!("从数据库获取缓存数据失败: {:?}", err); -// tracing::error!("{}", err); -// Vec::new() -// } -// }; -// for ele in exist_cache_data { -// match sqlx::query!( -// r#" -// delete from cache_data where id = $1 -// "#, -// ele.id -// ) -// .execute(&mut *transaction) -// .await { -// Ok(_) => {}, -// Err(err) => { -// tracing::error!("从数据库删除缓存数据失败: {:?}", err); -// tracing::error!("{}", err); -// transaction.rollback().await.unwrap(); -// return ; -// } -// } -// } - -// match sqlx::query!( -// r#" -// insert into cache_data (cache_type, cache_key, data) -// values ($1, $2, $3) -// "#, -// self.cache_type, -// self.cache_key, -// self.data -// ) -// .execute(&mut *transaction) -// .await -// { -// Ok(_) => transaction.commit().await.unwrap(), -// Err(err) => { -// tracing::error!("向数据库插入缓存数据失败: {:?}", err); -// transaction.rollback().await.unwrap() -// }, -// } -// } - -// pub async fn get(cache_type: String) -> Vec { -// let mut cache_data = Vec::new(); -// match sqlx::query_as!( -// CacheData, -// r#" -// select id, cache_type, cache_key, data from cache_data where cache_type = $1 -// "#, -// cache_type -// ) -// .fetch_all(db!()) -// .await -// { -// Ok(data) => { -// cache_data = data; -// } -// Err(err) => { -// tracing::error!("从数据库获取缓存数据失败: {:?}", err); -// tracing::error!("{}", err); -// } -// } -// cache_data -// } - -// pub async fn remove(cache_key: String) { -// match sqlx::query!( -// r#" -// delete from cache_data where cache_key = $1 -// "#, -// cache_key -// ) -// .execute(db!()) -// .await -// { -// Ok(_) => {}, -// Err(err) => { -// tracing::error!("从数据库删除缓存数据失败: {:?}", err); -// tracing::error!("{}", err); -// } -// } -// } -// } - pub async fn init_cache() { LOGIN_ACCOUNT_CACHE.init().await; } lazy_static! { - pub static ref LOGIN_ACCOUNT_CACHE: ServerCache> = ServerCache::default(); + pub static ref LOGIN_ACCOUNT_CACHE: ServerCache> = ServerCache::default("CACHE_ACCOUNT"); } diff --git a/library/src/core/redis.rs b/library/src/core/redis.rs index d6b5642..2d4930b 100644 --- a/library/src/core/redis.rs +++ b/library/src/core/redis.rs @@ -1,6 +1,6 @@ use deadpool_redis::{Config, Connection, Pool}; use lazy_static::lazy_static; -use redis::Cmd; +use redis::{Cmd, ExpireOption}; use crate::{config, model::response::{ResErr, ResResult}}; @@ -75,6 +75,25 @@ impl RedisConnManager { } } + pub async fn hset_ex(&self, key: &str, field: &str, value: &str, expire: i64) -> ResResult<()> { + let mut conn = self.get_conn().await.unwrap(); + match Cmd::hset(key, field, value).query_async::(&mut conn).await { + Ok(_) => { + match Cmd::hexpire(key, expire, ExpireOption::NONE, field).query_async::>(&mut conn).await { + Ok(_) => Ok(()), + Err(err) => { + tracing::error!("redis hexpire key:{} field:{} value:{} error:{}", key, field, value, err); + Err(ResErr::service("redis 执行 hexpire 命令失败")) + }, + } + }, + Err(err) => { + tracing::error!("redis hset key:{} field:{} value:{} error:{}", key, field, value, err); + Err(ResErr::service("redis 执行 hset 命令失败")) + }, + } + } + pub async fn hget(&self, key: &str, field: &str) -> ResResult { let mut conn = self.get_conn().await.unwrap(); match Cmd::hget(key, field).query_async::(&mut conn).await {