feat: unregister webhooks form forge during shutdown

Closes kemitix/git-next#46
This commit is contained in:
Paul Campbell 2024-07-15 07:39:06 +01:00
parent 6c92f64f8b
commit b715755b91
7 changed files with 58 additions and 2 deletions

View file

@ -6,6 +6,7 @@ pub mod load_config_from_repo;
pub mod receive_ci_status;
pub mod receive_repo_config;
pub mod register_webhook;
pub mod unregister_webhook;
pub mod validate_repo;
pub mod webhook_notification;
pub mod webhook_registered;

View file

@ -0,0 +1,27 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
use actor::{messages::UnRegisterWebhook, RepoActor};
impl Handler<UnRegisterWebhook> for RepoActor {
type Result = ();
fn handle(&mut self, _msg: UnRegisterWebhook, ctx: &mut Self::Context) -> Self::Result {
if let Some(webhook_id) = self.webhook_id.take() {
let forge = self.forge.duplicate();
tracing::debug!("unregistering webhook");
async move {
match forge.unregister_webhook(&webhook_id).await {
Ok(_) => tracing::debug!("unregistered webhook"),
Err(err) => tracing::warn!(?err, "unregistering webhook"),
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
tracing::debug!("unregistering webhook done");
}
}
}

View file

@ -38,6 +38,8 @@ impl From<config::RegisteredWebhook> for WebhookRegistered {
}
}
message!(UnRegisterWebhook: "Request that the webhook be removed from the forge, so they will stop notifying us.");
newtype!(MessageToken: u32, Copy, Default, Display, PartialOrd, Ord: r#"An incremental token used to identify the current set of messages.
Primarily used by [ValidateRepo] to reduce duplicate messages. The token is incremented when a new Webhook message is

View file

@ -25,13 +25,25 @@ impl Handler<ReceiveValidServerConfig> for Server {
let webhook = server_config.webhook();
// Forge Actors
for (forge_alias, forge_config) in server_config.forges() {
self.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook)
let repo_actors = self
.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook)
.into_iter()
.map(|a| self.start_actor(a))
.collect::<Vec<_>>();
repo_actors
.iter()
.map(|(repo_alias, addr)| {
AddWebhookRecipient::new(forge_alias.clone(), repo_alias, addr.recipient())
AddWebhookRecipient::new(
forge_alias.clone(),
repo_alias.clone(),
addr.clone().recipient(),
)
})
.for_each(|msg| webhook_router.do_send(msg));
repo_actors.into_iter().for_each(|(repo_alias, addr)| {
self.repo_actors
.insert((forge_alias.clone(), repo_alias), addr);
});
}
let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook.replace(webhook);

View file

@ -8,8 +8,18 @@ impl Handler<Shutdown> for Server {
type Result = ();
fn handle(&mut self, _msg: Shutdown, _ctx: &mut Self::Context) -> Self::Result {
self.repo_actors
.iter()
.for_each(|((forge_alias, repo_alias), addr)| {
tracing::debug!(%forge_alias, %repo_alias, "removing webhook");
addr.do_send(git_next_repo_actor::messages::UnRegisterWebhook::new());
tracing::debug!(%forge_alias, %repo_alias, "removed webhook");
});
tracing::debug!("server shutdown");
if let Some(webhook) = self.webhook.take() {
tracing::debug!("shuting down webhook");
webhook.do_send(ShutdownWebhook);
tracing::debug!("webhook shutdown");
}
}
}

View file

@ -14,6 +14,7 @@ use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor as webhook;
use kxio::{fs::FileSystem, network::Network};
use std::{
collections::BTreeMap,
path::PathBuf,
sync::{Arc, RwLock},
};
@ -49,6 +50,7 @@ pub struct Server {
net: Network,
repository_factory: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
// testing
message_log: Option<Arc<RwLock<Vec<String>>>>,
@ -72,6 +74,7 @@ impl Server {
net,
repository_factory: repo,
sleep_duration,
repo_actors: BTreeMap::new(),
message_log: None,
}
}

View file

@ -57,6 +57,7 @@ pub fn start(
let _ = actix_rt::signal::ctrl_c().await;
info!("Ctrl-C received, shutting down...");
server.do_send(git_next_server_actor::messages::Shutdown);
actix_rt::time::sleep(std::time::Duration::from_millis(200)).await;
System::current().stop();
};
let system = System::new();