mod branch; mod load; pub mod status; pub mod webhook; #[cfg(test)] mod tests; use std::time::Duration; use actix::prelude::*; use config::RegisteredWebhook; use git::validation::positions::{validate_positions, Positions}; use crate as repo_actor; use git_next_config as config; use git_next_forge as forge; use git_next_git as git; use kxio::network::Network; use tracing::{debug, info, warn, Instrument}; #[derive(Debug, derive_more::Display)] #[display("{}:{}:{}", generation, repo_details.forge.forge_alias(), repo_details.repo_alias)] pub struct RepoActor { generation: git::Generation, message_token: MessageToken, repo_details: git::RepoDetails, webhook: config::server::Webhook, webhook_id: Option, // INFO: if [None] then no webhook is configured webhook_auth: Option, // INFO: if [None] then no webhook is configured last_main_commit: Option, last_next_commit: Option, last_dev_commit: Option, repository: git::Repository, open_repository: Option, net: Network, forge: forge::Forge, } impl RepoActor { pub fn new( details: git::RepoDetails, webhook: config::server::Webhook, generation: git::Generation, net: Network, repo: git::Repository, ) -> Self { let forge = match details.forge.forge_type() { #[cfg(feature = "forgejo")] config::ForgeType::ForgeJo => forge::Forge::new_forgejo(details.clone(), net.clone()), config::ForgeType::GitHub => forge::Forge::new_github(details.clone(), net.clone()), config::ForgeType::MockForge => forge::Forge::new_mock(), }; debug!(?forge, "new"); Self { generation, message_token: MessageToken::new(), repo_details: details, webhook, webhook_id: None, webhook_auth: None, last_main_commit: None, last_next_commit: None, last_dev_commit: None, repository: repo, open_repository: None, net, forge, } } } impl Actor for RepoActor { type Context = Context; #[tracing::instrument(name = "RepoActor::stopping", skip_all, fields(repo = %self.repo_details))] fn stopping(&mut self, ctx: &mut Self::Context) -> Running { info!("Checking webhook"); match self.webhook_id.take() { Some(webhook_id) => { info!(%webhook_id, "Unregistring webhook"); let forge = self.forge.clone(); async move { if let Err(err) = forge.unregister_webhook(&webhook_id).await { warn!("unregistering webhook: {err}"); } } .in_current_span() .into_actor(self) .wait(ctx); Running::Continue } None => Running::Stop, } } } #[derive(Message)] #[rtype(result = "()")] pub struct CloneRepo; impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details, gitdir = %self.repo_details.gitdir))] fn handle(&mut self, _msg: CloneRepo, ctx: &mut Self::Context) -> Self::Result { let gitdir = self.repo_details.gitdir.clone(); match git::repository::open(&self.repository, &self.repo_details, gitdir) { Ok(repository) => { self.open_repository.replace(repository); if self.repo_details.repo_config.is_none() { ctx.address().do_send(LoadConfigFromRepo); } else { ctx.address().do_send(ValidateRepo { message_token: self.message_token, }); } } Err(err) => warn!("Could not open repo: {err}"), } } } #[derive(Message)] #[rtype(result = "()")] pub struct LoadConfigFromRepo; impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))] fn handle(&mut self, _msg: LoadConfigFromRepo, ctx: &mut Self::Context) -> Self::Result { let details = self.repo_details.clone(); let addr = ctx.address(); let Some(open_repository) = self.open_repository.clone() else { warn!("missing open repository - can't load configuration"); return; }; repo_actor::load::load_file(details, addr, open_repository) .in_current_span() .into_actor(self) .wait(ctx); } } #[derive(Message)] #[rtype(result = "()")] struct LoadedConfig(git_next_config::RepoConfig); impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = %msg.0))] fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result { let repo_config = msg.0; self.repo_details.repo_config.replace(repo_config); ctx.address().do_send(ValidateRepo { message_token: self.message_token, }); } } #[derive(derive_more::Constructor, Message)] #[rtype(result = "()")] pub struct ValidateRepo { message_token: MessageToken, } impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.message_token))] fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result { match msg.message_token { message_token if self.message_token < message_token => { info!(%message_token, "New message token"); self.message_token = msg.message_token; } message_token if self.message_token > message_token => { info!("Dropping message from previous generation"); return; // message is expired } _ => { // do nothing } } if self.webhook_id.is_none() { let forge_alias = self.repo_details.forge.forge_alias(); let repo_alias = &self.repo_details.repo_alias; let webhook_url = self.webhook.url(forge_alias, repo_alias); let forge = self.forge.clone(); let addr = ctx.address(); async move { if let Err(err) = forge .register_webhook(&webhook_url) .await .and_then(|registered_webhook| { addr.try_send(WebhookRegistered::from(registered_webhook)) .map_err(|e| { git::forge::webhook::Error::FailedToNotifySelf(e.to_string()) }) }) { warn!("registering webhook: {err}"); } } .in_current_span() .into_actor(self) .wait(ctx); } if let (Some(repository), Some(repo_config)) = ( self.open_repository.clone(), self.repo_details.repo_config.clone(), ) { let repo_details = self.repo_details.clone(); let addr = ctx.address(); let message_token = self.message_token; async move { match validate_positions(&repository, &repo_details, repo_config) { Ok(Positions { main, next, dev, dev_commit_history, }) => { addr.do_send(StartMonitoring::new(main, next, dev, dev_commit_history)); } Err(err) => { warn!("{:?}", err); tokio::time::sleep(Duration::from_secs(10)).await; addr.do_send(ValidateRepo::new(message_token)); } } } .in_current_span() .into_actor(self) .wait(ctx); } } } #[derive(Debug, derive_more::Constructor, Message)] #[rtype(result = "()")] pub struct StartMonitoring { main: git::Commit, next: git::Commit, dev: git::Commit, dev_commit_history: Vec, } impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all, fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main, next= %msg.next, dev = %msg.dev)) ] fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result { let Some(repo_config) = self.repo_details.repo_config.clone() else { warn!("No config loaded"); return; }; let next_ahead_of_main = msg.main != msg.next; let dev_ahead_of_next = msg.next != msg.dev; info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring"); let addr = ctx.address(); let forge = self.forge.clone(); if next_ahead_of_main { status::check_next(msg.next, addr, forge, self.message_token) .in_current_span() .into_actor(self) .wait(ctx); } else if dev_ahead_of_next { if let Some(repository) = self.open_repository.clone() { branch::advance_next( msg.next, msg.dev_commit_history, self.repo_details.clone(), repo_config, repository, addr, self.message_token, ) .in_current_span() .into_actor(self) .wait(ctx); } } } } #[derive(Message)] #[rtype(result = "()")] pub struct WebhookRegistered(config::WebhookId, config::WebhookAuth); impl From for WebhookRegistered { fn from(value: RegisteredWebhook) -> Self { Self(value.id().clone(), value.auth().clone()) } } impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.0))] fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result { self.webhook_id.replace(msg.0); self.webhook_auth.replace(msg.1); } } #[derive(Message)] #[rtype(result = "()")] pub struct AdvanceMainTo(git::Commit); impl Handler for RepoActor { type Result = (); #[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = %msg.0))] fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result { let Some(repo_config) = self.repo_details.repo_config.clone() else { warn!("No config loaded"); return; }; let Some(repository) = self.open_repository.clone() else { warn!("No repository opened"); return; }; let repo_details = self.repo_details.clone(); let addr = ctx.address(); let message_token = self.message_token; async move { branch::advance_main(msg.0, &repo_details, &repo_config, &repository).await; match repo_config.source() { git_next_config::RepoConfigSource::Repo => addr.do_send(LoadConfigFromRepo), git_next_config::RepoConfigSource::Server => { addr.do_send(ValidateRepo { message_token }) } } } .in_current_span() .into_actor(self) .wait(ctx); } } #[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord, derive_more::Display)] pub struct MessageToken(u32); impl MessageToken { pub fn new() -> Self { Self::default() } pub const fn next(&self) -> Self { Self(self.0 + 1) } }