From df2d9d684c5cca1e724d42d3c455ec194a1e876f Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sat, 13 Apr 2024 16:16:09 +0100 Subject: [PATCH] feat(server/webhook): add webhook server Closes kemitix/git-next#18 --- Cargo.toml | 5 ++++ src/server/actors/mod.rs | 1 + src/server/actors/repo/branch.rs | 2 +- src/server/actors/repo/status.rs | 2 +- src/server/actors/repo/webhook.rs | 19 +++++++++++- src/server/actors/webhook/message.rs | 44 ++++++++++++++++++++++++++++ src/server/actors/webhook/mod.rs | 34 +++++++++++++++++++++ src/server/actors/webhook/router.rs | 43 +++++++++++++++++++++++++++ src/server/actors/webhook/server.rs | 39 ++++++++++++++++++++++++ src/server/config.rs | 2 +- src/server/mod.rs | 18 +++++++----- 11 files changed, 198 insertions(+), 11 deletions(-) create mode 100644 src/server/actors/webhook/message.rs create mode 100644 src/server/actors/webhook/mod.rs create mode 100644 src/server/actors/webhook/router.rs create mode 100644 src/server/actors/webhook/server.rs diff --git a/Cargo.toml b/Cargo.toml index 5b08d88..8887256 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,11 @@ secrecy = "0.8" # Conventional Commit check git-conventional = "0.12" +# Webhooks +bytes = "1.6" +ulid = "1.1" +warp = "0.3" + # error handling terrors = "0.3" diff --git a/src/server/actors/mod.rs b/src/server/actors/mod.rs index c426b23..ce41572 100644 --- a/src/server/actors/mod.rs +++ b/src/server/actors/mod.rs @@ -1 +1,2 @@ pub mod repo; +pub mod webhook; diff --git a/src/server/actors/repo/branch.rs b/src/server/actors/repo/branch.rs index 7ccb7a2..524c18f 100644 --- a/src/server/actors/repo/branch.rs +++ b/src/server/actors/repo/branch.rs @@ -145,7 +145,7 @@ fn reset_next_to_main( } async fn revalidate_positions(addr: Addr) { - // TODO : (#18) sleep and restart while we don't have webhooks + // TODO : (#43) sleep and restart while we don't have webhooks tokio::time::sleep(std::time::Duration::from_secs(10)).await; addr.do_send(ValidateRepo) } diff --git a/src/server/actors/repo/status.rs b/src/server/actors/repo/status.rs index 9c539dd..4f94b50 100644 --- a/src/server/actors/repo/status.rs +++ b/src/server/actors/repo/status.rs @@ -37,7 +37,7 @@ pub async fn check_next( warn!("Checks have failed"); } } - // TODO : (#18) sleep and restart while we don't have webhooks + // TODO : (#43) sleep and restart while we don't have webhooks tokio::time::sleep(std::time::Duration::from_secs(10)).await; addr.do_send(ValidateRepo); } diff --git a/src/server/actors/repo/webhook.rs b/src/server/actors/repo/webhook.rs index 448bcef..005ae49 100644 --- a/src/server/actors/repo/webhook.rs +++ b/src/server/actors/repo/webhook.rs @@ -1,6 +1,12 @@ +use actix::prelude::*; +use tracing::info; + use std::ops::Deref; -use crate::server::actors::repo::ValidateRepo; +use crate::server::actors::{ + repo::{RepoActor, ValidateRepo}, + webhook::WebhookMessage, +}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct WebhookId(String); @@ -36,3 +42,14 @@ pub async fn register( tokio::time::sleep(std::time::Duration::from_secs(10)).await; addr.do_send(ValidateRepo); } + +impl Handler for RepoActor { + type Result = (); + + fn handle(&mut self, msg: WebhookMessage, _ctx: &mut Self::Context) -> Self::Result { + let id = msg.id(); + let body = msg.body(); + info!(?id, ?body, "RepoActor received message"); + // TODO: (#43) do something with the message + } +} diff --git a/src/server/actors/webhook/message.rs b/src/server/actors/webhook/message.rs new file mode 100644 index 0000000..d392ee7 --- /dev/null +++ b/src/server/actors/webhook/message.rs @@ -0,0 +1,44 @@ +// +use actix::prelude::*; + +#[derive(Message, Debug, Clone)] +#[rtype(result = "()")] +pub struct WebhookMessage { + id: String, + path: String, + // query: String, + // headers: warp::http::HeaderMap, + body: String, +} +impl WebhookMessage { + pub const fn new( + id: String, + path: String, + // query: String, + // headers: warp::http::HeaderMap, + body: String, + ) -> Self { + Self { + id, + path, + // query, + // headers, + body, + } + } + pub const fn id(&self) -> &String { + &self.id + } + pub const fn path(&self) -> &String { + &self.path + } + // pub const fn query(&self) -> &String { + // &self.query + // } + // pub const fn headers(&self) -> &warp::http::HeaderMap { + // &self.headers + // } + pub const fn body(&self) -> &String { + &self.body + } +} diff --git a/src/server/actors/webhook/mod.rs b/src/server/actors/webhook/mod.rs new file mode 100644 index 0000000..c8b2f8f --- /dev/null +++ b/src/server/actors/webhook/mod.rs @@ -0,0 +1,34 @@ +// crate::server::actors::webhook + +mod message; +mod router; +mod server; + +use actix::prelude::*; + +pub use message::WebhookMessage; +pub use router::AddWebhookRecipient; +pub use router::WebhookRouter; + +#[derive(Debug)] +pub struct WebhookActor { + spawn_handle: Option, + message_receiver: Recipient, +} +impl WebhookActor { + pub const fn new(message_receiver: Recipient) -> Self { + Self { + message_receiver, + spawn_handle: None, + } + } +} +impl Actor for WebhookActor { + type Context = actix::Context; + fn started(&mut self, ctx: &mut Self::Context) { + let address: Recipient = self.message_receiver.clone(); + let server = server::start(address); + let spawn_handle = ctx.spawn(server.into_actor(self)); + self.spawn_handle.replace(spawn_handle); + } +} diff --git a/src/server/actors/webhook/router.rs b/src/server/actors/webhook/router.rs new file mode 100644 index 0000000..4b23fa2 --- /dev/null +++ b/src/server/actors/webhook/router.rs @@ -0,0 +1,43 @@ +use std::collections::HashMap; + +use actix::prelude::*; +use tracing::info; + +use crate::server::{actors::webhook::message::WebhookMessage, config::RepoAlias}; + +#[derive(Default)] +pub struct WebhookRouter { + repos: HashMap>, +} +impl WebhookRouter { + pub fn new() -> Self { + Self::default() + } +} +impl Actor for WebhookRouter { + type Context = Context; +} + +impl Handler for WebhookRouter { + type Result = (); + + fn handle(&mut self, msg: WebhookMessage, _ctx: &mut Self::Context) -> Self::Result { + let repo_alias = RepoAlias(msg.path().clone()); + info!(?repo_alias, "router..."); + if let Some(recipient) = self.repos.get(&repo_alias) { + info!("Sending to recipient"); + recipient.do_send(msg); + } + } +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct AddWebhookRecipient(pub RepoAlias, pub Recipient); +impl Handler for WebhookRouter { + type Result = (); + + fn handle(&mut self, msg: AddWebhookRecipient, _ctx: &mut Self::Context) -> Self::Result { + self.repos.insert(msg.0, msg.1); + } +} diff --git a/src/server/actors/webhook/server.rs b/src/server/actors/webhook/server.rs new file mode 100644 index 0000000..fb34790 --- /dev/null +++ b/src/server/actors/webhook/server.rs @@ -0,0 +1,39 @@ +use actix::prelude::*; + +use tracing::info; + +use crate::server::actors::webhook::message::WebhookMessage; + +pub async fn start(address: actix::prelude::Recipient) { + // start webhook server + use warp::Filter; + // Define the Warp route to handle incoming HTTP requests + let route = warp::post() + .map(move || address.clone()) + .and(warp::path::param()) + // .and(warp::query::raw()) + // .and(warp::header::headers_cloned()) + .and(warp::body::bytes()) + .and_then( + |recipient: Recipient, + path, + // query: String, + // headers: warp::http::HeaderMap, + body: bytes::Bytes| async move { + let bytes = body.to_vec(); + let request_data = String::from_utf8_lossy(&bytes).to_string(); + let id = ulid::Ulid::new().to_string(); + info!(id, path, "Received webhook"); + let message = + WebhookMessage::new(id, path, /* query, headers, */ request_data); + recipient + .try_send(message) + .map(|_| warp::reply::with_status("OK", warp::http::StatusCode::OK)) + .map_err(|_| warp::reject::reject()) + }, + ); + + // Start the server + info!("Starting webhook server: http://0.0.0.0:8080/"); + warp::serve(route).run(([0, 0, 0, 0], 8080)).await; +} diff --git a/src/server/config.rs b/src/server/config.rs index feadd71..822da2f 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -254,7 +254,7 @@ impl From<(&ForgeName, &Forge)> for ForgeDetails { /// The alias of a repo /// This is the alias for the repo within `git-next-server.toml` -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct RepoAlias(pub String); impl Display for RepoAlias { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/src/server/mod.rs b/src/server/mod.rs index 57887a6..faa19b9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -14,6 +14,7 @@ use tracing::{error, info, level_filters::LevelFilter}; use crate::{ filesystem::FileSystem, server::{ + actors::webhook, config::{Forge, ForgeName, RepoAlias}, git::Git, }, @@ -54,14 +55,17 @@ pub async fn start(fs: FileSystem, net: Network, git: Git) { }; info!("Config loaded"); - let addresses = config + let webhook_router = webhook::WebhookRouter::new().start(); + config .forges() .flat_map(|(forge_name, forge)| create_forge_repos(forge, forge_name, &net, &git)) .map(start_actor) - .collect::>(); + .map(|(alias, addr)| webhook::AddWebhookRecipient(alias, addr.recipient())) + .for_each(|msg| webhook_router.do_send(msg)); + let webhook_server = webhook::WebhookActor::new(webhook_router.recipient()).start(); let _ = actix_rt::signal::ctrl_c().await; info!("Ctrl-C received, shutting down..."); - drop(addresses); + drop(webhook_server); } fn create_forge_repos( @@ -104,15 +108,15 @@ fn create_actor( fn start_actor( actor: (ForgeName, RepoAlias, actors::repo::RepoActor), -) -> Addr { - let (forge_name, repo_name, actor) = actor; - let span = tracing::info_span!("Forge/Repo", %forge_name, %repo_name); +) -> (RepoAlias, Addr) { + let (forge_name, repo_alias, actor) = actor; + let span = tracing::info_span!("Forge/Repo", %forge_name, %repo_alias); let _guard = span.enter(); info!("Starting"); let addr = actor.start(); addr.do_send(actors::repo::StartRepo); info!("Started"); - addr + (repo_alias, addr) } pub fn init_logging() -> Result<(), tracing::subscriber::SetGlobalDefaultError> {