use actix::prelude::*; use kxio::network::{self, json}; use tracing::{debug, info, warn}; use ulid::DecodeError; use std::{collections::HashMap, fmt::Display, ops::Deref, str::FromStr}; use crate::server::{ actors::{ repo::{RepoActor, ValidateRepo, WebhookRegistered}, webhook::WebhookMessage, }, config::{RepoBranches, Webhook, WebhookUrl}, forge, }; #[derive(Clone, Debug, PartialEq, Eq)] pub struct WebhookId(String); impl WebhookId { #[allow(dead_code)] pub const fn new(id: String) -> Self { Self(id) } } impl Deref for WebhookId { type Target = String; fn deref(&self) -> &Self::Target { &self.0 } } impl Display for WebhookId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) } } #[derive(Clone, Debug, PartialEq, Eq)] pub struct WebhookAuth(ulid::Ulid); impl WebhookAuth { pub fn from_str(authorisation: &str) -> Result { let id = ulid::Ulid::from_str(authorisation); Ok(Self(id?)) } fn generate() -> Self { Self(ulid::Ulid::new()) } fn header_value(&self) -> String { format!("Basic {}", self.0.to_string()) } } impl Deref for WebhookAuth { type Target = ulid::Ulid; fn deref(&self) -> &Self::Target { &self.0 } } pub async fn unregister( webhook_id: WebhookId, repo_details: crate::server::config::RepoDetails, net: network::Network, ) { info!(?webhook_id, "unregister webhook"); let hostname = &repo_details.forge.hostname; let path = repo_details.repo; use secrecy::ExposeSecret; let token = repo_details.forge.token.expose_secret(); let url = network::NetUrl::new(format!( "https://{hostname}/api/v1/repos/{path}/hooks/{webhook_id}?token={token}" )); let request = network::NetRequest::new( network::RequestMethod::Delete, url, network::NetRequestHeaders::new(), network::RequestBody::None, network::ResponseType::None, None, network::NetRequestLogging::None, ); let result = net.delete(request).await; match result { Ok(_) => info!(?webhook_id, "unregistered webhook"), Err(err) => warn!(?webhook_id, ?err, "Failed to unregister webhook"), } } pub async fn register( repo_details: crate::server::config::RepoDetails, webhook: Webhook, addr: actix::prelude::Addr, net: network::Network, ) { let Some(repo_config) = repo_details.config.clone() else { return; }; let webhook_url = webhook.url(); // remove any lingering webhooks for the same URL let existing_webhook_ids = find_existing_webhooks(&repo_details, &webhook_url, &net).await; for webhook_id in existing_webhook_ids { unregister(webhook_id, repo_details.clone(), net.clone()).await; } info!("Registering webhook"); let hostname = &repo_details.forge.hostname; let path = repo_details.repo; use secrecy::ExposeSecret; let token = repo_details.forge.token.expose_secret(); let url = network::NetUrl::new(format!( "https://{hostname}/api/v1/repos/{path}/hooks?token={token}" )); let repo_alias = &repo_details.name; let headers = network::NetRequestHeaders::new().with("Content-Type", "application/json"); let authorisation = WebhookAuth::generate(); let body = json!({ "active": true, "authorization_header": authorisation.header_value(), "branch_filter": format!("{{{},{},{}}}", repo_config.branches().main(), repo_config.branches().next(), repo_config.branches().dev()), "config": { "content_type": "json", "url": format!("{}/{}", webhook_url.as_ref(), repo_alias), }, "events": [ "push" ], "type": "forgejo" }); let request = network::NetRequest::new( network::RequestMethod::Post, url, headers, network::RequestBody::Json(body), network::ResponseType::Json, None, network::NetRequestLogging::None, ); let result = net.post_json::(request).await; match result { Ok(response) => { if let Some(hook) = response.response_body() { info!("Webhook registered"); addr.do_send(WebhookRegistered(hook.id(), authorisation)); } } Err(_) => warn!("Failed to register webhook"), } } async fn find_existing_webhooks( repo_details: &crate::server::config::RepoDetails, webhook_url: &WebhookUrl, net: &network::Network, ) -> Vec { let mut ids: Vec = vec![]; let hostname = &repo_details.forge.hostname; let path = &repo_details.repo; let mut page = 1; loop { use secrecy::ExposeSecret; let token = &repo_details.forge.token.expose_secret(); let url = format!("https://{hostname}/api/v1/repos/{path}/hooks?page={page}&token={token}"); let net_url = network::NetUrl::new(url); let request = network::NetRequest::new( network::RequestMethod::Get, net_url, network::NetRequestHeaders::new(), network::RequestBody::None, network::ResponseType::Json, None, network::NetRequestLogging::None, ); let result = net.get::>(request).await; if let Ok(response) = result { if let Some(list) = response.response_body() { if list.is_empty() { return ids; } for hook in list { if let Some(existing_url) = hook.config.get("url") { if existing_url.starts_with(webhook_url.as_ref()) { ids.push(hook.id()); } } } } } page += 1; } } #[derive(Debug, serde::Deserialize)] struct Hook { id: i64, config: HashMap, } impl Hook { fn id(&self) -> WebhookId { WebhookId(format!("{}", self.id)) } } impl Handler for RepoActor { type Result = (); #[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity fn handle(&mut self, msg: WebhookMessage, ctx: &mut Self::Context) -> Self::Result { if msg.authorisation() != self.webhook_auth { return; // invalid auth } let id = msg.id(); let body = msg.body(); debug!(?id, "RepoActor received message"); match serde_json::from_str::(body) { Err(err) => debug!(?err, %body, "Not a 'push'"), Ok(push) => { if let Some(config) = &self.details.config { match push.branch(config.branches()) { None => warn!( ?push, "Unrecognised branch, we should be filtering to only the ones we want" ), Some(branch) => { match branch { Branch::Main => { if self.last_main_commit == Some(push.commit()) { info!( ?id, "Ignoring - already aware of branch '{}' at commit '{}'", config.branches().main(), push.commit() ); return; } self.last_main_commit.replace(push.commit()) } Branch::Next => { if self.last_next_commit == Some(push.commit()) { info!( ?id, "Ignoring - already aware of branch '{}' at commit '{}'", config.branches().next(), push.commit() ); return; } self.last_next_commit.replace(push.commit()) } Branch::Dev => { if self.last_dev_commit == Some(push.commit()) { info!( ?id, "Ignoring - already aware of branch '{}' at commit '{}'", config.branches().dev(), push.commit() ); return; } self.last_dev_commit.replace(push.commit()) } }; info!( ?branch, commit = %push.commit(), "New commit for branch - assessing branch positions" ); ctx.address().do_send(ValidateRepo); } } } } } } } #[derive(Debug, serde::Deserialize)] struct Push { #[serde(rename = "ref")] reference: String, after: String, head_commit: HeadCommit, } impl Push { pub fn branch(&self, repo_branches: &RepoBranches) -> Option { if !self.reference.starts_with("refs/heads/") { warn!(r#ref = self.reference, "Unexpected ref"); return None; } let (_, branch) = self.reference.split_at(11); if branch == *repo_branches.main() { return Some(Branch::Main); } if branch == *repo_branches.next() { return Some(Branch::Next); } if branch == *repo_branches.dev() { return Some(Branch::Dev); } warn!(branch, "Unexpected branch"); None } pub fn commit(&self) -> forge::Commit { forge::Commit::new(&self.after, &self.head_commit.message) } } #[derive(Debug)] enum Branch { Main, Next, Dev, } #[derive(Debug, serde::Deserialize)] struct HeadCommit { message: String, } #[cfg(test)] mod tests { #[test] fn splt_ref() { assert_eq!("refs/heads/next".split_at(11), ("refs/heads/", "next")); } }