refactor: extract alerts into own actor
All checks were successful
Rust / build (push) Successful in 1m42s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful

This commit is contained in:
Paul Campbell 2024-08-03 12:53:59 +01:00
parent 9a2fa2e8a5
commit 421e85cb0b
17 changed files with 203 additions and 50 deletions

View file

@ -1,5 +1,5 @@
// //
use crate::{repo::messages::NotifyUser, server::actor::handlers::notify_user::short_message}; use crate::alerts::{messages::NotifyUser, short_message};
pub(super) fn send_desktop_notification(msg: &NotifyUser) { pub(super) fn send_desktop_notification(msg: &NotifyUser) {
let message = short_message(msg); let message = short_message(msg);

View file

@ -1,9 +1,7 @@
// //
use git_next_core::server::{EmailConfig, SmtpConfig}; use git_next_core::server::{EmailConfig, SmtpConfig};
use crate::repo::messages::NotifyUser; use crate::alerts::{full_message, messages::NotifyUser, short_message};
use super::{full_message, short_message};
#[derive(Debug)] #[derive(Debug)]
struct EmailMessage { struct EmailMessage {

View file

@ -0,0 +1,2 @@
mod notify_user;
mod update_shout;

View file

@ -0,0 +1,36 @@
//
use actix::prelude::*;
use tracing::{info, instrument, Instrument as _};
use crate::alerts::{
desktop::send_desktop_notification, email::send_email, messages::NotifyUser,
webhook::send_webhook, AlertsActor,
};
impl Handler<NotifyUser> for AlertsActor {
type Result = ();
#[instrument]
fn handle(&mut self, msg: NotifyUser, ctx: &mut Self::Context) -> Self::Result {
let Some(shout) = &self.shout else {
info!("No shout config available");
return;
};
let net = self.net.clone();
let shout = shout.clone();
async move {
if let Some(webhook_config) = shout.webhook() {
send_webhook(&msg, webhook_config, &net).await;
}
if let Some(email_config) = shout.email() {
send_email(&msg, email_config);
}
if shout.desktop() {
send_desktop_notification(&msg);
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}

View file

@ -0,0 +1,12 @@
//
use actix::prelude::*;
use crate::alerts::{messages::UpdateShout, AlertsActor};
impl Handler<UpdateShout> for AlertsActor {
type Result = ();
fn handle(&mut self, msg: UpdateShout, _ctx: &mut Self::Context) -> Self::Result {
self.shout.replace(msg.unwrap());
}
}

View file

@ -0,0 +1,2 @@
#[derive(Debug, Default)]
pub struct History {}

View file

@ -0,0 +1,86 @@
//
use derive_more::Deref as _;
use git_next_core::{git::UserNotification, message, server::Shout};
use serde_json::json;
message!(UpdateShout: Shout: "Updated Shout configuration");
message!(NotifyUser: UserNotification: "Request to send the message payload to the notification webhook");
impl NotifyUser {
pub fn as_json(&self, timestamp: time::OffsetDateTime) -> serde_json::Value {
let timestamp = timestamp.unix_timestamp().to_string();
match self.deref() {
UserNotification::CICheckFailed {
forge_alias,
repo_alias,
commit,
} => json!({
"type": "cicheck.failed",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"commit": {
"sha": commit.sha(),
"message": commit.message()
}
}
}),
UserNotification::RepoConfigLoadFailure {
forge_alias,
repo_alias,
reason,
} => json!({
"type": "config.load.failed",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"reason": reason
}
}),
UserNotification::WebhookRegistration {
forge_alias,
repo_alias,
reason,
} => json!({
"type": "webhook.registration.failed",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"reason": reason
}
}),
UserNotification::DevNotBasedOnMain {
forge_alias,
repo_alias,
dev_branch,
main_branch,
dev_commit,
main_commit,
} => json!({
"type": "branch.dev.not-on-main",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"branches": {
"dev": dev_branch,
"main": main_branch
},
"commits": {
"dev": {
"sha": dev_commit.sha(),
"message": dev_commit.message()
},
"main": {
"sha": main_commit.sha(),
"message": main_commit.message()
}
}
}
}),
}
}
}

View file

@ -1,44 +1,32 @@
//
mod desktop;
mod email;
mod webhook;
use std::ops::Deref as _; use std::ops::Deref as _;
//
use actix::prelude::*; use actix::prelude::*;
use desktop::send_desktop_notification; use derive_more::derive::Constructor;
use email::send_email;
use git_next_core::git::UserNotification;
use tracing::Instrument;
use webhook::send_webhook;
use crate::{repo::messages::NotifyUser, server::actor::ServerActor}; use git_next_core::{git::UserNotification, server::Shout};
impl Handler<NotifyUser> for ServerActor { pub use history::History;
type Result = (); use messages::NotifyUser;
fn handle(&mut self, msg: NotifyUser, ctx: &mut Self::Context) -> Self::Result { mod desktop;
let Some(server_config) = &self.server_config else { mod email;
return; mod handlers;
}; mod history;
let shout_config = server_config.shout().clone(); pub mod messages;
let net = self.net.clone(); mod webhook;
async move {
if let Some(webhook_config) = shout_config.webhook() { #[derive(Debug, Constructor)]
send_webhook(&msg, webhook_config, &net).await; pub struct AlertsActor {
} shout: Option<Shout>, // config for sending alerts to users
if let Some(email_config) = shout_config.email() { #[allow(dead_code)] // TODO (#128) Prevent duplicate user notifications
send_email(&msg, email_config); history: History, // record of alerts sent recently (e.g. 24 hours)
} net: kxio::network::Network,
if shout_config.desktop() {
send_desktop_notification(&msg);
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
} }
impl Actor for AlertsActor {
type Context = Context<Self>;
} }
fn short_message(msg: &NotifyUser) -> String { fn short_message(msg: &NotifyUser) -> String {

View file

@ -3,7 +3,7 @@ use git_next_core::server::OutboundWebhook;
use secrecy::ExposeSecret as _; use secrecy::ExposeSecret as _;
use standardwebhooks::Webhook; use standardwebhooks::Webhook;
use crate::repo::messages::NotifyUser; use crate::alerts::messages::NotifyUser;
pub(super) async fn send_webhook( pub(super) async fn send_webhook(
msg: &NotifyUser, msg: &NotifyUser,

View file

@ -1,4 +1,5 @@
// //
mod alerts;
mod file_watcher; mod file_watcher;
mod forge; mod forge;
mod init; mod init;

View file

@ -1,9 +1,9 @@
// //
use actix::prelude::*; use actix::prelude::*;
use crate::alerts::messages::NotifyUser;
use derive_more::Deref; use derive_more::Deref;
use kxio::network::Network; use kxio::network::Network;
use messages::NotifyUser;
use std::time::Duration; use std::time::Duration;
use tracing::{info, warn, Instrument}; use tracing::{info, warn, Instrument};

View file

@ -1,5 +1,4 @@
mod file_updated; mod file_updated;
mod notify_user;
mod receive_server_config; mod receive_server_config;
mod receive_valid_server_config; mod receive_valid_server_config;
mod shutdown; mod shutdown;

View file

@ -4,6 +4,7 @@ use actix::prelude::*;
use tracing::info; use tracing::info;
use crate::{ use crate::{
alerts::messages::UpdateShout,
server::actor::{ server::actor::{
messages::{ReceiveValidServerConfig, ValidServerConfig}, messages::{ReceiveValidServerConfig, ValidServerConfig},
ServerActor, ServerActor,
@ -18,7 +19,7 @@ use crate::{
impl Handler<ReceiveValidServerConfig> for ServerActor { impl Handler<ReceiveValidServerConfig> for ServerActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: ReceiveValidServerConfig, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ReceiveValidServerConfig, _ctx: &mut Self::Context) -> Self::Result {
let ValidServerConfig { let ValidServerConfig {
server_config, server_config,
socket_address, socket_address,
@ -33,6 +34,7 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
info!("Starting Webhook Server..."); info!("Starting Webhook Server...");
let webhook_router = WebhookRouter::default().start(); let webhook_router = WebhookRouter::default().start();
let listen_url = server_config.listen().url(); let listen_url = server_config.listen().url();
let alerts = self.alerts.clone();
// Forge Actors // Forge Actors
for (forge_alias, forge_config) in server_config.forges() { for (forge_alias, forge_config) in server_config.forges() {
let repo_actors = self let repo_actors = self
@ -41,7 +43,7 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
forge_alias.clone(), forge_alias.clone(),
&server_storage, &server_storage,
listen_url, listen_url,
ctx.address().recipient(), alerts.clone().recipient(),
) )
.into_iter() .into_iter()
.map(|a| self.start_actor(a)) .map(|a| self.start_actor(a))
@ -64,6 +66,8 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
let webhook_actor_addr = let webhook_actor_addr =
WebhookActor::new(socket_address, webhook_router.recipient()).start(); WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook_actor_addr.replace(webhook_actor_addr); self.webhook_actor_addr.replace(webhook_actor_addr);
let shout = server_config.shout().clone();
self.server_config.replace(server_config); self.server_config.replace(server_config);
self.alerts.do_send(UpdateShout::new(shout));
} }
} }

View file

@ -10,11 +10,10 @@ mod handlers;
pub mod messages; pub mod messages;
use crate::{ use crate::{
alerts::messages::NotifyUser,
alerts::AlertsActor,
forge::Forge, forge::Forge,
repo::{ repo::{messages::CloneRepo, RepoActor},
messages::{CloneRepo, NotifyUser},
RepoActor,
},
webhook::WebhookActor, webhook::WebhookActor,
}; };
@ -56,6 +55,7 @@ pub struct ServerActor {
webhook_actor_addr: Option<Addr<WebhookActor>>, webhook_actor_addr: Option<Addr<WebhookActor>>,
fs: FileSystem, fs: FileSystem,
net: Network, net: Network,
alerts: Addr<AlertsActor>,
repository_factory: Box<dyn RepositoryFactory>, repository_factory: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration, sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>, repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
@ -71,6 +71,7 @@ impl ServerActor {
pub fn new( pub fn new(
fs: FileSystem, fs: FileSystem,
net: Network, net: Network,
alerts: Addr<AlertsActor>,
repo: Box<dyn RepositoryFactory>, repo: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration, sleep_duration: std::time::Duration,
) -> Self { ) -> Self {
@ -81,6 +82,7 @@ impl ServerActor {
webhook_actor_addr: None, webhook_actor_addr: None,
fs, fs,
net, net,
alerts,
repository_factory: repo, repository_factory: repo,
sleep_duration, sleep_duration,
repo_actors: BTreeMap::new(), repo_actors: BTreeMap::new(),

View file

@ -1,3 +1,8 @@
use actix::prelude::*;
use crate::alerts::{AlertsActor, History};
//
pub fn a_filesystem() -> kxio::fs::FileSystem { pub fn a_filesystem() -> kxio::fs::FileSystem {
kxio::fs::temp().unwrap_or_else(|e| panic!("{}", e)) kxio::fs::temp().unwrap_or_else(|e| panic!("{}", e))
} }
@ -5,3 +10,7 @@ pub fn a_filesystem() -> kxio::fs::FileSystem {
pub fn a_network() -> kxio::network::MockNetwork { pub fn a_network() -> kxio::network::MockNetwork {
kxio::network::MockNetwork::new() kxio::network::MockNetwork::new()
} }
pub fn an_alerts_actor(net: kxio::network::Network) -> Addr<AlertsActor> {
AlertsActor::new(None, History::default(), net).start()
}

View file

@ -18,11 +18,12 @@ async fn when_webhook_url_has_trailing_slash_should_not_send() {
// parameters // parameters
let fs = given::a_filesystem(); let fs = given::a_filesystem();
let net = given::a_network(); let net = given::a_network();
let alerts = given::an_alerts_actor(net.clone().into());
let repo = git::repository::factory::mock(); let repo = git::repository::factory::mock();
let duration = std::time::Duration::from_millis(1); let duration = std::time::Duration::from_millis(1);
// sut // sut
let server = ServerActor::new(fs.clone(), net.into(), repo, duration); let server = ServerActor::new(fs.clone(), net.into(), alerts, repo, duration);
// collaborators // collaborators
let listen = Listen::new( let listen = Listen::new(

View file

@ -6,7 +6,10 @@ mod tests;
use actix::prelude::*; use actix::prelude::*;
use crate::file_watcher::{watch_file, FileUpdated}; use crate::{
alerts::{AlertsActor, History},
file_watcher::{watch_file, FileUpdated},
};
use actor::ServerActor; use actor::ServerActor;
use git_next_core::git::RepositoryFactory; use git_next_core::git::RepositoryFactory;
@ -41,8 +44,18 @@ pub fn start(
init_logging(); init_logging();
let execution = async move { let execution = async move {
info!("Starting Alert Dispatcher...");
let alerts_addr = AlertsActor::new(None, History::default(), net.clone()).start();
info!("Starting Server..."); info!("Starting Server...");
let server = ServerActor::new(fs.clone(), net.clone(), repo, sleep_duration).start(); let server = ServerActor::new(
fs.clone(),
net.clone(),
alerts_addr.clone(),
repo,
sleep_duration,
)
.start();
server.do_send(FileUpdated); server.do_send(FileUpdated);
info!("Starting File Watcher..."); info!("Starting File Watcher...");