diff --git a/crates/repo-actor/src/handlers/mod.rs b/crates/repo-actor/src/handlers/mod.rs index 42cf63a..18c9541 100644 --- a/crates/repo-actor/src/handlers/mod.rs +++ b/crates/repo-actor/src/handlers/mod.rs @@ -6,6 +6,7 @@ pub mod load_config_from_repo; pub mod receive_ci_status; pub mod receive_repo_config; pub mod register_webhook; +pub mod unregister_webhook; pub mod validate_repo; pub mod webhook_notification; pub mod webhook_registered; diff --git a/crates/repo-actor/src/handlers/unregister_webhook.rs b/crates/repo-actor/src/handlers/unregister_webhook.rs new file mode 100644 index 0000000..2653f0f --- /dev/null +++ b/crates/repo-actor/src/handlers/unregister_webhook.rs @@ -0,0 +1,27 @@ +// +use actix::prelude::*; +use tracing::Instrument as _; + +use crate as actor; +use actor::{messages::UnRegisterWebhook, RepoActor}; + +impl Handler for RepoActor { + type Result = (); + + fn handle(&mut self, _msg: UnRegisterWebhook, ctx: &mut Self::Context) -> Self::Result { + if let Some(webhook_id) = self.webhook_id.take() { + let forge = self.forge.duplicate(); + tracing::debug!("unregistering webhook"); + async move { + match forge.unregister_webhook(&webhook_id).await { + Ok(_) => tracing::debug!("unregistered webhook"), + Err(err) => tracing::warn!(?err, "unregistering webhook"), + } + } + .in_current_span() + .into_actor(self) + .wait(ctx); + tracing::debug!("unregistering webhook done"); + } + } +} diff --git a/crates/repo-actor/src/messages.rs b/crates/repo-actor/src/messages.rs index 139c335..06e79c5 100644 --- a/crates/repo-actor/src/messages.rs +++ b/crates/repo-actor/src/messages.rs @@ -38,6 +38,8 @@ impl From for WebhookRegistered { } } +message!(UnRegisterWebhook: "Request that the webhook be removed from the forge, so they will stop notifying us."); + newtype!(MessageToken: u32, Copy, Default, Display, PartialOrd, Ord: r#"An incremental token used to identify the current set of messages. Primarily used by [ValidateRepo] to reduce duplicate messages. The token is incremented when a new Webhook message is diff --git a/crates/server-actor/src/handlers/receive_valid_server_config.rs b/crates/server-actor/src/handlers/receive_valid_server_config.rs index 5dba1f4..0fca499 100644 --- a/crates/server-actor/src/handlers/receive_valid_server_config.rs +++ b/crates/server-actor/src/handlers/receive_valid_server_config.rs @@ -25,13 +25,25 @@ impl Handler for Server { let webhook = server_config.webhook(); // Forge Actors for (forge_alias, forge_config) in server_config.forges() { - self.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook) + let repo_actors = self + .create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook) .into_iter() .map(|a| self.start_actor(a)) + .collect::>(); + repo_actors + .iter() .map(|(repo_alias, addr)| { - AddWebhookRecipient::new(forge_alias.clone(), repo_alias, addr.recipient()) + AddWebhookRecipient::new( + forge_alias.clone(), + repo_alias.clone(), + addr.clone().recipient(), + ) }) .for_each(|msg| webhook_router.do_send(msg)); + repo_actors.into_iter().for_each(|(repo_alias, addr)| { + self.repo_actors + .insert((forge_alias.clone(), repo_alias), addr); + }); } let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start(); self.webhook.replace(webhook); diff --git a/crates/server-actor/src/handlers/shutdown.rs b/crates/server-actor/src/handlers/shutdown.rs index 24bb468..652a4fd 100644 --- a/crates/server-actor/src/handlers/shutdown.rs +++ b/crates/server-actor/src/handlers/shutdown.rs @@ -8,8 +8,18 @@ impl Handler for Server { type Result = (); fn handle(&mut self, _msg: Shutdown, _ctx: &mut Self::Context) -> Self::Result { + self.repo_actors + .iter() + .for_each(|((forge_alias, repo_alias), addr)| { + tracing::debug!(%forge_alias, %repo_alias, "removing webhook"); + addr.do_send(git_next_repo_actor::messages::UnRegisterWebhook::new()); + tracing::debug!(%forge_alias, %repo_alias, "removed webhook"); + }); + tracing::debug!("server shutdown"); if let Some(webhook) = self.webhook.take() { + tracing::debug!("shuting down webhook"); webhook.do_send(ShutdownWebhook); + tracing::debug!("webhook shutdown"); } } } diff --git a/crates/server-actor/src/lib.rs b/crates/server-actor/src/lib.rs index adea345..17257f4 100644 --- a/crates/server-actor/src/lib.rs +++ b/crates/server-actor/src/lib.rs @@ -14,6 +14,7 @@ use git_next_repo_actor::{messages::CloneRepo, RepoActor}; use git_next_webhook_actor as webhook; use kxio::{fs::FileSystem, network::Network}; use std::{ + collections::BTreeMap, path::PathBuf, sync::{Arc, RwLock}, }; @@ -49,6 +50,7 @@ pub struct Server { net: Network, repository_factory: Box, sleep_duration: std::time::Duration, + repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr>, // testing message_log: Option>>>, @@ -72,6 +74,7 @@ impl Server { net, repository_factory: repo, sleep_duration, + repo_actors: BTreeMap::new(), message_log: None, } } diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 67fae80..2506ea4 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -57,6 +57,7 @@ pub fn start( let _ = actix_rt::signal::ctrl_c().await; info!("Ctrl-C received, shutting down..."); server.do_send(git_next_server_actor::messages::Shutdown); + actix_rt::time::sleep(std::time::Duration::from_millis(200)).await; System::current().stop(); }; let system = System::new();