diff --git a/Cargo.lock b/Cargo.lock index 9320947..ff07c48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,27 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "chrono-tz" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6dd8046d00723a59a2f8c5f295c515b9bb9a331ee4f8f3d4dd49e428acd3b6" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94fea34d77a245229e7746bd2beb786cd2a896f306ff491fb8cecb3074b10a7" +dependencies = [ + "parse-zoneinfo", + "phf_codegen", +] + [[package]] name = "chuanyue-service" version = "0.1.0" @@ -1148,6 +1169,7 @@ dependencies = [ "axum", "axum-extra", "chrono", + "chrono-tz", "deadpool-redis", "domain", "futures-util", @@ -1534,6 +1556,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24" +dependencies = [ + "regex", +] + [[package]] name = "paste" version = "1.0.15" @@ -1565,6 +1596,44 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -2166,6 +2235,12 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" diff --git a/Cargo.toml b/Cargo.toml index b428a94..f9060aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,4 +59,5 @@ strum = "0.26.3" strum_macros = "0.26.3" hex = "0.4.3" redis = "0.27.4" -deadpool-redis = "0.18.0" \ No newline at end of file +deadpool-redis = "0.18.0" +chrono-tz = "0.10.0" \ No newline at end of file diff --git a/app.toml b/app.toml index 33028ad..884dde3 100644 --- a/app.toml +++ b/app.toml @@ -20,8 +20,8 @@ expires = 1800 refresh_expires = 3600 [redis] -url = "47.95.198.7:33000" -# url = "127.0.0.1:33001" +# url = "47.95.198.7:33000" +url = "127.0.0.1:33001" password = "3aB7kRt9pDf1nQzW" db = 0 diff --git a/library/Cargo.toml b/library/Cargo.toml index 40d4995..2337d57 100644 --- a/library/Cargo.toml +++ b/library/Cargo.toml @@ -38,6 +38,7 @@ hyper = { workspace = true } hex = { workspace = true } redis = { workspace = true, features = ["tokio-comp", "json"] } deadpool-redis = { workspace = true } +chrono-tz = { workspace = true } domain = { path = "../domain" } i18n = { path = "../i18n" } diff --git a/library/src/task.rs b/library/src/task.rs index fd51e18..24a1bc5 100644 --- a/library/src/task.rs +++ b/library/src/task.rs @@ -1,9 +1,15 @@ +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; + +use chrono_tz::Tz; use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError}; -pub struct Task { - pub name: String, - pub job: Box, - pub trigger: String +type TaskFun = dyn Fn() -> Pin + Send>> + Send + Sync; + +pub struct Task{ + pub job: Arc, + pub cron: Option, + pub interval: Option, + pub time_zone: Option, } /// 启动定时任务 @@ -21,11 +27,30 @@ async fn schedule(tasks: Vec) -> Result<(), JobSchedulerError> { // 添加任务 for task in tasks { let task: &'static Task = Box::leak(Box::new(task)); - let _task_uuid = scheduler.add(Job::new_async(task.trigger.as_str(), move |_uuid, _l| { - Box::pin(async move { - (&task.job)(); - }) - })?).await?; + if task.cron.is_some() { + let schedule = task.cron.as_ref().unwrap().as_str(); + if let Some(time_zone) = task.time_zone.as_ref() { + tracing::info!("添加定时任务: {} - {}", task.cron.as_ref().unwrap(), time_zone); + let tz: Tz = time_zone.parse().unwrap(); + let job_fun = task.job.clone(); + let _task_uuid = scheduler.add(Job::new_cron_job_async_tz(schedule, tz, move |_uuid, _l| { + job_fun() + })?).await?; + } else { + tracing::info!("添加定时任务: {}", task.cron.as_ref().unwrap()); + let job_fun = task.job.clone(); + let _task_uuid = scheduler.add(Job::new_cron_job_async(schedule, move |_uuid, _l| { + job_fun() + })?).await?; + } + } else if task.interval.is_some() { + let schedule = task.interval.unwrap(); + tracing::info!("添加定时任务: {}", task.interval.as_ref().unwrap()); + let job_fun = task.job.clone(); + let _task_uuid = scheduler.add(Job::new_repeated_async(Duration::from_secs(schedule), move |_uuid, _l| { + job_fun() + })?).await?; + } } // 添加关闭监听 diff --git a/macro/src/lib.rs b/macro/src/lib.rs index 4536890..ec622b7 100644 --- a/macro/src/lib.rs +++ b/macro/src/lib.rs @@ -9,6 +9,7 @@ use syn::{parse_macro_input, DeriveInput}; // 用于解析宏输入 mod responsable; mod route; +mod task; /// `Responsable`的过程宏,将结构体实现`IntoResponse` trait // #[proc_macro_derive(Responsable, attributes(status, headers))] @@ -95,4 +96,12 @@ pub fn head(attr: TokenStream, item: TokenStream) -> TokenStream { #[proc_macro_attribute] pub fn trace(attr: TokenStream, item: TokenStream) -> TokenStream { route::gen_route(attr, item, "trace") +} + +/// 定时任务 +/// +/// 参数为cron表达式 +#[proc_macro_attribute] +pub fn task(attr: TokenStream, item: TokenStream) -> TokenStream { + task::gen_task(attr, item) } \ No newline at end of file diff --git a/macro/src/task.rs b/macro/src/task.rs new file mode 100644 index 0000000..f325a7e --- /dev/null +++ b/macro/src/task.rs @@ -0,0 +1,111 @@ +extern crate proc_macro2; +extern crate quote; +extern crate syn; + +extern crate proc_macro; + +use parse::{Parse, ParseStream}; +use proc_macro::{Span, TokenStream}; +use punctuated::Punctuated; +// 用于处理宏输入 +use quote::quote; +// 用于生成代码 +use syn::*; +use syn::{parse_macro_input, ItemFn}; + +#[derive(Debug)] +struct TaskArgs { + cron: String, + interval: u64, + time_zone: String, +} + +// 实现 Parse trait 以支持解析参数 +impl Parse for TaskArgs { + fn parse(input: ParseStream) -> Result { + let args = Punctuated::::parse_terminated(input)?; + + let mut task_args = TaskArgs { + cron: String::from(""), + interval: 0, + time_zone: String::from(""), + }; + + for arg in args { + if let Meta::NameValue(meta) = arg { + match meta.path.get_ident().unwrap().to_string().as_str() { + "cron" => { + if let Expr::Lit(ExprLit{ lit, .. }) = meta.value { + if let Lit::Str(value) = lit { + task_args.cron = value.value(); + } + } + } + "interval" => { + if let Expr::Lit(ExprLit{ lit, .. }) = meta.value { + if let Lit::Int(value) = lit { + task_args.interval = value.base10_parse().unwrap(); + } + } + } + "time_zone" => { + if let Expr::Lit(ExprLit{ lit, .. }) = meta.value { + if let Lit::Str(value) = lit { + task_args.time_zone = value.value(); + } + } + } + _ => {} + } + } + } + + if task_args.cron.is_empty() && task_args.interval <= 0 { + return Err(syn::Error::new( + Span::call_site().into(), + "必须设置有效的cron表达式或者interval参数", + )); + } + + Ok(task_args) + } +} + +pub fn gen_task(attr: TokenStream, item: TokenStream) -> TokenStream { + let func = parse_macro_input!(item as ItemFn); + // 解析宏的参数 + let TaskArgs{ cron, interval, time_zone } = parse_macro_input!(attr as TaskArgs); + + // 将 method 转换为小写并生成标识符 + let vis = func.vis.clone(); + let ident = func.sig.ident.clone(); + + let result = quote!{ + let mut _result = library::task::Task { + job: std::sync::Arc::new(|| Box::pin(#ident ())), + cron: None, + interval: None, + time_zone: None, + }; + if !#cron.is_empty() { + _result.cron = Some(#cron.to_string()); + } + if #interval > 0 { + _result.interval = Some(#interval); + } + if !#time_zone.is_empty() { + _result.time_zone = Some(#time_zone.to_string()); + } + _result + }; + + let generated = quote! { + #vis fn #ident () -> library::task::Task { + #func + + #result + } + }; + + TokenStream::from(generated) +} diff --git a/server/src/tasks/google_tasks.rs b/server/src/tasks/google_tasks.rs index 45b0599..74be7dd 100644 --- a/server/src/tasks/google_tasks.rs +++ b/server/src/tasks/google_tasks.rs @@ -1,3 +1,12 @@ -pub fn xxx_task() { - tracing::info!("定时任务执行中"); +use macros::task; + + +#[task(cron = "0 0 0/1 * * ?", time_zone = "Asia/Shanghai")] +pub async fn xxx_task() { + tracing::info!("cron定时任务执行中"); +} + +#[task(interval = 3600)] +pub async fn xxx_task2() { + tracing::info!("interval定时任务执行中"); } \ No newline at end of file diff --git a/server/src/tasks/mod.rs b/server/src/tasks/mod.rs index d24a587..49a9f42 100644 --- a/server/src/tasks/mod.rs +++ b/server/src/tasks/mod.rs @@ -4,9 +4,6 @@ pub mod google_tasks; /// 定时任务维护器 pub fn get_tasks() -> Vec { - vec!(Task { - name: "google_iap".to_string(), - job: Box::new(google_tasks::xxx_task), - trigger: "0 0 0/1 * * ?".to_string(), - }) -} \ No newline at end of file + vec![google_tasks::xxx_task(), google_tasks::xxx_task2()] +} +