增加hash field过期时间

This commit is contained in:
liyunjia 2024-10-11 18:13:54 +08:00
parent 160abfb763
commit 274ef8b68d
2 changed files with 104 additions and 149 deletions

View File

@ -4,7 +4,9 @@ use domain::entities::account::Account;
use lazy_static::lazy_static;
use library::config;
use moka::{
future::{Cache, CacheBuilder}, notification::RemovalCause, policy::EvictionPolicy
future::{Cache, CacheBuilder},
notification::{ListenerFuture, RemovalCause},
policy::EvictionPolicy,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
@ -18,46 +20,57 @@ pub struct CacheAccount {
pub struct ServerCache<S, D> {
pub cache: Cache<S, D>,
pub cache_type: Arc<String>,
}
impl<S, D> Default for ServerCache<S, D>
type CacheEvictionListener<D> = Box<dyn Fn(Arc<String>, D, RemovalCause) -> ListenerFuture + Send + Sync + 'static>;
impl<D> ServerCache<String, D>
where
S: std::hash::Hash + Eq + Debug + ToString + Send + Sync + 'static,
D: Debug + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
{
fn default() -> Self {
ServerCache {
cache: CacheBuilder::new(20480)
.name("login_cache")
.eviction_policy(EvictionPolicy::lru())
.time_to_live(Duration::from_secs(config!().jwt.expires as u64))
.async_eviction_listener(|key: Arc<S>, value: D, cause| Box::pin(async move {
fn default(cache_type: &str) -> Self {
let cache_type = Arc::new(cache_type.to_string());
let cache_type_raw = cache_type.clone();
let cache_eviction_listener: CacheEvictionListener<D> =
Box::new(move |key: Arc<String>, value: D, cause: RemovalCause| {
Box::pin({
let catch_type: Arc<String> = cache_type_raw.clone();
async move {
tracing::info!(
"内存缓存数据移除 key: {:?}, value: {:?}, cause: {:?}",
key,
value,
cause
);
if cause != RemovalCause::Replaced {
// 如果缓存数据不是过期或被替换则从redis中同步删除缓存数据
if cause != RemovalCause::Expired && cause != RemovalCause::Replaced {
RedisCache {
cache_type: std::any::type_name::<D>().to_string(),
cache_type: catch_type,
cache_key: key.to_string(),
..Default::default()
}.remove().await;
}
}))
.build(),
.remove()
.await;
}
}
})
});
let cache = CacheBuilder::new(20480)
.name("login_cache")
.eviction_policy(EvictionPolicy::lru())
.time_to_live(Duration::from_secs(config!().jwt.expires as u64))
.async_eviction_listener(cache_eviction_listener)
.build();
ServerCache {
cache,
cache_type,
}
}
impl<D> ServerCache<String, D>
where
D: Debug + Send + Sync + Clone + Serialize + DeserializeOwned + 'static,
{
/// 初始化缓存从redis中读取缓存数据
pub async fn init(&self) {
let cache_type = std::any::type_name::<D>().to_string();
let cache_datas = RedisCache::get(cache_type).await;
let cache_datas = RedisCache::get(self.cache_type.clone()).await;
for element in cache_datas {
let cache_key = element.cache_key.clone();
let data_json = element.data;
@ -66,13 +79,15 @@ where
}
}
/// 添加内存缓存
async fn add_cache(&self, key: String, value: D) {
self.cache.insert(key, value).await;
}
/// 添加缓存包括内存缓存和redis缓存
pub async fn insert(&self, key: String, value: D) {
let cache_data = RedisCache {
cache_type: std::any::type_name::<D>().to_string(),
cache_type: self.cache_type.clone(),
data: serde_json::to_string(&value).unwrap(),
cache_key: key.to_string(),
..Default::default()
@ -81,33 +96,52 @@ where
self.cache.insert(key, value).await;
}
/// 从内存获取缓存
pub async fn get(&self, key: &str) -> Option<D> {
self.cache.get(key).await
}
/// 从内存移除缓存
pub async fn remove(&self, key: &str) {
let cache_data = RedisCache {
cache_type: self.cache_type.clone(),
cache_key: key.to_string(),
..Default::default()
};
cache_data.remove().await;
self.cache.remove(key).await;
}
}
/// Redis缓存封装
#[derive(Debug, Clone, Default)]
struct RedisCache {
pub cache_type: String,
pub cache_type: Arc<String>,
pub cache_key: String,
pub data: String,
}
impl RedisCache {
/// 向redis缓存Hash中插入数据
pub async fn insert(&self) {
match REDIS_CONN.hset(&self.cache_type, &self.cache_key, &self.data).await {
Ok(_) => {},
match REDIS_CONN
.hset_ex(
&self.cache_type,
&self.cache_key,
&self.data,
config!().jwt.expires,
)
.await
{
Ok(_) => {}
Err(err) => {
tracing::error!("向缓存Set插入数据失败: {:?}", err);
tracing::error!("向缓存Hash插入数据失败: {:?}", err);
}
}
}
pub async fn get(cache_type: String) -> Vec<RedisCache> {
/// 从redis缓存Hash中获取数据
pub async fn get(cache_type: Arc<String>) -> Vec<RedisCache> {
let mut result = Vec::new();
match REDIS_CONN.hgetall(&cache_type).await {
Ok(cache_datas) => {
@ -119,130 +153,32 @@ impl RedisCache {
};
result.push(cache_data);
}
},
}
Err(err) => {
tracing::error!("从缓存Set获取数据失败: {:?}", err);
},
tracing::error!("从缓存Hash获取数据失败: {:?}", err);
}
}
result
}
/// 从redis缓存Hash中删除数据
pub async fn remove(&self) {
match REDIS_CONN.hdel(&self.cache_type, &self.cache_key).await {
Ok(_) => {},
Ok(_) => {
tracing::info!("从缓存Hash删除数据成功");
}
Err(err) => {
tracing::error!("从缓存Set删除数据失败: {:?}", err);
tracing::error!("从缓存Hash删除数据失败: {:?}", err);
}
}
}
}
// impl CacheData {
// /// 向缓存数据表中插入数据
// ///
// /// 先判断数据是否存在,若已经存在,则删除
// ///
// /// 然后保存新的缓存数据
// pub async fn insert(&self) {
// let mut transaction = db!().begin().await.unwrap();
// let exist_cache_data = match sqlx::query_as!(
// CacheData,
// r#"select * from cache_data where cache_key = $1"#,
// self.cache_key
// ).fetch_all(&mut *transaction).await {
// Ok(data) => data,
// Err(err) => {
// tracing::error!("从数据库获取缓存数据失败: {:?}", err);
// tracing::error!("{}", err);
// Vec::new()
// }
// };
// for ele in exist_cache_data {
// match sqlx::query!(
// r#"
// delete from cache_data where id = $1
// "#,
// ele.id
// )
// .execute(&mut *transaction)
// .await {
// Ok(_) => {},
// Err(err) => {
// tracing::error!("从数据库删除缓存数据失败: {:?}", err);
// tracing::error!("{}", err);
// transaction.rollback().await.unwrap();
// return ;
// }
// }
// }
// match sqlx::query!(
// r#"
// insert into cache_data (cache_type, cache_key, data)
// values ($1, $2, $3)
// "#,
// self.cache_type,
// self.cache_key,
// self.data
// )
// .execute(&mut *transaction)
// .await
// {
// Ok(_) => transaction.commit().await.unwrap(),
// Err(err) => {
// tracing::error!("向数据库插入缓存数据失败: {:?}", err);
// transaction.rollback().await.unwrap()
// },
// }
// }
// pub async fn get(cache_type: String) -> Vec<CacheData> {
// let mut cache_data = Vec::new();
// match sqlx::query_as!(
// CacheData,
// r#"
// select id, cache_type, cache_key, data from cache_data where cache_type = $1
// "#,
// cache_type
// )
// .fetch_all(db!())
// .await
// {
// Ok(data) => {
// cache_data = data;
// }
// Err(err) => {
// tracing::error!("从数据库获取缓存数据失败: {:?}", err);
// tracing::error!("{}", err);
// }
// }
// cache_data
// }
// pub async fn remove(cache_key: String) {
// match sqlx::query!(
// r#"
// delete from cache_data where cache_key = $1
// "#,
// cache_key
// )
// .execute(db!())
// .await
// {
// Ok(_) => {},
// Err(err) => {
// tracing::error!("从数据库删除缓存数据失败: {:?}", err);
// tracing::error!("{}", err);
// }
// }
// }
// }
pub async fn init_cache() {
LOGIN_ACCOUNT_CACHE.init().await;
}
lazy_static! {
pub static ref LOGIN_ACCOUNT_CACHE: ServerCache<String, Arc<CacheAccount>> = ServerCache::default();
pub static ref LOGIN_ACCOUNT_CACHE: ServerCache<String, Arc<CacheAccount>> = ServerCache::default("CACHE_ACCOUNT");
}

View File

@ -1,6 +1,6 @@
use deadpool_redis::{Config, Connection, Pool};
use lazy_static::lazy_static;
use redis::Cmd;
use redis::{Cmd, ExpireOption};
use crate::{config, model::response::{ResErr, ResResult}};
@ -75,6 +75,25 @@ impl RedisConnManager {
}
}
pub async fn hset_ex(&self, key: &str, field: &str, value: &str, expire: i64) -> ResResult<()> {
let mut conn = self.get_conn().await.unwrap();
match Cmd::hset(key, field, value).query_async::<i64>(&mut conn).await {
Ok(_) => {
match Cmd::hexpire(key, expire, ExpireOption::NONE, field).query_async::<Vec<i64>>(&mut conn).await {
Ok(_) => Ok(()),
Err(err) => {
tracing::error!("redis hexpire key:{} field:{} value:{} error:{}", key, field, value, err);
Err(ResErr::service("redis 执行 hexpire 命令失败"))
},
}
},
Err(err) => {
tracing::error!("redis hset key:{} field:{} value:{} error:{}", key, field, value, err);
Err(ResErr::service("redis 执行 hset 命令失败"))
},
}
}
pub async fn hget(&self, key: &str, field: &str) -> ResResult<String> {
let mut conn = self.get_conn().await.unwrap();
match Cmd::hget(key, field).query_async::<String>(&mut conn).await {