Compare commits
2 commits
0adfcff93d
...
31859a0c69
Author | SHA1 | Date | |
---|---|---|---|
31859a0c69 | |||
731c22826c |
10 changed files with 92 additions and 25 deletions
|
@ -79,6 +79,7 @@ bytes = "1.6"
|
||||||
ulid = "1.1"
|
ulid = "1.1"
|
||||||
warp = "0.3"
|
warp = "0.3"
|
||||||
time = "0.3"
|
time = "0.3"
|
||||||
|
standardwebhooks = "1.0"
|
||||||
|
|
||||||
# boilerplate
|
# boilerplate
|
||||||
derive_more = { version = "1.0.0-beta.6", features = [
|
derive_more = { version = "1.0.0-beta.6", features = [
|
||||||
|
|
|
@ -8,6 +8,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use kxio::fs::FileSystem;
|
use kxio::fs::FileSystem;
|
||||||
|
use secrecy::Secret;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::{newtype, ForgeAlias, ForgeConfig, RepoAlias};
|
use crate::{newtype, ForgeAlias, ForgeConfig, RepoAlias};
|
||||||
|
@ -42,7 +43,7 @@ type Result<T> = core::result::Result<T, Error>;
|
||||||
pub struct ServerConfig {
|
pub struct ServerConfig {
|
||||||
http: Http,
|
http: Http,
|
||||||
webhook: InboundWebhook,
|
webhook: InboundWebhook,
|
||||||
notifications: Notification,
|
notification: Notification,
|
||||||
storage: ServerStorage,
|
storage: ServerStorage,
|
||||||
pub forge: BTreeMap<String, ForgeConfig>,
|
pub forge: BTreeMap<String, ForgeConfig>,
|
||||||
}
|
}
|
||||||
|
@ -65,11 +66,11 @@ impl ServerConfig {
|
||||||
&self.storage
|
&self.storage
|
||||||
}
|
}
|
||||||
|
|
||||||
pub const fn notifications(&self) -> &Notification {
|
pub const fn notification(&self) -> &Notification {
|
||||||
&self.notifications
|
&self.notification
|
||||||
}
|
}
|
||||||
|
|
||||||
pub const fn webhook(&self) -> &InboundWebhook {
|
pub const fn inbound_webhook(&self) -> &InboundWebhook {
|
||||||
&self.webhook
|
&self.webhook
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,9 +206,17 @@ impl Notification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const fn r#type(&self) -> NotificationType {
|
||||||
|
self.r#type
|
||||||
|
}
|
||||||
|
|
||||||
pub fn webhook_url(&self) -> Option<String> {
|
pub fn webhook_url(&self) -> Option<String> {
|
||||||
self.webhook.clone().map(|x| x.url)
|
self.webhook.clone().map(|x| x.url)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn webhook_secret(&self) -> Option<Secret<String>> {
|
||||||
|
self.webhook.clone().map(|x| x.secret).map(Secret::new)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
impl Default for Notification {
|
impl Default for Notification {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
|
@ -223,10 +232,10 @@ impl Default for Notification {
|
||||||
Eq,
|
Eq,
|
||||||
PartialOrd,
|
PartialOrd,
|
||||||
Ord,
|
Ord,
|
||||||
derive_more::AsRef,
|
|
||||||
serde::Deserialize,
|
serde::Deserialize,
|
||||||
derive_more::Constructor,
|
derive_more::Constructor,
|
||||||
)]
|
)]
|
||||||
pub struct OutboundWebhook {
|
pub struct OutboundWebhook {
|
||||||
url: String,
|
url: String,
|
||||||
|
secret: String,
|
||||||
}
|
}
|
||||||
|
|
|
@ -468,10 +468,10 @@ mod server {
|
||||||
let http = &server_config.http()?;
|
let http = &server_config.http()?;
|
||||||
let http_addr = http.ip();
|
let http_addr = http.ip();
|
||||||
let http_port = server_config.http()?.port();
|
let http_port = server_config.http()?.port();
|
||||||
let webhook_url = server_config.webhook().base_url();
|
let webhook_url = server_config.inbound_webhook().base_url();
|
||||||
let storage_path = server_config.storage().path();
|
let storage_path = server_config.storage().path();
|
||||||
let notification_webhook_url = server_config
|
let notification_webhook_url = server_config
|
||||||
.notifications()
|
.notification()
|
||||||
.webhook_url()
|
.webhook_url()
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
let forge_alias = server_config
|
let forge_alias = server_config
|
||||||
|
|
|
@ -4,10 +4,10 @@ use serde_json::json;
|
||||||
|
|
||||||
use crate::messages::NotifyUser;
|
use crate::messages::NotifyUser;
|
||||||
|
|
||||||
impl From<NotifyUser> for serde_json::Value {
|
impl NotifyUser {
|
||||||
fn from(value: NotifyUser) -> Self {
|
pub fn as_json(self, timestamp: time::OffsetDateTime) -> serde_json::Value {
|
||||||
let timestamp = time::OffsetDateTime::now_utc().unix_timestamp().to_string();
|
let timestamp = timestamp.unix_timestamp().to_string();
|
||||||
match value.deref() {
|
match self.deref() {
|
||||||
UserNotification::CICheckFailed {
|
UserNotification::CICheckFailed {
|
||||||
forge_alias,
|
forge_alias,
|
||||||
repo_alias,
|
repo_alias,
|
||||||
|
|
|
@ -31,6 +31,10 @@ actix = { workspace = true }
|
||||||
# Webhooks
|
# Webhooks
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
ulid = { workspace = true }
|
||||||
|
time = { workspace = true }
|
||||||
|
secrecy = { workspace = true }
|
||||||
|
standardwebhooks = { workspace = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
# Testing
|
# Testing
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
|
use git_next_config::server::NotificationType;
|
||||||
use git_next_repo_actor::messages::NotifyUser;
|
use git_next_repo_actor::messages::NotifyUser;
|
||||||
|
|
||||||
use crate::ServerActor;
|
use crate::ServerActor;
|
||||||
|
@ -9,9 +10,31 @@ impl Handler<NotifyUser> for ServerActor {
|
||||||
type Result = ();
|
type Result = ();
|
||||||
|
|
||||||
fn handle(&mut self, msg: NotifyUser, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: NotifyUser, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
let _payload = serde_json::Value::from(msg);
|
let Some(server_config) = &self.server_config else {
|
||||||
tracing::info!("{}", _payload.to_string());
|
return;
|
||||||
// TODO: (#95) should notify user
|
};
|
||||||
// send post to notification webhook url
|
let notification_type = server_config.notification().r#type();
|
||||||
|
match notification_type {
|
||||||
|
NotificationType::None => { /* do nothing */ }
|
||||||
|
NotificationType::Webhook => {
|
||||||
|
let message_id = format!("msg_{}", ulid::Ulid::new());
|
||||||
|
let timestamp = time::OffsetDateTime::now_utc(); //.unix_timestamp().to_string();
|
||||||
|
let payload = msg.as_json(timestamp).to_string();
|
||||||
|
let timestamp = timestamp.unix_timestamp();
|
||||||
|
let to_sign = format!("{message_id}.{timestamp}.{payload}");
|
||||||
|
tracing::info!(?to_sign, "");
|
||||||
|
let Some(webhook) = self.webhook.as_ref() else {
|
||||||
|
tracing::warn!("Invalid notification configuration - can't sent notification");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
#[allow(clippy::expect_used)]
|
||||||
|
let signature = webhook
|
||||||
|
.sign(&message_id, timestamp, payload.as_ref())
|
||||||
|
.expect("signature");
|
||||||
|
tracing::info!(?signature, "");
|
||||||
|
// TODO: (#95) should notify user
|
||||||
|
// send post to notification webhook url
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ impl Handler<ReceiveServerConfig> for ServerActor {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if msg.webhook().base_url().ends_with('/') {
|
if msg.inbound_webhook().base_url().ends_with('/') {
|
||||||
tracing::error!("webhook.url must not end with a '/'");
|
tracing::error!("webhook.url must not end with a '/'");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
|
use git_next_config::server::NotificationType;
|
||||||
use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter};
|
use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter};
|
||||||
|
use standardwebhooks::Webhook;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
messages::{ReceiveValidServerConfig, ValidServerConfig},
|
messages::{ReceiveValidServerConfig, ValidServerConfig},
|
||||||
|
@ -15,14 +17,35 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
|
||||||
socket_address,
|
socket_address,
|
||||||
server_storage,
|
server_storage,
|
||||||
} = msg.unwrap();
|
} = msg.unwrap();
|
||||||
if let Some(webhook) = self.webhook.take() {
|
// shutdown any existing webhook actor
|
||||||
webhook.do_send(ShutdownWebhook);
|
if let Some(webhook_actor_addr) = self.webhook_actor_addr.take() {
|
||||||
|
webhook_actor_addr.do_send(ShutdownWebhook);
|
||||||
|
}
|
||||||
|
match server_config.notification().r#type() {
|
||||||
|
NotificationType::None => { /* do nothing */ }
|
||||||
|
NotificationType::Webhook => {
|
||||||
|
// Create webhook signer
|
||||||
|
use secrecy::ExposeSecret;
|
||||||
|
let webhook = server_config
|
||||||
|
.notification()
|
||||||
|
.webhook_secret()
|
||||||
|
.map(|secret| Webhook::new(secret.expose_secret()))
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| {
|
||||||
|
tracing::error!(
|
||||||
|
"Invalid notification webhook secret (will not send notifications): {e}"
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.ok()
|
||||||
|
.flatten();
|
||||||
|
self.webhook = webhook;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.generation.inc();
|
self.generation.inc();
|
||||||
// Webhook Server
|
// Webhook Server
|
||||||
tracing::info!("Starting Webhook Server...");
|
tracing::info!("Starting Webhook Server...");
|
||||||
let webhook_router = WebhookRouter::default().start();
|
let webhook_router = WebhookRouter::default().start();
|
||||||
let webhook = server_config.webhook();
|
let inbound_webhook = server_config.inbound_webhook();
|
||||||
// Forge Actors
|
// Forge Actors
|
||||||
for (forge_alias, forge_config) in server_config.forges() {
|
for (forge_alias, forge_config) in server_config.forges() {
|
||||||
let repo_actors = self
|
let repo_actors = self
|
||||||
|
@ -30,7 +53,7 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
|
||||||
forge_config,
|
forge_config,
|
||||||
forge_alias.clone(),
|
forge_alias.clone(),
|
||||||
&server_storage,
|
&server_storage,
|
||||||
webhook,
|
inbound_webhook,
|
||||||
ctx.address().recipient(),
|
ctx.address().recipient(),
|
||||||
)
|
)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -51,7 +74,9 @@ impl Handler<ReceiveValidServerConfig> for ServerActor {
|
||||||
.insert((forge_alias.clone(), repo_alias), addr);
|
.insert((forge_alias.clone(), repo_alias), addr);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start();
|
let webhook_actor_addr =
|
||||||
self.webhook.replace(webhook);
|
WebhookActor::new(socket_address, webhook_router.recipient()).start();
|
||||||
|
self.webhook_actor_addr.replace(webhook_actor_addr);
|
||||||
|
self.server_config.replace(server_config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ impl Handler<Shutdown> for ServerActor {
|
||||||
tracing::debug!(%forge_alias, %repo_alias, "removed webhook");
|
tracing::debug!(%forge_alias, %repo_alias, "removed webhook");
|
||||||
});
|
});
|
||||||
tracing::debug!("server shutdown");
|
tracing::debug!("server shutdown");
|
||||||
if let Some(webhook) = self.webhook.take() {
|
if let Some(webhook) = self.webhook_actor_addr.take() {
|
||||||
tracing::debug!("shuting down webhook");
|
tracing::debug!("shuting down webhook");
|
||||||
webhook.do_send(ShutdownWebhook);
|
webhook.do_send(ShutdownWebhook);
|
||||||
tracing::debug!("webhook shutdown");
|
tracing::debug!("webhook shutdown");
|
||||||
|
|
|
@ -14,6 +14,7 @@ use git_next_repo_actor::messages::NotifyUser;
|
||||||
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
|
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
|
||||||
use git_next_webhook_actor as webhook;
|
use git_next_webhook_actor as webhook;
|
||||||
use kxio::{fs::FileSystem, network::Network};
|
use kxio::{fs::FileSystem, network::Network};
|
||||||
|
use standardwebhooks::Webhook;
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
@ -45,13 +46,15 @@ type Result<T> = core::result::Result<T, Error>;
|
||||||
#[derive(derive_with::With)]
|
#[derive(derive_with::With)]
|
||||||
#[with(message_log)]
|
#[with(message_log)]
|
||||||
pub struct ServerActor {
|
pub struct ServerActor {
|
||||||
|
server_config: Option<ServerConfig>,
|
||||||
generation: Generation,
|
generation: Generation,
|
||||||
webhook: Option<Addr<WebhookActor>>,
|
webhook_actor_addr: Option<Addr<WebhookActor>>,
|
||||||
fs: FileSystem,
|
fs: FileSystem,
|
||||||
net: Network,
|
net: Network,
|
||||||
repository_factory: Box<dyn RepositoryFactory>,
|
repository_factory: Box<dyn RepositoryFactory>,
|
||||||
sleep_duration: std::time::Duration,
|
sleep_duration: std::time::Duration,
|
||||||
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
|
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
|
||||||
|
webhook: Option<Webhook>,
|
||||||
|
|
||||||
// testing
|
// testing
|
||||||
message_log: Option<Arc<RwLock<Vec<String>>>>,
|
message_log: Option<Arc<RwLock<Vec<String>>>>,
|
||||||
|
@ -69,13 +72,15 @@ impl ServerActor {
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let generation = Generation::default();
|
let generation = Generation::default();
|
||||||
Self {
|
Self {
|
||||||
|
server_config: None,
|
||||||
generation,
|
generation,
|
||||||
webhook: None,
|
webhook_actor_addr: None,
|
||||||
fs,
|
fs,
|
||||||
net,
|
net,
|
||||||
repository_factory: repo,
|
repository_factory: repo,
|
||||||
sleep_duration,
|
sleep_duration,
|
||||||
repo_actors: BTreeMap::new(),
|
repo_actors: BTreeMap::new(),
|
||||||
|
webhook: None,
|
||||||
message_log: None,
|
message_log: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue