This commit is contained in:
Dominic Grimm 2023-05-21 18:58:10 +02:00
commit e82f35da2a
No known key found for this signature in database
GPG key ID: B6FFE500AAD54A3A
78 changed files with 10821 additions and 0 deletions

View file

@ -0,0 +1,53 @@
use anyhow::Result;
use celery::{error::TaskError, task::TaskResult};
use diesel::prelude::*;
use std::fs;
use uuidv7::Uuid;
use crate::{db, CONFIG};
fn do_task(db_conn: &mut db::Connection, id: Uuid) -> Result<()> {
let (user_id, name) = db::schema::repositories::table
.select((
db::schema::repositories::user_id,
db::schema::repositories::name,
))
.filter(db::schema::repositories::id.eq(id))
.first::<(Uuid, String)>(db_conn)?;
let user_name = db::schema::users::table
.select(db::schema::users::name)
.filter(db::schema::users::id.eq(user_id))
.first::<String>(db_conn)?;
diesel::delete(db::schema::repositories::table.filter(db::schema::repositories::id.eq(id)))
.execute(db_conn)?;
if db::schema::repositories::table
.filter(db::schema::repositories::user_id.eq(user_id))
.count()
.get_result::<i64>(db_conn)?
== 0
{
diesel::delete(db::schema::users::table.filter(db::schema::users::id.eq(user_id)))
.execute(db_conn)?;
fs::remove_dir_all(format!("{}/{}", CONFIG.repos_dir, user_name))?;
} else {
fs::remove_dir_all(format!("{}/{}/{}", CONFIG.repos_dir, user_name, name))?;
}
Ok(())
}
#[celery::task]
pub fn delete_repo(id: Uuid) -> TaskResult<()> {
let db_conn = &mut match db::POOL.get() {
Ok(x) => x,
Err(e) => return Err(TaskError::UnexpectedError(format!("{:?}", e))),
};
if let Err(e) = do_task(db_conn, id) {
return Err(TaskError::UnexpectedError(format!("{:?}", e)));
}
Ok(())
}

View file

@ -0,0 +1,100 @@
use anyhow::{bail, Context, Result};
use celery::{error::TaskError, task::TaskResult};
use diesel::prelude::*;
use std::{fs, path::Path};
use url::Url;
use uuidv7::Uuid;
use crate::{db, CONFIG};
fn get_repo_name(db_conn: &mut db::Connection, id: Uuid) -> Result<(String, String)> {
let (user_id, name) = db::schema::repositories::table
.select((
db::schema::repositories::user_id,
db::schema::repositories::name,
))
.filter(db::schema::repositories::id.eq(id))
.first::<(Uuid, String)>(db_conn)?;
let user_name = db::schema::users::table
.select(db::schema::users::name)
.filter(db::schema::users::id.eq(user_id))
.first::<String>(db_conn)?;
Ok((user_name, name))
}
fn repo_dir(user: &str, repo: &str) -> (String, String, String) {
let parent = format!("{}/{}", CONFIG.repos_dir, user);
let dir = format!("{}/{}", parent, repo);
(parent, dir, format!("{}/{}.git", user, repo))
}
fn do_task(parent_dir: &str, repo_dir: &str, full_name_path: &str) -> Result<()> {
let path = Path::new(repo_dir);
if path.exists() && path.is_dir() {
let repo = git2::Repository::open(repo_dir)?;
repo.find_remote("origin")?
.fetch(&[&CONFIG.gitea_pull_branch], None, None)?;
let fetch_head = repo.find_reference("FETCH_HEAD")?;
let fetch_commit = repo.reference_to_annotated_commit(&fetch_head)?;
let analysis = repo.merge_analysis(&[&fetch_commit])?;
if !analysis.0.is_up_to_date() {
if analysis.0.is_fast_forward() {
let refname = format!("refs/heads/{}", CONFIG.gitea_pull_branch);
let mut reference = repo.find_reference(&refname)?;
reference.set_target(fetch_commit.id(), "Fast-Forward")?;
repo.set_head(&refname)?;
repo.checkout_head(Some(git2::build::CheckoutBuilder::default().force()))?;
} else {
bail!("Fast-forward only!");
}
}
} else {
fs::create_dir_all(parent_dir)?;
let repo = git2::Repository::clone(
Url::parse(CONFIG.gitea_pull_url.as_str())?
.join(full_name_path)?
.as_str(),
repo_dir,
)?;
let (object, reference) =
repo.revparse_ext(&format!("remotes/origin/{}", CONFIG.gitea_pull_branch))?;
repo.checkout_tree(&object, None)?;
match reference {
Some(gref) => repo.set_head(gref.name().context("Could not get ref name")?),
None => repo.set_head_detached(object.id()),
}
.context("Failed to set HEAD")?;
};
Ok(())
}
#[celery::task]
pub async fn get_repo(id: Uuid) -> TaskResult<()> {
let db_conn = &mut match db::POOL.get() {
Ok(x) => x,
Err(e) => return Err(TaskError::UnexpectedError(format!("{:?}", e))),
};
let (user_name, repo_name) = match get_repo_name(db_conn, id) {
Ok(x) => x,
Err(e) => return Err(TaskError::UnexpectedError(format!("{:?}", e))),
};
let (parent_dir, repo_dir, full_name_path) = repo_dir(&user_name, &repo_name);
if let Err(e) = do_task(&parent_dir, &repo_dir, &full_name_path) {
if let Err(err) = fs::remove_dir_all(repo_dir) {
return Err(TaskError::UnexpectedError(format!("{:?}", err)));
}
return Err(TaskError::UnexpectedError(format!("{:?}", e)));
}
Ok(())
}

