使用inventory实现定时任务自动处理,无需手动编写代码注册定时任务

This commit is contained in:
李运家 2024-10-28 20:22:46 +08:00
parent 3e08824a2d
commit 3579475c9b
7 changed files with 52 additions and 19 deletions

7
Cargo.lock generated
View File

@ -1093,6 +1093,12 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "inventory"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767"
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.9.0" version = "2.9.0"
@ -1179,6 +1185,7 @@ dependencies = [
"http-body-util", "http-body-util",
"hyper", "hyper",
"i18n", "i18n",
"inventory",
"jsonwebtoken", "jsonwebtoken",
"lazy_static", "lazy_static",
"macro", "macro",

View File

@ -60,4 +60,5 @@ strum_macros = "0.26.3"
hex = "0.4.3" hex = "0.4.3"
redis = "0.27.4" redis = "0.27.4"
deadpool-redis = "0.18.0" deadpool-redis = "0.18.0"
chrono-tz = "0.10.0" chrono-tz = "0.10.0"
inventory = "0.3.15"

View File

@ -39,6 +39,7 @@ hex = { workspace = true }
redis = { workspace = true, features = ["tokio-comp", "json"] } redis = { workspace = true, features = ["tokio-comp", "json"] }
deadpool-redis = { workspace = true } deadpool-redis = { workspace = true }
chrono-tz = { workspace = true } chrono-tz = { workspace = true }
inventory = { workspace = true }
domain = { path = "../domain" } domain = { path = "../domain" }
i18n = { path = "../i18n" } i18n = { path = "../i18n" }

View File

@ -3,8 +3,14 @@ use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
use chrono_tz::Tz; use chrono_tz::Tz;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError}; use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};
pub use inventory::submit;
type TaskFun = dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync; type TaskFun = dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync;
pub trait TaskMethod: Send + Sync + 'static {
fn ge_task(&self) -> Task;
}
pub struct Task{ pub struct Task{
pub job: Arc<TaskFun>, pub job: Arc<TaskFun>,
pub cron: Option<String>, pub cron: Option<String>,
@ -13,14 +19,15 @@ pub struct Task{
} }
/// 启动定时任务 /// 启动定时任务
pub async fn start(tasks: Vec<Task>) { pub async fn start() {
let tasks = get_task_methods();
match schedule(tasks).await { match schedule(tasks).await {
Ok(_) => tracing::info!("定时任务启动成功"), Ok(_) => tracing::info!("定时任务启动成功"),
Err(err) => tracing::error!(error = ?err, "定时任务启动失败"), Err(err) => tracing::error!(error = ?err, "定时任务启动失败"),
} }
} }
async fn schedule(tasks: Vec<Task>) -> Result<(), JobSchedulerError> { async fn schedule(tasks: Vec<Arc<Task>>) -> Result<(), JobSchedulerError> {
let mut scheduler = JobScheduler::new().await?; let mut scheduler = JobScheduler::new().await?;
scheduler.init().await?; scheduler.init().await?;
@ -61,3 +68,22 @@ async fn schedule(tasks: Vec<Task>) -> Result<(), JobSchedulerError> {
scheduler.start().await?; scheduler.start().await?;
Ok(()) Ok(())
} }
inventory::collect!(&'static dyn TaskMethod);
#[macro_export]
macro_rules! submit_task {
($ty:ident) => {
::library::task::submit! {
&$ty as &dyn ::library::task::TaskMethod
}
};
}
fn get_task_methods() -> Vec<Arc<Task>> {
let mut result = Vec::<Arc<Task>>::new();
for method in inventory::iter::<&dyn TaskMethod> {
result.push(Arc::new(method.ge_task()));
}
result
}

View File

@ -77,12 +77,12 @@ pub fn gen_task(attr: TokenStream, item: TokenStream) -> TokenStream {
let TaskArgs{ cron, interval, time_zone } = parse_macro_input!(attr as TaskArgs); let TaskArgs{ cron, interval, time_zone } = parse_macro_input!(attr as TaskArgs);
// 将 method 转换为小写并生成标识符 // 将 method 转换为小写并生成标识符
let vis = func.vis.clone(); // let vis = func.vis.clone();
let ident = func.sig.ident.clone(); let ident = func.sig.ident.clone();
let result = quote!{ let result = quote!{
let mut _result = library::task::Task { let mut _result = library::task::Task {
job: std::sync::Arc::new(|| Box::pin(#ident ())), job: std::sync::Arc::new(|| Box::pin(#ident::#ident ())),
cron: None, cron: None,
interval: None, interval: None,
time_zone: None, time_zone: None,
@ -100,11 +100,18 @@ pub fn gen_task(attr: TokenStream, item: TokenStream) -> TokenStream {
}; };
let generated = quote! { let generated = quote! {
#vis fn #ident () -> library::task::Task { #[allow(non_camel_case_types)]
struct #ident;
impl #ident {
#func #func
#result
} }
impl library::task::TaskMethod for #ident {
fn ge_task(&self) -> library::task::Task {
#result
}
}
::library::submit_task!(#ident);
}; };
TokenStream::from(generated) TokenStream::from(generated)

View File

@ -1,6 +1,5 @@
use axum::{body::Body, extract::Request, http, routing::get, Router}; use axum::{body::Body, extract::Request, http, routing::get, Router};
use library::{config, task}; use library::{config, task};
use tasks::get_tasks;
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
@ -15,7 +14,7 @@ pub async fn serve() {
tracing::info!("服务监听地址: {}", addr); tracing::info!("服务监听地址: {}", addr);
// 启动任务 // 启动任务
task::start(get_tasks()).await; task::start().await;
// 启动应用服务 // 启动应用服务
axum::serve(listener, init()).await.unwrap(); axum::serve(listener, init()).await.unwrap();
} }

View File

@ -1,9 +1 @@
use library::task::Task; pub mod google_tasks;
pub mod google_tasks;
/// 定时任务维护器
pub fn get_tasks() -> Vec<Task> {
vec![google_tasks::xxx_task(), google_tasks::xxx_task2()]
}