bvplan/bvplan/src/worker/mod.rs
2023-02-27 11:09:16 +01:00

84 lines
1.9 KiB
Rust

use celery::beat::DeltaSchedule;
use lazy_static::lazy_static;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use stdext::duration::DurationExt;
use crate::config;
// Task modules; `update_info::update_info` and `update_meta::update_meta` are the
// tasks registered with the app in `init` and scheduled in `beat`.
pub mod update_info;
pub mod update_meta;
/// Queue name that the `"*"` route in `init` and `beat` sends every task to.
pub const QUEUE_NAME: &str = "celery";
lazy_static! {
/// Shared handle to the Celery app.
///
/// Starts out as `None`; `init` replaces it with `Some(app)` once the app has been
/// built, and `init_blocking` polls it to find out when that has happened.
pub static ref APP: Mutex<Option<Arc<celery::Celery>>> = Mutex::new(None);
}
/// Builds the Celery application and publishes it through [`APP`].
///
/// The app is configured with the AMQP broker URL from `config::CONFIG.amqp_url`,
/// registers the `update_info` and `update_meta` tasks, and routes every task
/// (`"*"`) to [`QUEUE_NAME`].
///
/// The `#[tokio::main]` attribute lets callers invoke this as an ordinary blocking
/// function (`init_blocking` hands it to `spawn_blocking` that way).
///
/// # Panics
///
/// Panics if building the app returns an error or if the [`APP`] mutex is poisoned.
#[tokio::main]
pub async fn init() {
    let built = celery::app!(
        broker = AMQPBroker { &config::CONFIG.amqp_url },
        tasks = [
            update_info::update_info,
            update_meta::update_meta,
        ],
        task_routes = [
            "*" => QUEUE_NAME,
        ],
        prefetch_count = 2,
        heartbeat = Some(10)
    )
    .await
    .unwrap();

    // The lock is taken only once the app exists, so it is never held across the
    // `.await` above.
    let mut slot = APP.lock().unwrap();
    *slot = Some(built);
}
/// Returns the future produced by `celery::beat!` for the periodic-task scheduler.
///
/// This function is not `async` itself; the caller is expected to await the result,
/// which resolves to a `celery::beat::Beat` backed by `LocalSchedulerBackend`, or to a
/// `celery::error::BeatError`.
///
/// The scheduler uses the same broker URL (`config::CONFIG.amqp_url`) and the same
/// `"*" => QUEUE_NAME` route as `init`.
pub fn beat() -> impl std::future::Future<
Output = Result<
celery::beat::Beat<celery::beat::LocalSchedulerBackend>,
celery::error::BeatError,
>,
> {
celery::beat!(
broker = AMQPBroker { &config::CONFIG.amqp_url },
tasks = [
// `update_info` with a 999-minute delta schedule and no arguments.
// NOTE(review): 999 minutes looks like a placeholder interval — confirm it is intended.
// `from_minutes` / `from_hours` presumably come from the `DurationExt` import — verify.
"update_info" => {
update_info::update_info,
schedule = DeltaSchedule::new(Duration::from_minutes(999)),
args = (),
},
// `update_meta` with a 6-hour delta schedule and no arguments.
"update_meta" => {
update_meta::update_meta,
schedule = DeltaSchedule::new(Duration::from_hours(6)),
args = (),
}
],
// Every scheduled task goes to the shared queue.
task_routes = [
"*" => QUEUE_NAME,
]
)
}
/// Starts [`init`] in the background and blocks until [`APP`] has been populated.
///
/// `init` is handed to the blocking pool of a short-lived Tokio runtime created on a
/// freshly spawned thread; the calling thread then polls [`APP`] until it holds a
/// value or the timeout expires.
///
/// # Panics
///
/// Panics with `"Worker not initialized after 10 seconds!"` if [`APP`] is still empty
/// once the timeout has elapsed, and panics if the [`APP`] mutex is poisoned.
pub fn init_blocking() {
    // Upper bound on how long to wait for `init` to publish the app.
    const TIMEOUT: Duration = Duration::from_secs(10);
    // How often `APP` is re-checked; kept short so a fast `init` does not cost the
    // caller a full second of startup latency.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    thread::spawn(|| {
        // NOTE(review): the runtime is a temporary dropped at the end of this
        // statement; presumably its shutdown lets the already-started blocking task
        // run to completion — confirm against Tokio's runtime shutdown documentation.
        tokio::runtime::Runtime::new().unwrap().spawn_blocking(init);
    });

    let deadline = std::time::Instant::now() + TIMEOUT;
    loop {
        // The guard is a temporary of the condition, so the lock is released before
        // sleeping and `init` can always acquire it.
        if APP.lock().unwrap().is_some() {
            return;
        }

        // Only give up after a check made at (or past) the deadline has failed, so an
        // app published during the last interval is not reported as a timeout.
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            panic!("Worker not initialized after 10 seconds!");
        }

        // Never sleep past the deadline.
        thread::sleep(remaining.min(POLL_INTERVAL));
    }
}