git-next/crates/cli/src/server/actors/repo/webhook.rs

328 lines
11 KiB
Rust
Raw Normal View History

use actix::prelude::*;
use kxio::network::{self, json};
use tracing::{debug, info, warn};
use ulid::DecodeError;
use std::{collections::HashMap, fmt::Display, ops::Deref, str::FromStr};
use crate::server::{
actors::{
repo::{RepoActor, ValidateRepo, WebhookRegistered},
webhook::WebhookMessage,
},
config::{RepoBranches, Webhook, WebhookUrl},
gitforge,
};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WebhookId(String);
impl WebhookId {
#[allow(dead_code)]
pub const fn new(id: String) -> Self {
Self(id)
}
}
impl Deref for WebhookId {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Display for WebhookId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WebhookAuth(ulid::Ulid);
impl WebhookAuth {
pub fn from_str(authorisation: &str) -> Result<Self, DecodeError> {
let id = ulid::Ulid::from_str(authorisation);
Ok(Self(id?))
}
fn generate() -> Self {
Self(ulid::Ulid::new())
}
fn header_value(&self) -> String {
format!("Basic {}", self.0.to_string())
}
}
impl Deref for WebhookAuth {
type Target = ulid::Ulid;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[tracing::instrument(skip_all, fields(%webhook_id))]
pub async fn unregister(
webhook_id: WebhookId,
repo_details: crate::server::config::RepoDetails,
net: network::Network,
) {
let hostname = &repo_details.forge.hostname;
let repo_path = repo_details.repo_path;
use secrecy::ExposeSecret;
let token = repo_details.forge.token.expose_secret();
let url = network::NetUrl::new(format!(
"https://{hostname}/api/v1/repos/{repo_path}/hooks/{webhook_id}?token={token}"
));
let request = network::NetRequest::new(
network::RequestMethod::Delete,
url,
network::NetRequestHeaders::new(),
network::RequestBody::None,
network::ResponseType::None,
None,
network::NetRequestLogging::None,
);
let result = net.delete(request).await;
match result {
Ok(_) => info!("unregistered webhook"),
Err(err) => warn!(?err, "Failed to unregister webhook"),
}
}
#[tracing::instrument(skip_all)]
pub async fn register(
repo_details: crate::server::config::RepoDetails,
webhook: Webhook,
addr: actix::prelude::Addr<super::RepoActor>,
net: network::Network,
) {
let Some(repo_config) = repo_details.repo_config.clone() else {
return;
};
let webhook_url = webhook.url();
// remove any lingering webhooks for the same URL
let existing_webhook_ids = find_existing_webhooks(&repo_details, &webhook_url, &net).await;
for webhook_id in existing_webhook_ids {
unregister(webhook_id, repo_details.clone(), net.clone()).await;
}
let hostname = &repo_details.forge.hostname;
let repo_path = repo_details.repo_path;
use secrecy::ExposeSecret;
let token = repo_details.forge.token.expose_secret();
let url = network::NetUrl::new(format!(
"https://{hostname}/api/v1/repos/{repo_path}/hooks?token={token}"
));
let repo_alias = &repo_details.repo_alias;
let headers = network::NetRequestHeaders::new().with("Content-Type", "application/json");
let authorisation = WebhookAuth::generate();
let body = json!({
"active": true,
"authorization_header": authorisation.header_value(),
"branch_filter": format!("{{{},{},{}}}", repo_config.branches().main(), repo_config.branches().next(), repo_config.branches().dev()),
"config": {
"content_type": "json",
"url": format!("{}/{}", webhook_url.as_ref(), repo_alias),
},
"events": [ "push" ],
"type": "forgejo"
});
let request = network::NetRequest::new(
network::RequestMethod::Post,
url,
headers,
network::RequestBody::Json(body),
network::ResponseType::Json,
None,
network::NetRequestLogging::None,
);
let result = net.post_json::<Hook>(request).await;
match result {
Ok(response) => {
if let Some(hook) = response.response_body() {
info!(webhook_id = %hook.id, "Webhook registered");
addr.do_send(WebhookRegistered(hook.id(), authorisation));
}
}
Err(_) => warn!("Failed to register webhook"),
}
}
async fn find_existing_webhooks(
repo_details: &crate::server::config::RepoDetails,
webhook_url: &WebhookUrl,
net: &network::Network,
) -> Vec<WebhookId> {
let mut ids: Vec<WebhookId> = vec![];
let hostname = &repo_details.forge.hostname;
let repo_path = &repo_details.repo_path;
let mut page = 1;
loop {
use secrecy::ExposeSecret;
let token = &repo_details.forge.token.expose_secret();
let url =
format!("https://{hostname}/api/v1/repos/{repo_path}/hooks?page={page}&token={token}");
let net_url = network::NetUrl::new(url);
let request = network::NetRequest::new(
network::RequestMethod::Get,
net_url,
network::NetRequestHeaders::new(),
network::RequestBody::None,
network::ResponseType::Json,
None,
network::NetRequestLogging::None,
);
let result = net.get::<Vec<Hook>>(request).await;
if let Ok(response) = result {
if let Some(list) = response.response_body() {
if list.is_empty() {
return ids;
}
for hook in list {
if let Some(existing_url) = hook.config.get("url") {
if existing_url.starts_with(webhook_url.as_ref()) {
ids.push(hook.id());
}
}
}
}
}
page += 1;
}
}
#[derive(Debug, serde::Deserialize)]
struct Hook {
id: i64,
config: HashMap<String, String>,
}
impl Hook {
fn id(&self) -> WebhookId {
WebhookId(format!("{}", self.id))
}
}
impl Handler<WebhookMessage> for RepoActor {
type Result = ();
#[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity
#[tracing::instrument(name = "RepoActor::WebhookMessage", skip_all, fields(token = %self.message_token, repo = %self.details))]
fn handle(&mut self, msg: WebhookMessage, ctx: &mut Self::Context) -> Self::Result {
if msg.authorisation() != self.webhook_auth {
return; // invalid auth
}
let id = msg.id();
let span = tracing::info_span!("handle", %id);
let _guard = span.enter();
let body = msg.body();
debug!(%id, "RepoActor received message");
match serde_json::from_str::<Push>(body) {
Err(err) => debug!(?err, %body, "Not a 'push'"),
Ok(push) => {
if let Some(config) = &self.details.repo_config {
match push.branch(config.branches()) {
None => warn!(
?push,
"Unrecognised branch, we should be filtering to only the ones we want"
),
Some(branch) => {
match branch {
Branch::Main => {
if self.last_main_commit == Some(push.commit()) {
info!(
branch = %config.branches().main(),
commit = %push.commit(),
"Ignoring - already aware of branch at commit",
);
return;
}
self.last_main_commit.replace(push.commit())
}
Branch::Next => {
if self.last_next_commit == Some(push.commit()) {
info!(
branch = %config.branches().next(),
commit = %push.commit(),
"Ignoring - already aware of branch at commit",
);
return;
}
self.last_next_commit.replace(push.commit())
}
Branch::Dev => {
if self.last_dev_commit == Some(push.commit()) {
info!(
branch = %config.branches().dev(),
commit = %push.commit(),
"Ignoring - already aware of branch at commit",
);
return;
}
self.last_dev_commit.replace(push.commit())
}
};
let message_token = self.message_token.next();
info!(
token = %message_token,
?branch,
commit = %push.commit(),
"New commit"
);
ctx.address().do_send(ValidateRepo { message_token });
}
}
}
}
}
}
}
#[derive(Debug, serde::Deserialize)]
struct Push {
#[serde(rename = "ref")]
reference: String,
after: String,
head_commit: HeadCommit,
}
impl Push {
pub fn branch(&self, repo_branches: &RepoBranches) -> Option<Branch> {
if !self.reference.starts_with("refs/heads/") {
warn!(r#ref = self.reference, "Unexpected ref");
return None;
}
let (_, branch) = self.reference.split_at(11);
if branch == *repo_branches.main() {
return Some(Branch::Main);
}
if branch == *repo_branches.next() {
return Some(Branch::Next);
}
if branch == *repo_branches.dev() {
return Some(Branch::Dev);
}
warn!(branch, "Unexpected branch");
None
}
pub fn commit(&self) -> gitforge::Commit {
gitforge::Commit::new(&self.after, &self.head_commit.message)
}
}
#[derive(Debug)]
enum Branch {
Main,
Next,
Dev,
}
#[derive(Debug, serde::Deserialize)]
struct HeadCommit {
message: String,
}
#[cfg(test)]
mod tests {
#[test]
fn splt_ref() {
assert_eq!("refs/heads/next".split_at(11), ("refs/heads/", "next"));
}
}