From 6de8e4f988edc70e7a19b8ea6617c7916b26cecc Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sat, 3 Aug 2024 22:31:17 +0100 Subject: [PATCH] feat: prevent duplicate alerts Closes kemitix/git-next#128 --- crates/cli/src/alerts/desktop.rs | 7 +- crates/cli/src/alerts/email.rs | 13 +-- crates/cli/src/alerts/handlers/notify_user.rs | 30 +++---- crates/cli/src/alerts/history.rs | 52 +++++++++++- crates/cli/src/alerts/messages.rs | 80 ------------------ crates/cli/src/alerts/mod.rs | 17 ++-- crates/cli/src/alerts/tests/history.rs | 66 +++++++++++++++ crates/cli/src/alerts/tests/mod.rs | 1 + crates/cli/src/alerts/webhook.rs | 12 ++- crates/cli/src/repo/mod.rs | 2 +- crates/cli/src/server/actor/tests/given.rs | 4 +- crates/cli/src/server/mod.rs | 6 +- crates/core/src/config/branch_name.rs | 3 +- crates/core/src/config/repo_alias.rs | 2 +- crates/core/src/git/user_notification.rs | 81 ++++++++++++++++++- 15 files changed, 249 insertions(+), 127 deletions(-) create mode 100644 crates/cli/src/alerts/tests/history.rs create mode 100644 crates/cli/src/alerts/tests/mod.rs diff --git a/crates/cli/src/alerts/desktop.rs b/crates/cli/src/alerts/desktop.rs index 78d9c1cc..8e79f5ff 100644 --- a/crates/cli/src/alerts/desktop.rs +++ b/crates/cli/src/alerts/desktop.rs @@ -1,8 +1,9 @@ // -use crate::alerts::{messages::NotifyUser, short_message}; +use crate::alerts::short_message; +use git_next_core::git::UserNotification; -pub(super) fn send_desktop_notification(msg: &NotifyUser) { - let message = short_message(msg); +pub(super) fn send_desktop_notification(user_notification: &UserNotification) { + let message = short_message(user_notification); if let Err(err) = notifica::notify("git-next", &message) { tracing::warn!(?err, "failed to send desktop notification"); } diff --git a/crates/cli/src/alerts/email.rs b/crates/cli/src/alerts/email.rs index 5f1efbd6..2dfadbfc 100644 --- a/crates/cli/src/alerts/email.rs +++ b/crates/cli/src/alerts/email.rs @@ -1,7 +1,10 @@ // -use git_next_core::server::{EmailConfig, SmtpConfig}; +use git_next_core::{ + git::UserNotification, + server::{EmailConfig, SmtpConfig}, +}; -use crate::alerts::{full_message, messages::NotifyUser, short_message}; +use crate::alerts::{full_message, short_message}; #[derive(Debug)] struct EmailMessage { @@ -11,12 +14,12 @@ struct EmailMessage { body: String, } -pub(super) fn send_email(msg: &NotifyUser, email_config: &EmailConfig) { +pub(super) fn send_email(user_notification: &UserNotification, email_config: &EmailConfig) { let email_message = EmailMessage { from: email_config.from().to_string(), to: email_config.to().to_string(), - subject: short_message(msg), - body: full_message(msg), + subject: short_message(user_notification), + body: full_message(user_notification), }; match email_config.smtp() { Some(smtp) => send_email_smtp(email_message, smtp), diff --git a/crates/cli/src/alerts/handlers/notify_user.rs b/crates/cli/src/alerts/handlers/notify_user.rs index 485dda88..6b5e9ff5 100644 --- a/crates/cli/src/alerts/handlers/notify_user.rs +++ b/crates/cli/src/alerts/handlers/notify_user.rs @@ -1,6 +1,7 @@ // use actix::prelude::*; -use tracing::{info, instrument, Instrument as _}; + +use tracing::{info, Instrument as _}; use crate::alerts::{ desktop::send_desktop_notification, email::send_email, messages::NotifyUser, @@ -10,7 +11,6 @@ use crate::alerts::{ impl Handler for AlertsActor { type Result = (); - #[instrument] fn handle(&mut self, msg: NotifyUser, ctx: &mut Self::Context) -> Self::Result { let Some(shout) = &self.shout else { info!("No shout config available"); @@ -18,19 +18,21 @@ impl Handler for AlertsActor { }; let net = self.net.clone(); let shout = shout.clone(); - async move { - if let Some(webhook_config) = shout.webhook() { - send_webhook(&msg, webhook_config, &net).await; - } - if let Some(email_config) = shout.email() { - send_email(&msg, email_config); - } - if shout.desktop() { - send_desktop_notification(&msg); + if let Some(user_notification) = self.history.sendable(msg.unwrap()) { + async move { + if let Some(webhook_config) = shout.webhook() { + send_webhook(&user_notification, webhook_config, &net).await; + } + if let Some(email_config) = shout.email() { + send_email(&user_notification, email_config); + } + if shout.desktop() { + send_desktop_notification(&user_notification); + } } + .in_current_span() + .into_actor(self) + .wait(ctx); } - .in_current_span() - .into_actor(self) - .wait(ctx); } } diff --git a/crates/cli/src/alerts/history.rs b/crates/cli/src/alerts/history.rs index 2848572b..99fe61f9 100644 --- a/crates/cli/src/alerts/history.rs +++ b/crates/cli/src/alerts/history.rs @@ -1,2 +1,50 @@ -#[derive(Debug, Default)] -pub struct History {} +// +use git_next_core::git::UserNotification; +use tracing::info; + +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; + +#[derive(Debug)] +pub struct History { + /// The maximum age of an item in the history. + /// + /// Items older than this will be dropped. + max_age_seconds: Duration, + + /// Maps a user notification to when it was last seen. + /// + /// The user notification will not be sent until after max_age_seconds from last seen. + /// + /// Each time we see a given user notification, the last seen time will be updated. + items: HashMap, +} +impl History { + pub fn new(max_age_seconds: Duration) -> Self { + Self { + max_age_seconds, + items: HashMap::default(), + } + } + + pub fn sendable(&mut self, user_notification: UserNotification) -> Option { + let now = Instant::now(); + self.prune(&now); // remove expired items first + let contains_key = self.items.contains_key(&user_notification); + self.items.insert(user_notification.clone(), now); + if contains_key { + info!("previously sent"); + return None; + } + info!("not sent before"); + Some(user_notification) + } + + pub fn prune(&mut self, now: &Instant) { + if let Some(threshold) = now.checked_sub(self.max_age_seconds) { + self.items.retain(|_, last_seen| *last_seen > threshold) + }; + } +} diff --git a/crates/cli/src/alerts/messages.rs b/crates/cli/src/alerts/messages.rs index ef5f46e0..79d0fd33 100644 --- a/crates/cli/src/alerts/messages.rs +++ b/crates/cli/src/alerts/messages.rs @@ -1,86 +1,6 @@ // -use derive_more::Deref as _; use git_next_core::{git::UserNotification, message, server::Shout}; -use serde_json::json; message!(UpdateShout: Shout: "Updated Shout configuration"); message!(NotifyUser: UserNotification: "Request to send the message payload to the notification webhook"); -impl NotifyUser { - pub fn as_json(&self, timestamp: time::OffsetDateTime) -> serde_json::Value { - let timestamp = timestamp.unix_timestamp().to_string(); - match self.deref() { - UserNotification::CICheckFailed { - forge_alias, - repo_alias, - commit, - } => json!({ - "type": "cicheck.failed", - "timestamp": timestamp, - "data": { - "forge_alias": forge_alias, - "repo_alias": repo_alias, - "commit": { - "sha": commit.sha(), - "message": commit.message() - } - } - }), - UserNotification::RepoConfigLoadFailure { - forge_alias, - repo_alias, - reason, - } => json!({ - "type": "config.load.failed", - "timestamp": timestamp, - "data": { - "forge_alias": forge_alias, - "repo_alias": repo_alias, - "reason": reason - } - }), - UserNotification::WebhookRegistration { - forge_alias, - repo_alias, - reason, - } => json!({ - "type": "webhook.registration.failed", - "timestamp": timestamp, - "data": { - "forge_alias": forge_alias, - "repo_alias": repo_alias, - "reason": reason - } - }), - UserNotification::DevNotBasedOnMain { - forge_alias, - repo_alias, - dev_branch, - main_branch, - dev_commit, - main_commit, - } => json!({ - "type": "branch.dev.not-on-main", - "timestamp": timestamp, - "data": { - "forge_alias": forge_alias, - "repo_alias": repo_alias, - "branches": { - "dev": dev_branch, - "main": main_branch - }, - "commits": { - "dev": { - "sha": dev_commit.sha(), - "message": dev_commit.message() - }, - "main": { - "sha": main_commit.sha(), - "message": main_commit.message() - } - } - } - }), - } - } -} diff --git a/crates/cli/src/alerts/mod.rs b/crates/cli/src/alerts/mod.rs index 96257920..d2980535 100644 --- a/crates/cli/src/alerts/mod.rs +++ b/crates/cli/src/alerts/mod.rs @@ -1,5 +1,3 @@ -use std::ops::Deref as _; - // use actix::prelude::*; @@ -8,7 +6,6 @@ use derive_more::derive::Constructor; use git_next_core::{git::UserNotification, server::Shout}; pub use history::History; -use messages::NotifyUser; mod desktop; mod email; @@ -17,11 +14,13 @@ mod history; pub mod messages; mod webhook; +#[cfg(test)] +mod tests; + #[derive(Debug, Constructor)] pub struct AlertsActor { shout: Option, // config for sending alerts to users - #[allow(dead_code)] // TODO (#128) Prevent duplicate user notifications - history: History, // record of alerts sent recently (e.g. 24 hours) + history: History, // record of alerts sent recently (e.g. 24 hours) net: kxio::network::Network, } @@ -29,8 +28,8 @@ impl Actor for AlertsActor { type Context = Context; } -fn short_message(msg: &NotifyUser) -> String { - let tail = match msg.deref() { +fn short_message(user_notification: &UserNotification) -> String { + let tail = match user_notification { UserNotification::CICheckFailed { forge_alias, repo_alias, @@ -58,8 +57,8 @@ fn short_message(msg: &NotifyUser) -> String { format!("[git-next] {tail}") } -fn full_message(msg: &NotifyUser) -> String { - match msg.deref() { +fn full_message(user_notification: &UserNotification) -> String { + match user_notification { UserNotification::CICheckFailed { forge_alias, repo_alias, diff --git a/crates/cli/src/alerts/tests/history.rs b/crates/cli/src/alerts/tests/history.rs new file mode 100644 index 00000000..2235e2fd --- /dev/null +++ b/crates/cli/src/alerts/tests/history.rs @@ -0,0 +1,66 @@ +use std::time::Duration; + +use assert2::let_assert; +use git_next_core::git::UserNotification; + +use crate::{alerts::History, repo::tests::given}; + +#[test] +fn when_history_is_empty_then_message_is_passed() { + let mut history = History::new(Duration::from_millis(1)); + + let user_notification = UserNotification::RepoConfigLoadFailure { + forge_alias: given::a_forge_alias(), + repo_alias: given::a_repo_alias(), + reason: given::a_name(), + }; + + let result = history.sendable(user_notification); + + assert!(result.is_some()); +} + +#[test] +fn when_history_has_expired_then_message_is_passed() { + let dur = Duration::from_millis(1); + let mut history = History::new(dur); + + let user_notification = UserNotification::RepoConfigLoadFailure { + forge_alias: given::a_forge_alias(), + repo_alias: given::a_repo_alias(), + reason: given::a_name(), + }; + + // add message to history + let result = history.sendable(user_notification); + let_assert!(Some(user_notification) = result); + + std::thread::sleep(dur); + + // after dur, message has expired, so is still valid + let result = history.sendable(user_notification); + assert!(result.is_some()); +} + +#[test] +fn when_history_has_unexpired_then_message_is_blocked() { + let dur = Duration::from_millis(1); + let mut history = History::new(dur); + + let user_notification = UserNotification::RepoConfigLoadFailure { + forge_alias: given::a_forge_alias(), + repo_alias: given::a_repo_alias(), + reason: given::a_name(), + }; + + // add message to history + let result = history.sendable(user_notification); + let_assert!(Some(user_notification) = result); + + // no time passed + // std::thread::sleep(dur); + + // after dur, message has expired, so is still valid + let result = history.sendable(user_notification); + assert!(result.is_none()); +} diff --git a/crates/cli/src/alerts/tests/mod.rs b/crates/cli/src/alerts/tests/mod.rs new file mode 100644 index 00000000..fe5dccc6 --- /dev/null +++ b/crates/cli/src/alerts/tests/mod.rs @@ -0,0 +1 @@ +mod history; diff --git a/crates/cli/src/alerts/webhook.rs b/crates/cli/src/alerts/webhook.rs index 0108d15f..3d7cdec7 100644 --- a/crates/cli/src/alerts/webhook.rs +++ b/crates/cli/src/alerts/webhook.rs @@ -1,12 +1,10 @@ // -use git_next_core::server::OutboundWebhook; +use git_next_core::{git::UserNotification, server::OutboundWebhook}; use secrecy::ExposeSecret as _; use standardwebhooks::Webhook; -use crate::alerts::messages::NotifyUser; - pub(super) async fn send_webhook( - msg: &NotifyUser, + user_notification: &UserNotification, webhook_config: &OutboundWebhook, net: &kxio::network::Network, ) { @@ -16,18 +14,18 @@ pub(super) async fn send_webhook( tracing::warn!("Invalid notification configuration (signer) - can't sent notification"); return; }; - do_send_webhook(msg, webhook, webhook_config, net).await; + do_send_webhook(user_notification, webhook, webhook_config, net).await; } async fn do_send_webhook( - msg: &NotifyUser, + user_notification: &UserNotification, webhook: Webhook, webhook_config: &OutboundWebhook, net: &kxio::network::Network, ) { let message_id = format!("msg_{}", ulid::Ulid::new()); let timestamp = time::OffsetDateTime::now_utc(); - let payload = msg.as_json(timestamp); + let payload = user_notification.as_json(timestamp); let timestamp = timestamp.unix_timestamp(); let to_sign = format!("{message_id}.{timestamp}.{payload}"); tracing::info!(?to_sign, ""); diff --git a/crates/cli/src/repo/mod.rs b/crates/cli/src/repo/mod.rs index c8b33250..c9cd263b 100644 --- a/crates/cli/src/repo/mod.rs +++ b/crates/cli/src/repo/mod.rs @@ -24,7 +24,7 @@ pub mod messages; mod notifications; #[cfg(test)] -mod tests; +pub mod tests; #[derive(Clone, Debug, Default)] pub struct RepoActorLog(std::sync::Arc>>); diff --git a/crates/cli/src/server/actor/tests/given.rs b/crates/cli/src/server/actor/tests/given.rs index 559a67e7..330215b7 100644 --- a/crates/cli/src/server/actor/tests/given.rs +++ b/crates/cli/src/server/actor/tests/given.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use actix::prelude::*; use crate::alerts::{AlertsActor, History}; @@ -12,5 +14,5 @@ pub fn a_network() -> kxio::network::MockNetwork { } pub fn an_alerts_actor(net: kxio::network::Network) -> Addr { - AlertsActor::new(None, History::default(), net).start() + AlertsActor::new(None, History::new(Duration::from_millis(1)), net).start() } diff --git a/crates/cli/src/server/mod.rs b/crates/cli/src/server/mod.rs index 3c0d8f93..537b5553 100644 --- a/crates/cli/src/server/mod.rs +++ b/crates/cli/src/server/mod.rs @@ -17,7 +17,9 @@ use anyhow::{Context, Result}; use kxio::{fs::FileSystem, network::Network}; use tracing::info; -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; + +const A_DAY: Duration = Duration::from_secs(24 * 60 * 60); pub fn init(fs: FileSystem) -> Result<()> { let file_name = "git-next-server.toml"; @@ -45,7 +47,7 @@ pub fn start( let execution = async move { info!("Starting Alert Dispatcher..."); - let alerts_addr = AlertsActor::new(None, History::default(), net.clone()).start(); + let alerts_addr = AlertsActor::new(None, History::new(A_DAY), net.clone()).start(); info!("Starting Server..."); let server = ServerActor::new( diff --git a/crates/core/src/config/branch_name.rs b/crates/core/src/config/branch_name.rs index 192e7405..26e329b6 100644 --- a/crates/core/src/config/branch_name.rs +++ b/crates/core/src/config/branch_name.rs @@ -1,3 +1,4 @@ +use derive_more::derive::Display; use serde::Serialize; -crate::newtype!(BranchName: String, derive_more::Display, Default, Serialize: "The name of a Git branch"); +crate::newtype!(BranchName: String, Display, Default, Hash, Serialize: "The name of a Git branch"); diff --git a/crates/core/src/config/repo_alias.rs b/crates/core/src/config/repo_alias.rs index 7cedbf5f..67bfaac8 100644 --- a/crates/core/src/config/repo_alias.rs +++ b/crates/core/src/config/repo_alias.rs @@ -3,6 +3,6 @@ use serde::Serialize; use crate::newtype; -newtype!(RepoAlias: String, Display, Default, PartialOrd, Ord, Serialize: r#"The alias of a repo. +newtype!(RepoAlias: String, Display, Default, Hash, PartialOrd, Ord, Serialize: r#"The alias of a repo. This is the alias for the repo within `git-next-server.toml`."#); diff --git a/crates/core/src/git/user_notification.rs b/crates/core/src/git/user_notification.rs index e077338f..c25f36bd 100644 --- a/crates/core/src/git/user_notification.rs +++ b/crates/core/src/git/user_notification.rs @@ -1,7 +1,8 @@ // use crate::{git::Commit, BranchName, ForgeAlias, RepoAlias}; +use serde_json::json; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum UserNotification { CICheckFailed { forge_alias: ForgeAlias, @@ -27,3 +28,81 @@ pub enum UserNotification { main_commit: Commit, }, } +impl UserNotification { + pub fn as_json(&self, timestamp: time::OffsetDateTime) -> serde_json::Value { + let timestamp = timestamp.unix_timestamp().to_string(); + match self { + Self::CICheckFailed { + forge_alias, + repo_alias, + commit, + } => json!({ + "type": "cicheck.failed", + "timestamp": timestamp, + "data": { + "forge_alias": forge_alias, + "repo_alias": repo_alias, + "commit": { + "sha": commit.sha(), + "message": commit.message() + } + } + }), + Self::RepoConfigLoadFailure { + forge_alias, + repo_alias, + reason, + } => json!({ + "type": "config.load.failed", + "timestamp": timestamp, + "data": { + "forge_alias": forge_alias, + "repo_alias": repo_alias, + "reason": reason + } + }), + Self::WebhookRegistration { + forge_alias, + repo_alias, + reason, + } => json!({ + "type": "webhook.registration.failed", + "timestamp": timestamp, + "data": { + "forge_alias": forge_alias, + "repo_alias": repo_alias, + "reason": reason + } + }), + Self::DevNotBasedOnMain { + forge_alias, + repo_alias, + dev_branch, + main_branch, + dev_commit, + main_commit, + } => json!({ + "type": "branch.dev.not-on-main", + "timestamp": timestamp, + "data": { + "forge_alias": forge_alias, + "repo_alias": repo_alias, + "branches": { + "dev": dev_branch, + "main": main_branch + }, + "commits": { + "dev": { + "sha": dev_commit.sha(), + "message": dev_commit.message() + }, + "main": { + "sha": main_commit.sha(), + "message": main_commit.message() + } + } + } + }), + } + } +}