feat(server/webhook): add webhook server
Some checks failed
ci/woodpecker/cron/push-next Pipeline was successful
ci/woodpecker/cron/tag-created Pipeline was successful
ci/woodpecker/cron/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline failed

Closes kemitix/git-next#18
This commit is contained in:
Paul Campbell 2024-04-13 16:16:09 +01:00
parent 4c4ac4df25
commit df2d9d684c
11 changed files with 198 additions and 11 deletions

View file

@ -40,6 +40,11 @@ secrecy = "0.8"
# Conventional Commit check # Conventional Commit check
git-conventional = "0.12" git-conventional = "0.12"
# Webhooks
bytes = "1.6"
ulid = "1.1"
warp = "0.3"
# error handling # error handling
terrors = "0.3" terrors = "0.3"

View file

@ -1 +1,2 @@
pub mod repo; pub mod repo;
pub mod webhook;

View file

@ -145,7 +145,7 @@ fn reset_next_to_main(
} }
async fn revalidate_positions(addr: Addr<RepoActor>) { async fn revalidate_positions(addr: Addr<RepoActor>) {
// TODO : (#18) sleep and restart while we don't have webhooks // TODO : (#43) sleep and restart while we don't have webhooks
tokio::time::sleep(std::time::Duration::from_secs(10)).await; tokio::time::sleep(std::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo) addr.do_send(ValidateRepo)
} }

View file

@ -37,7 +37,7 @@ pub async fn check_next(
warn!("Checks have failed"); warn!("Checks have failed");
} }
} }
// TODO : (#18) sleep and restart while we don't have webhooks // TODO : (#43) sleep and restart while we don't have webhooks
tokio::time::sleep(std::time::Duration::from_secs(10)).await; tokio::time::sleep(std::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo); addr.do_send(ValidateRepo);
} }

View file

@ -1,6 +1,12 @@
use actix::prelude::*;
use tracing::info;
use std::ops::Deref; use std::ops::Deref;
use crate::server::actors::repo::ValidateRepo; use crate::server::actors::{
repo::{RepoActor, ValidateRepo},
webhook::WebhookMessage,
};
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct WebhookId(String); pub struct WebhookId(String);
@ -36,3 +42,14 @@ pub async fn register(
tokio::time::sleep(std::time::Duration::from_secs(10)).await; tokio::time::sleep(std::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo); addr.do_send(ValidateRepo);
} }
impl Handler<WebhookMessage> for RepoActor {
type Result = ();
fn handle(&mut self, msg: WebhookMessage, _ctx: &mut Self::Context) -> Self::Result {
let id = msg.id();
let body = msg.body();
info!(?id, ?body, "RepoActor received message");
// TODO: (#43) do something with the message
}
}

View file

@ -0,0 +1,44 @@
//
use actix::prelude::*;
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
pub struct WebhookMessage {
id: String,
path: String,
// query: String,
// headers: warp::http::HeaderMap,
body: String,
}
impl WebhookMessage {
pub const fn new(
id: String,
path: String,
// query: String,
// headers: warp::http::HeaderMap,
body: String,
) -> Self {
Self {
id,
path,
// query,
// headers,
body,
}
}
pub const fn id(&self) -> &String {
&self.id
}
pub const fn path(&self) -> &String {
&self.path
}
// pub const fn query(&self) -> &String {
// &self.query
// }
// pub const fn headers(&self) -> &warp::http::HeaderMap {
// &self.headers
// }
pub const fn body(&self) -> &String {
&self.body
}
}

View file

@ -0,0 +1,34 @@
// crate::server::actors::webhook
mod message;
mod router;
mod server;
use actix::prelude::*;
pub use message::WebhookMessage;
pub use router::AddWebhookRecipient;
pub use router::WebhookRouter;
#[derive(Debug)]
pub struct WebhookActor {
spawn_handle: Option<actix::SpawnHandle>,
message_receiver: Recipient<WebhookMessage>,
}
impl WebhookActor {
pub const fn new(message_receiver: Recipient<WebhookMessage>) -> Self {
Self {
message_receiver,
spawn_handle: None,
}
}
}
impl Actor for WebhookActor {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let address: Recipient<WebhookMessage> = self.message_receiver.clone();
let server = server::start(address);
let spawn_handle = ctx.spawn(server.into_actor(self));
self.spawn_handle.replace(spawn_handle);
}
}

View file

