WIP: post webhook - apply some to previous commits
All checks were successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful

This commit is contained in:
Paul Campbell 2024-07-21 20:22:42 +01:00
parent 731c22826c
commit 31859a0c69
9 changed files with 89 additions and 22 deletions

View file

@ -79,6 +79,7 @@ bytes = "1.6"
ulid = "1.1" ulid = "1.1"
warp = "0.3" warp = "0.3"
time = "0.3" time = "0.3"
standardwebhooks = "1.0"
# boilerplate # boilerplate
derive_more = { version = "1.0.0-beta.6", features = [ derive_more = { version = "1.0.0-beta.6", features = [

View file

@ -8,6 +8,7 @@ use std::{
}; };
use kxio::fs::FileSystem; use kxio::fs::FileSystem;
use secrecy::Secret;
use tracing::info; use tracing::info;
use crate::{newtype, ForgeAlias, ForgeConfig, RepoAlias}; use crate::{newtype, ForgeAlias, ForgeConfig, RepoAlias};
@ -42,7 +43,7 @@ type Result<T> = core::result::Result<T, Error>;
pub struct ServerConfig { pub struct ServerConfig {
http: Http, http: Http,
webhook: InboundWebhook, webhook: InboundWebhook,
notifications: Notification, notification: Notification,
storage: ServerStorage, storage: ServerStorage,
pub forge: BTreeMap<String, ForgeConfig>, pub forge: BTreeMap<String, ForgeConfig>,
} }
@ -65,11 +66,11 @@ impl ServerConfig {
&self.storage &self.storage
} }
pub const fn notifications(&self) -> &Notification { pub const fn notification(&self) -> &Notification {
&self.notifications &self.notification
} }
pub const fn webhook(&self) -> &InboundWebhook { pub const fn inbound_webhook(&self) -> &InboundWebhook {
&self.webhook &self.webhook
} }
@ -205,9 +206,17 @@ impl Notification {
} }
} }
pub const fn r#type(&self) -> NotificationType {
self.r#type
}
pub fn webhook_url(&self) -> Option<String> { pub fn webhook_url(&self) -> Option<String> {
self.webhook.clone().map(|x| x.url) self.webhook.clone().map(|x| x.url)
} }
pub fn webhook_secret(&self) -> Option<Secret<String>> {
self.webhook.clone().map(|x| x.secret).map(Secret::new)
}
} }
impl Default for Notification { impl Default for Notification {
fn default() -> Self { fn default() -> Self {
@ -223,10 +232,10 @@ impl Default for Notification {
Eq, Eq,
PartialOrd, PartialOrd,
Ord, Ord,
derive_more::AsRef,
serde::Deserialize, serde::Deserialize,
derive_more::Constructor, derive_more::Constructor,
)] )]
pub struct OutboundWebhook { pub struct OutboundWebhook {
url: String, url: String,
secret: String,
} }

View file

@ -468,10 +468,10 @@ mod server {
let http = &server_config.http()?; let http = &server_config.http()?;
let http_addr = http.ip(); let http_addr = http.ip();
let http_port = server_config.http()?.port(); let http_port = server_config.http()?.port();
let webhook_url = server_config.webhook().base_url(); let webhook_url = server_config.inbound_webhook().base_url();
let storage_path = server_config.storage().path(); let storage_path = server_config.storage().path();
let notification_webhook_url = server_config let notification_webhook_url = server_config
.notifications() .notification()
.webhook_url() .webhook_url()
.unwrap_or_default(); .unwrap_or_default();
let forge_alias = server_config let forge_alias = server_config

View file

@ -4,10 +4,10 @@ use serde_json::json;
use crate::messages::NotifyUser; use crate::messages::NotifyUser;
impl From<NotifyUser> for serde_json::Value { impl NotifyUser {
fn from(value: NotifyUser) -> Self { pub fn as_json(self, timestamp: time::OffsetDateTime) -> serde_json::Value {
let timestamp = time::OffsetDateTime::now_utc().unix_timestamp().to_string(); let timestamp = timestamp.unix_timestamp().to_string();
match value.deref() { match self.deref() {
UserNotification::CICheckFailed { UserNotification::CICheckFailed {
forge_alias, forge_alias,
repo_alias, repo_alias,

View file

@ -31,6 +31,10 @@ actix = { workspace = true }
# Webhooks # Webhooks
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
ulid = { workspace = true }
time = { workspace = true }
secrecy = { workspace = true }
standardwebhooks = { workspace = true }
[dev-dependencies] [dev-dependencies]
# Testing # Testing

View file

@ -1,6 +1,7 @@
// //
use actix::prelude::*; use actix::prelude::*;
use git_next_config::server::NotificationType;
use git_next_repo_actor::messages::NotifyUser; use git_next_repo_actor::messages::NotifyUser;
use crate::ServerActor; use crate::ServerActor;
@ -9,9 +10,31 @@ impl Handler<NotifyUser> for ServerActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: NotifyUser, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: NotifyUser, _ctx: &mut Self::Context) -> Self::Result {
let _payload = serde_json::Value::from(msg); let Some(server_config) = &self.server_config else {
tracing::info!("{}", _payload.to_string()); return;
// TODO: (#95) should notify user };
// send post to notification webhook url let notification_type = server_config.notification().r#type();
match notification_type {
NotificationType::None => { /* do nothing */ }
NotificationType::Webhook => {
let message_id = format!("msg_{}", ulid::Ulid::new());
let timestamp = time::OffsetDateTime::now_utc(); //.unix_timestamp().to_string();
let payload = msg.as_json(timestamp).to_string();
let timestamp = timestamp.unix_timestamp();
let to_sign = format!("{message_id}.{timestamp}.{payload}");
tracing::info!(?to_sign, "");
let Some(webhook) = self.webhook.as_ref() else {
tracing::warn!("Invalid notification configuration - can't sent notification");
return;
};
#[allow(clippy::expect_used)]
let signature = webhook
.sign(&message_id, timestamp, payload.as_ref())
.expect("signature");
tracing::info!(?signature, "");
// TODO: (#95) should notify user
// send post to notification webhook url
}
}
} }
} }

View file

@ -21,7 +21,7 @@ impl Handler<ReceiveServerConfig> for ServerActor {
return; return;
}; };
if msg.webhook().base_url().ends_with('/') { if msg.inbound_webhook().base_url().ends_with('/') {
tracing::error!("webhook.url must not end with a '/'"); tracing::error!("webhook.url must not end with a '/'");
return; return;
} }

View file

@ -1,5 +1,7 @@
use actix::prelude::*; use actix::prelude::*;
use git_next_config::server::NotificationType;
use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter}; use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter};
use standardwebhooks::Webhook;
use crate::{ use crate::{
messages::{ReceiveValidServerConfig, ValidServerConfig}, messages::{ReceiveValidServerConfig, ValidServerConfig},
@ -15,14 +17,35 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
socket_address, socket_address,
server_storage, server_storage,
} = msg.unwrap(); } = msg.unwrap();
if let Some(webhook) = self.webhook_actor_addr.take() { // shutdown any existing webhook actor
webhook.do_send(ShutdownWebhook); if let Some(webhook_actor_addr) = self.webhook_actor_addr.take() {
webhook_actor_addr.do_send(ShutdownWebhook);
}
match server_config.notification().r#type() {
NotificationType::None => { /* do nothing */ }
NotificationType::Webhook => {
// Create webhook signer
use secrecy::ExposeSecret;
let webhook = server_config
.notification()
.webhook_secret()
.map(|secret| Webhook::new(secret.expose_secret()))
.transpose()
.map_err(|e| {
tracing::error!(
"Invalid notification webhook secret (will not send notifications): {e}"
)
})
.ok()
.flatten();
self.webhook = webhook;
}
} }
self.generation.inc(); self.generation.inc();
// Webhook Server // Webhook Server
tracing::info!("Starting Webhook Server..."); tracing::info!("Starting Webhook Server...");
let webhook_router = WebhookRouter::default().start(); let webhook_router = WebhookRouter::default().start();
let webhook = server_config.webhook(); let inbound_webhook = server_config.inbound_webhook();
// Forge Actors // Forge Actors
for (forge_alias, forge_config) in server_config.forges() { for (forge_alias, forge_config) in server_config.forges() {
let repo_actors = self let repo_actors = self
@ -30,7 +53,7 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
forge_config, forge_config,
forge_alias.clone(), forge_alias.clone(),
&server_storage, &server_storage,
webhook, inbound_webhook,
ctx.address().recipient(), ctx.address().recipient(),
) )
.into_iter() .into_iter()
@ -51,7 +74,9 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
.insert((forge_alias.clone(), repo_alias), addr); .insert((forge_alias.clone(), repo_alias), addr);
}); });
} }
let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start(); let webhook_actor_addr =
self.webhook_actor_addr.replace(webhook); WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook_actor_addr.replace(webhook_actor_addr);
self.server_config.replace(server_config);
} }
} }

