use anyhow::{Context, Result}; use celery::error::TaskError; use celery::task::TaskResult; use chrono::prelude::*; use diesel::prelude::*; use lazy_static::lazy_static; use r2d2_redis::redis; use regex::Regex; use std::ops::DerefMut; use std::thread; use std::time::Duration; use crate::{cache, config, db}; async fn fetch_schoolyears(client: &untis::Client, db_conn: &mut PgConnection) -> Result { let existing_schoolyears = db::schema::schoolyears::table .select(db::schema::schoolyears::untis_id) .load::(db_conn)?; diesel::insert_into(db::schema::schoolyears::table) .values( &client .schoolyears() .await? .iter() .filter(|y| !existing_schoolyears.contains(&y.id)) .map(|y| db::models::NewSchoolyear { untis_id: y.id, name: &y.name, start_date: y.start_date, end_date: y.end_date, }) .collect::>(), ) .execute(db_conn)?; Ok(db::schema::schoolyears::table .filter(db::schema::schoolyears::untis_id.eq(client.current_schoolyear().await?.id)) .select(db::schema::schoolyears::id) .first(db_conn)?) } async fn fetch_current_tenant( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let tenant = client.current_tenant().await?; if diesel::select(diesel::dsl::not(diesel::dsl::exists( db::schema::tenants::table.filter(db::schema::tenants::untis_id.eq(tenant.id)), ))) .get_result::(db_conn)? { diesel::update(db::schema::tenants::table) .filter(db::schema::tenants::active) .set(db::schema::tenants::active.eq(false)) .execute(db_conn)?; diesel::insert_into(db::schema::tenants::table) .values(db::models::NewTenant { untis_id: tenant.id, schoolyear_id, name: &tenant.display_name, active: true, }) .execute(db_conn)?; } else if diesel::select(diesel::dsl::exists( db::schema::tenants::table .filter(db::schema::tenants::untis_id.eq(tenant.id)) .filter(db::schema::tenants::active.eq(false)), )) .get_result::(db_conn)? { diesel::update(db::schema::tenants::table) .filter(db::schema::tenants::active) .set(( db::schema::tenants::active.eq(false), db::schema::tenants::updated_at.eq(diesel::dsl::now), )) .execute(db_conn)?; diesel::update(db::schema::tenants::table) .filter(db::schema::tenants::untis_id.eq(tenant.id)) .set(( db::schema::tenants::active.eq(true), db::schema::tenants::updated_at.eq(diesel::dsl::now), )) .execute(db_conn)?; } Ok(()) } async fn fetch_teachers( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let existing_teachers = db::schema::teachers::table .select(db::schema::teachers::untis_id) .filter(db::schema::teachers::schoolyear_id.eq(schoolyear_id)) .load::(db_conn)?; diesel::insert_into(db::schema::teachers::table) .values( &client .teachers() .await? .iter() .filter(|t| !existing_teachers.contains(&t.id)) .map(|t| db::models::NewTeacher { untis_id: t.id, schoolyear_id, name: &t.name, forename: if t.forename.is_empty() { None } else { Some(&t.forename) }, display_name: &t.display_name, }) .collect::>(), ) .execute(db_conn)?; Ok(()) } async fn fetch_classes( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let existing_classes = db::schema::classes::table .select(db::schema::classes::untis_id) .filter(db::schema::classes::schoolyear_id.eq(schoolyear_id)) .load::(db_conn)?; diesel::insert_into(db::schema::classes::table) .values( &client .classes() .await? .iter() .filter(|c| !existing_classes.contains(&c.id)) .map(|c| db::models::NewClass { untis_id: c.id, schoolyear_id, name: &c.name, long_name: &c.long_name, active: c.active, }) .collect::>(), ) .execute(db_conn)?; Ok(()) } async fn fetch_subjects( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let existing_classes = db::schema::subjects::table .select(db::schema::subjects::untis_id) .filter(db::schema::subjects::schoolyear_id.eq(schoolyear_id)) .load::(db_conn)?; diesel::insert_into(db::schema::subjects::table) .values( &client .subjects() .await? .iter() .filter(|c| !existing_classes.contains(&c.id)) .map(|c| db::models::NewSubject { untis_id: c.id, schoolyear_id, name: &c.name, long_name: &c.long_name, }) .collect::>(), ) .execute(db_conn)?; Ok(()) } async fn fetch_rooms( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let existing_classes = db::schema::rooms::table .select(db::schema::rooms::untis_id) .filter(db::schema::rooms::schoolyear_id.eq(schoolyear_id)) .load::(db_conn)?; diesel::insert_into(db::schema::rooms::table) .values( &client .rooms() .await? .iter() .filter(|c| !existing_classes.contains(&c.id)) .map(|c| db::models::NewRoom { untis_id: c.id, schoolyear_id, name: &c.name, long_name: &c.long_name, }) .collect::>(), ) .execute(db_conn)?; Ok(()) } async fn fetch_departments( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let existing_classes = db::schema::departments::table .select(db::schema::departments::untis_id) .filter(db::schema::departments::schoolyear_id.eq(schoolyear_id)) .load::(db_conn)?; diesel::insert_into(db::schema::departments::table) .values( &client .departments() .await? .iter() .filter(|c| !existing_classes.contains(&c.id)) .map(|c| db::models::NewDepartment { untis_id: c.id, schoolyear_id, name: &c.name, long_name: &c.long_name, }) .collect::>(), ) .execute(db_conn)?; Ok(()) } async fn fetch_holidays( client: &untis::Client, db_conn: &mut PgConnection, schoolyear_id: i32, ) -> Result<()> { let existing_classes = db::schema::holidays::table .select(db::schema::holidays::untis_id) .filter(db::schema::holidays::schoolyear_id.eq(schoolyear_id)) .load::(db_conn)?; diesel::insert_into(db::schema::holidays::table) .values( &client .holidays() .await? .iter() .filter(|c| !existing_classes.contains(&c.id)) .map(|c| db::models::NewHoliday { untis_id: c.id, schoolyear_id, name: &c.name, long_name: &c.long_name, start_date: c.start_date, end_date: c.end_date, }) .collect::>(), ) .execute(db_conn)?; Ok(()) } async fn fetch_substitutions( client: &untis::Client, db_conn: &mut PgConnection, redis_conn: &mut cache::ConnectionPool, schoolyear_id: i32, ) -> Result<()> { lazy_static! { static ref TITLE_SELECTOR: scraper::Selector = scraper::Selector::parse(".mon_title").unwrap(); static ref DATE_REGEX: Regex = Regex::new(r"([^\s]+)").unwrap(); static ref WEEK_TYPE_REGEX: Regex = Regex::new(r"\bWoche\s+\K\S+").unwrap(); } let (date, week_type) = { let html = reqwest::Client::new() .get(&config::CONFIG.untis_vplan_url) .header( reqwest::header::USER_AGENT, &config::CONFIG.untis_client_name, ) .header(reqwest::header::ACCEPT, "text/html") .basic_auth( &config::CONFIG.untis_vplan_username, Some(&config::CONFIG.untis_vplan_password), ) .send() .await? .text() .await?; let document = scraper::Html::parse_document(&html); let title = document .select(&TITLE_SELECTOR) .next() .context("No element in vplan html which is selectable class \"mon_title\"")? .text() .next() .context("\"mon_title\" element is empty")?; dbg!(title); dbg!(DATE_REGEX.captures(title)); dbg!(WEEK_TYPE_REGEX.captures(title)); ( chrono::NaiveDate::parse_from_str("18.1.2023", "%d.%m.%Y")?, db::models::WeekType::A, ) // ( // chrono::NaiveDate::parse_from_str( // title // .0 // .split_whitespace() // .next() // .context("Could not find date")?, // "%d.%m.%Y", // )?, // match title // .1 // .split_whitespace() // .last() // .context("Could not find week type indicator")? // { // "A" => db::models::WeekType::A, // "B" => db::models::WeekType::B, // x => bail!("Invalid week type: {:?}", x), // }, // ) }; let substitution_query_id = diesel::insert_into(db::schema::substitution_queries::table) .values(db::models::NewSubstitutionQuery { schoolyear_id, date, week_type, queried_at: Utc::now().naive_utc(), }) .returning(db::schema::substitution_queries::id) .get_result::(db_conn)?; redis::cmd("SET") .arg(cache::keys::LAST_SUBSTITUTION_QUERY_ID) .arg(substitution_query_id) .query(redis_conn.deref_mut())?; for substitution in client.substitutions(&date, &date, None).await? { let substitution_id = diesel::insert_into(db::schema::substitutions::table) .values(db::models::NewSubstitution { substitution_query_id, subst_type: substitution.subst_type.into(), lesson_id: substitution.lesson_id, start_time: substitution.start_time, end_time: substitution.end_time, text: substitution.text.as_deref(), }) .returning(db::schema::substitutions::id) .get_result::(db_conn)?; diesel::insert_into(db::schema::substitution_classes::table) .values( &substitution .classes .iter() .enumerate() .map(|(i, c)| { Ok(db::models::NewSubstitutionClass { substitution_id, position: i as i16, class_id: db::schema::classes::table .filter(db::schema::classes::untis_id.eq(c.id)) .select(db::schema::classes::id) .get_result::(db_conn)?, }) }) .collect::>>()?, ) .execute(db_conn)?; diesel::insert_into(db::schema::substitution_teachers::table) .values( &substitution .teachers .iter() .enumerate() .map(|(i, t)| { Ok(db::models::NewSubstitutionTeacher { substitution_id, position: i as i16, teacher_id: if t.id == 0 { None } else { Some( db::schema::teachers::table .filter(db::schema::teachers::untis_id.eq(t.id)) .select(db::schema::teachers::id) .get_result::(db_conn)?, ) }, }) }) .collect::>>()?, ) .execute(db_conn)?; diesel::insert_into(db::schema::substitution_subjects::table) .values( &substitution .subjects .iter() .enumerate() .map(|(i, s)| { Ok(db::models::NewSubstitutionSubject { substitution_id, position: i as i16, subject_id: db::schema::subjects::table .filter(db::schema::subjects::untis_id.eq(s.id)) .select(db::schema::subjects::id) .get_result::(db_conn)?, }) }) .collect::>>()?, ) .execute(db_conn)?; diesel::insert_into(db::schema::substitution_rooms::table) .values( &substitution .rooms .iter() .enumerate() .map(|(i, r)| { Ok(db::models::NewSubstitutionRoom { substitution_id, position: i as i16, room_id: if r.id == 0 { None } else { Some( db::schema::rooms::table .filter(db::schema::rooms::untis_id.eq(r.id)) .select(db::schema::rooms::id) .get_result::(db_conn)?, ) }, original_room_id: if let Some(original_id) = r.original_id { Some( db::schema::rooms::table .filter(db::schema::rooms::untis_id.eq(original_id)) .select(db::schema::rooms::id) .get_result::(db_conn)?, ) } else { None }, }) }) .collect::>>()?, ) .execute(db_conn)?; } Ok(()) } #[celery::task] pub async fn update_info() -> TaskResult<()> { let dur = Duration::from_secs(2); thread::sleep(dur); let mut client = match config::untis_from_env() { Ok(x) => x, Err(e) => return Err(TaskError::UnexpectedError(e.to_string())), }; if let Err(e) = client.login().await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); let db_conn = &mut match db::POOL.get() { Ok(x) => x, Err(e) => return Err(TaskError::UnexpectedError(e.to_string())), }; let redis_conn = &mut match cache::POOL.get() { Ok(x) => x, Err(e) => return Err(TaskError::UnexpectedError(e.to_string())), }; let schoolyear_id = match fetch_schoolyears(&client, db_conn).await { Ok(x) => x, Err(e) => return Err(TaskError::UnexpectedError(e.to_string())), }; thread::sleep(dur); if let Err(e) = fetch_current_tenant(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_teachers(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_classes(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_subjects(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_rooms(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_departments(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_holidays(&client, db_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = fetch_substitutions(&client, db_conn, redis_conn, schoolyear_id).await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); if let Err(e) = client.logout().await { return Err(TaskError::UnexpectedError(e.to_string())); } thread::sleep(dur); Ok(()) }