89
backend/src/worker/mod.rs Normal file
View file

@ -0,0 +1,89 @@
use anyhow::Result;
use async_once::AsyncOnce;
use async_trait::async_trait;
use celery::beat::{Beat, DeltaSchedule, LocalSchedulerBackend};
use celery::prelude::*;
use celery::Celery;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::time::Duration;
use stdext::duration::DurationExt;
pub mod delete_repo;
pub mod get_repo;
pub mod update_repos;
use crate::CONFIG;
pub const QUEUE_NAME: &str = "gitea_pages";
pub async fn app() -> Result<Arc<Celery>, CeleryError> {
celery::app!(
broker = AMQPBroker { &CONFIG.amqp_url },
tasks = [
get_repo::get_repo,
delete_repo::delete_repo,
update_repos::update_repos,
],
task_routes = [
"*" => QUEUE_NAME,
],
prefetch_count = 2,
heartbeat = Some(10)
)
.await
}
pub async fn beat() -> Result<Beat<LocalSchedulerBackend>, BeatError> {
celery::beat!(
broker = AMQPBroker { &CONFIG.amqp_url },
tasks = [
// "cleanup_tokens" => {
// cleanup_tokens::cleanup_tokens,
// schedule = DeltaSchedule::new(Duration::from_hours(1)),
// args = (),
// }
"update_repos" => {
update_repos::update_repos,
schedule = DeltaSchedule::new(Duration::from_days(1)),
args = (),
},
],
task_routes = [
"*" => QUEUE_NAME,
]
)
.await
}
pub type Connection = Arc<Celery>;
pub struct ConnectionManager;
#[async_trait]
impl bb8::ManageConnection for ConnectionManager {
type Connection = Connection;
type Error = CeleryError;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
app().await
}
async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
Ok(())
}
fn has_broken(&self, _: &mut Self::Connection) -> bool {
false
}
}
pub type Pool = bb8::Pool<ConnectionManager>;
pub async fn pool() -> Result<Pool> {
Ok(bb8::Pool::builder().build(ConnectionManager).await?)
}
lazy_static! {
pub static ref POOL: AsyncOnce<Pool> = AsyncOnce::new(async { pool().await.unwrap() });
}

View file

@ -0,0 +1,31 @@
use anyhow::Result;
use celery::prelude::*;
use diesel::prelude::*;
use uuidv7::Uuid;
use crate::{db, worker};
async fn do_task() -> Result<()> {
let db_conn = &mut db::POOL.get()?;
let repo_ids = db::schema::repositories::table
.select(db::schema::repositories::id)
.load::<Uuid>(db_conn)?;
let worker_conn = worker::POOL.get().await.get().await?;
for id in repo_ids {
worker_conn
.send_task(worker::get_repo::get_repo::new(id))
.await?;
}
Ok(())
}
#[celery::task]
pub async fn update_repos() -> TaskResult<()> {
if let Err(e) = do_task().await {
return Err(TaskError::UnexpectedError(format!("{:?}", e)));
}
Ok(())
}