View file

@ -14,6 +14,7 @@ use git_next_repo_actor::messages::NotifyUser;
use git_next_repo_actor::{messages::CloneRepo, RepoActor}; use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor as webhook; use git_next_webhook_actor as webhook;
use kxio::{fs::FileSystem, network::Network}; use kxio::{fs::FileSystem, network::Network};
use standardwebhooks::Webhook;
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
path::PathBuf, path::PathBuf,
@ -45,6 +46,7 @@ type Result<T> = core::result::Result<T, Error>;
#[derive(derive_with::With)] #[derive(derive_with::With)]
#[with(message_log)] #[with(message_log)]
pub struct ServerActor { pub struct ServerActor {
server_config: Option<ServerConfig>,
generation: Generation, generation: Generation,
webhook_actor_addr: Option<Addr<WebhookActor>>, webhook_actor_addr: Option<Addr<WebhookActor>>,
fs: FileSystem, fs: FileSystem,
@ -52,6 +54,7 @@ pub struct ServerActor {
repository_factory: Box<dyn RepositoryFactory>, repository_factory: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration, sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>, repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
webhook: Option<Webhook>,
// testing // testing
message_log: Option<Arc<RwLock<Vec<String>>>>, message_log: Option<Arc<RwLock<Vec<String>>>>,
@ -69,6 +72,7 @@ impl ServerActor {
) -> Self { ) -> Self {
let generation = Generation::default(); let generation = Generation::default();
Self { Self {
server_config: None,
generation, generation,
webhook_actor_addr: None, webhook_actor_addr: None,
fs, fs,
@ -76,6 +80,7 @@ impl ServerActor {
repository_factory: repo, repository_factory: repo,
sleep_duration, sleep_duration,
repo_actors: BTreeMap::new(), repo_actors: BTreeMap::new(),
webhook: None,
message_log: None, message_log: None,
} }
} }