mod branch; mod config; pub mod status; pub mod webhook; use actix::prelude::*; use kxio::network::Network; use tracing::{debug, info, warn, Instrument}; use crate::server::{ actors::repo::webhook::WebhookAuth, config::{RepoConfig, RepoDetails, Webhook}, gitforge, types::MessageToken, }; use self::webhook::WebhookId; pub struct RepoActor { message_token: MessageToken, span: tracing::Span, details: RepoDetails, webhook: Webhook, webhook_id: Option, // INFO: if [None] then no webhook is configured webhook_auth: Option, // INFO: if [None] then no webhook is configured last_main_commit: Option, last_next_commit: Option, last_dev_commit: Option, net: Network, forge: gitforge::Forge, } impl RepoActor { pub(crate) fn new(details: RepoDetails, webhook: Webhook, net: Network) -> Self { let span = tracing::info_span!("RepoActor", repo = %details); let forge = match details.forge.forge_type { #[cfg(feature = "forgejo")] crate::server::config::ForgeType::ForgeJo => { gitforge::Forge::new_forgejo(details.clone(), net.clone()) } #[cfg(test)] crate::server::config::ForgeType::MockForge => gitforge::Forge::new_mock(), }; debug!(?forge, "new"); Self { message_token: MessageToken::new(), span, details, webhook, webhook_id: None, webhook_auth: None, last_main_commit: None, last_next_commit: None, last_dev_commit: None, net, forge, } } } impl Actor for RepoActor { type Context = Context; fn stopping(&mut self, ctx: &mut Self::Context) -> Running { let _gaurd = self.span.enter(); info!("Checking webhook"); match self.webhook_id.take() { Some(webhook_id) => { let repo_details = self.details.clone(); let net = self.net.clone(); webhook::unregister(webhook_id, repo_details, net) .in_current_span() .into_actor(self) .wait(ctx); Running::Continue } None => Running::Stop, } } } impl std::fmt::Display for RepoActor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "{}/{}", self.details.forge.forge_name, self.details.repo_alias ) } } #[derive(Message)] #[rtype(result = "()")] pub struct CloneRepo; impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self, gitdir = %self.details.gitdir))] fn handle(&mut self, _msg: CloneRepo, ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); let gitdir = self.details.gitdir.clone(); match self.forge.repo_clone(gitdir) { Ok(_) => ctx.address().do_send(LoadConfigFromRepo), Err(err) => warn!("Could not Clone repo: {err}"), } } } #[derive(Message)] #[rtype(result = "()")] pub struct LoadConfigFromRepo; impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self))] fn handle(&mut self, _msg: LoadConfigFromRepo, ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); let details = self.details.clone(); let addr = ctx.address(); let forge = self.forge.clone(); config::load(details, addr, forge) .in_current_span() .into_actor(self) .wait(ctx); } } #[derive(Message)] #[rtype(result = "()")] struct LoadedConfig(pub RepoConfig); impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.details, branches = %msg.0))] fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); let repo_config = msg.0; self.details.repo_config.replace(repo_config); if self.webhook_id.is_none() { webhook::register( self.details.clone(), self.webhook.clone(), ctx.address(), self.net.clone(), ) .in_current_span() .into_actor(self) .wait(ctx); } ctx.address().do_send(ValidateRepo { message_token: self.message_token, }); } } #[derive(Message)] #[rtype(result = "()")] pub struct ValidateRepo { message_token: MessageToken, } impl ValidateRepo { pub const fn new(message_token: MessageToken) -> Self { Self { message_token } } } impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.details, token = %msg.message_token))] fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result { match msg.message_token { message_token if self.message_token < message_token => { info!(%message_token, "New message token"); self.message_token = msg.message_token; } message_token if self.message_token > message_token => { info!("Dropping message from previous generation"); return; // message is expired } _ => { // do nothing } } info!("Message Received"); if let Some(repo_config) = self.details.repo_config.clone() { let forge = self.forge.clone(); let addr = ctx.address(); let message_token = self.message_token; async move { forge .branches_validate_positions(repo_config, addr, message_token) .await } .in_current_span() .into_actor(self) .wait(ctx); } } } #[derive(Debug, Message)] #[rtype(result = "()")] pub struct StartMonitoring { pub main: gitforge::Commit, pub next: gitforge::Commit, pub dev: gitforge::Commit, pub dev_commit_history: Vec, } impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all, fields(token = %self.message_token, repo = %self.details, main = %msg.main, next= %msg.next, dev = %msg.dev))] fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); let Some(repo_config) = self.details.repo_config.clone() else { warn!("No config loaded"); return; }; let next_ahead_of_main = msg.main != msg.next; let dev_ahead_of_next = msg.next != msg.dev; info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring"); let addr = ctx.address(); let forge = self.forge.clone(); if next_ahead_of_main { status::check_next(msg.next, addr, forge, self.message_token) .in_current_span() .into_actor(self) .wait(ctx); } else if dev_ahead_of_next { branch::advance_next( msg.next, msg.dev_commit_history, repo_config, forge, addr, self.message_token, ) .in_current_span() .into_actor(self) .wait(ctx); } } } #[derive(Message)] #[rtype(result = "()")] pub struct WebhookRegistered(pub WebhookId, pub WebhookAuth); impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(webhook_id = %msg.0))] fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); self.webhook_id.replace(msg.0); self.webhook_auth.replace(msg.1); } } #[derive(Message)] #[rtype(result = "()")] pub struct AdvanceMainTo(pub gitforge::Commit); impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(commit = %msg.0))] fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); let Some(repo_config) = self.details.repo_config.clone() else { warn!("No config loaded"); return; }; let forge = self.forge.clone(); let addr = ctx.address(); branch::advance_main(msg.0, repo_config, forge, addr, self.message_token) .in_current_span() .into_actor(self) .wait(ctx); } }