@ -0,0 +1,43 @@
use std::collections::HashMap;
use actix::prelude::*;
use tracing::info;
use crate::server::{actors::webhook::message::WebhookMessage, config::RepoAlias};
#[derive(Default)]
pub struct WebhookRouter {
repos: HashMap<RepoAlias, Recipient<WebhookMessage>>,
}
impl WebhookRouter {
pub fn new() -> Self {
Self::default()
}
}
impl Actor for WebhookRouter {
type Context = Context<Self>;
}
impl Handler<WebhookMessage> for WebhookRouter {
type Result = ();
fn handle(&mut self, msg: WebhookMessage, _ctx: &mut Self::Context) -> Self::Result {
let repo_alias = RepoAlias(msg.path().clone());
info!(?repo_alias, "router...");
if let Some(recipient) = self.repos.get(&repo_alias) {
info!("Sending to recipient");
recipient.do_send(msg);
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct AddWebhookRecipient(pub RepoAlias, pub Recipient<WebhookMessage>);
impl Handler<AddWebhookRecipient> for WebhookRouter {
type Result = ();
fn handle(&mut self, msg: AddWebhookRecipient, _ctx: &mut Self::Context) -> Self::Result {
self.repos.insert(msg.0, msg.1);
}
}

View file

@ -0,0 +1,39 @@
use actix::prelude::*;
use tracing::info;
use crate::server::actors::webhook::message::WebhookMessage;
pub async fn start(address: actix::prelude::Recipient<super::message::WebhookMessage>) {
// start webhook server
use warp::Filter;
// Define the Warp route to handle incoming HTTP requests
let route = warp::post()
.map(move || address.clone())
.and(warp::path::param())
// .and(warp::query::raw())
// .and(warp::header::headers_cloned())
.and(warp::body::bytes())
.and_then(
|recipient: Recipient<WebhookMessage>,
path,
// query: String,
// headers: warp::http::HeaderMap,
body: bytes::Bytes| async move {
let bytes = body.to_vec();
let request_data = String::from_utf8_lossy(&bytes).to_string();
let id = ulid::Ulid::new().to_string();
info!(id, path, "Received webhook");
let message =
WebhookMessage::new(id, path, /* query, headers, */ request_data);
recipient
.try_send(message)
.map(|_| warp::reply::with_status("OK", warp::http::StatusCode::OK))
.map_err(|_| warp::reject::reject())
},
);
// Start the server
info!("Starting webhook server: http://0.0.0.0:8080/");
warp::serve(route).run(([0, 0, 0, 0], 8080)).await;
}

View file

@ -254,7 +254,7 @@ impl From<(&ForgeName, &Forge)> for ForgeDetails {
/// The alias of a repo /// The alias of a repo
/// This is the alias for the repo within `git-next-server.toml` /// This is the alias for the repo within `git-next-server.toml`
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RepoAlias(pub String); pub struct RepoAlias(pub String);
impl Display for RepoAlias { impl Display for RepoAlias {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {

View file

@ -14,6 +14,7 @@ use tracing::{error, info, level_filters::LevelFilter};
use crate::{ use crate::{
filesystem::FileSystem, filesystem::FileSystem,
server::{ server::{
actors::webhook,
config::{Forge, ForgeName, RepoAlias}, config::{Forge, ForgeName, RepoAlias},
git::Git, git::Git,
}, },
@ -54,14 +55,17 @@ pub async fn start(fs: FileSystem, net: Network, git: Git) {
}; };
info!("Config loaded"); info!("Config loaded");
let addresses = config let webhook_router = webhook::WebhookRouter::new().start();
config
.forges() .forges()
.flat_map(|(forge_name, forge)| create_forge_repos(forge, forge_name, &net, &git)) .flat_map(|(forge_name, forge)| create_forge_repos(forge, forge_name, &net, &git))
.map(start_actor) .map(start_actor)
.collect::<Vec<_>>(); .map(|(alias, addr)| webhook::AddWebhookRecipient(alias, addr.recipient()))
.for_each(|msg| webhook_router.do_send(msg));
let webhook_server = webhook::WebhookActor::new(webhook_router.recipient()).start();
let _ = actix_rt::signal::ctrl_c().await; let _ = actix_rt::signal::ctrl_c().await;
info!("Ctrl-C received, shutting down..."); info!("Ctrl-C received, shutting down...");
drop(addresses); drop(webhook_server);
} }
fn create_forge_repos( fn create_forge_repos(
@ -104,15 +108,15 @@ fn create_actor(
fn start_actor( fn start_actor(
actor: (ForgeName, RepoAlias, actors::repo::RepoActor), actor: (ForgeName, RepoAlias, actors::repo::RepoActor),
) -> Addr<actors::repo::RepoActor> { ) -> (RepoAlias, Addr<actors::repo::RepoActor>) {
let (forge_name, repo_name, actor) = actor; let (forge_name, repo_alias, actor) = actor;
let span = tracing::info_span!("Forge/Repo", %forge_name, %repo_name); let span = tracing::info_span!("Forge/Repo", %forge_name, %repo_alias);
let _guard = span.enter(); let _guard = span.enter();
info!("Starting"); info!("Starting");
let addr = actor.start(); let addr = actor.start();
addr.do_send(actors::repo::StartRepo); addr.do_send(actors::repo::StartRepo);
info!("Started"); info!("Started");
addr (repo_alias, addr)
} }
pub fn init_logging() -> Result<(), tracing::subscriber::SetGlobalDefaultError> { pub fn init_logging() -> Result<(), tracing::subscriber::SetGlobalDefaultError> {