feat(server/webhook): implement register webhook
Some checks failed
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline failed

Closes kemitix/git-next#15
This commit is contained in:
Paul Campbell 2024-04-14 06:48:26 +01:00
parent e7060800eb
commit dd91aa4f69
7 changed files with 116 additions and 54 deletions

View file

@ -4,7 +4,6 @@ use kxio::network::{self, Network};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use crate::server::{ use crate::server::{
actors::repo::ValidateRepo,
config::{self, RepoConfig, RepoDetails}, config::{self, RepoConfig, RepoDetails},
forge::{self, CommitHistories}, forge::{self, CommitHistories},
git::Git, git::Git,
@ -27,7 +26,6 @@ pub async fn validate_positions(
Ok(commit_histories) => commit_histories, Ok(commit_histories) => commit_histories,
Err(err) => { Err(err) => {
error!(?err, "Failed to get commit histories"); error!(?err, "Failed to get commit histories");
revalidate_positions(addr).await;
return; return;
} }
}; };
@ -35,20 +33,17 @@ pub async fn validate_positions(
// Validations // Validations
let Some(main) = commit_histories.main.first().cloned() else { let Some(main) = commit_histories.main.first().cloned() else {
warn!("No commits on main branch '{}'", config.branches().main()); warn!("No commits on main branch '{}'", config.branches().main());
revalidate_positions(addr).await;
return; return;
}; };
// verify that next is an ancestor of dev, and force update to it main if it isn't // verify that next is an ancestor of dev, and force update to it main if it isn't
let Some(next) = commit_histories.next.first().cloned() else { let Some(next) = commit_histories.next.first().cloned() else {
warn!("No commits on next branch '{}", config.branches().next()); warn!("No commits on next branch '{}", config.branches().next());
revalidate_positions(addr).await;
return; return;
}; };
let next_is_ancestor_of_dev = commit_histories.dev.iter().any(|dev| dev == &next); let next_is_ancestor_of_dev = commit_histories.dev.iter().any(|dev| dev == &next);
if !next_is_ancestor_of_dev { if !next_is_ancestor_of_dev {
info!("Next is not an ancestor of dev - resetting next to main"); info!("Next is not an ancestor of dev - resetting next to main");
reset_next_to_main(next, main, &repo_details, &config, &git); reset_next_to_main(next, main, &repo_details, &config, &git);
revalidate_positions(addr).await;
return; return;
} }
@ -64,12 +59,10 @@ pub async fn validate_positions(
config.branches().next() config.branches().next()
); );
reset_next_to_main(next, main, &repo_details, &config, &git); reset_next_to_main(next, main, &repo_details, &config, &git);
revalidate_positions(addr).await;
return; return;
} }
let Some(next) = next_commits.first().cloned() else { let Some(next) = next_commits.first().cloned() else {
warn!("No commits on next branch '{}'", config.branches().next()); warn!("No commits on next branch '{}'", config.branches().next());
revalidate_positions(addr).await;
return; return;
}; };
let dev_has_next = commit_histories let dev_has_next = commit_histories
@ -82,7 +75,6 @@ pub async fn validate_positions(
config.branches().dev(), config.branches().dev(),
config.branches().next() config.branches().next()
); );
revalidate_positions(addr).await;
return; // dev is not based on next return; // dev is not based on next
} }
let dev_has_main = commit_histories.dev.iter().any(|commit| commit == &main); let dev_has_main = commit_histories.dev.iter().any(|commit| commit == &main);
@ -93,12 +85,10 @@ pub async fn validate_positions(
config.branches().main(), config.branches().main(),
config.branches().main(), config.branches().main(),
); );
revalidate_positions(addr).await;
return; return;
} }
let Some(dev) = commit_histories.dev.first().cloned() else { let Some(dev) = commit_histories.dev.first().cloned() else {
warn!("No commits on dev branch '{}'", config.branches().dev()); warn!("No commits on dev branch '{}'", config.branches().dev());
revalidate_positions(addr).await;
return; return;
}; };
addr.do_send(StartMonitoring { addr.do_send(StartMonitoring {
@ -144,12 +134,6 @@ fn reset_next_to_main(
} }
} }
async fn revalidate_positions(addr: Addr<RepoActor>) {
// TODO : (#43) sleep and restart while we don't have webhooks
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo)
}
// advance next to the next commit towards the head of the dev branch // advance next to the next commit towards the head of the dev branch
#[tracing::instrument(fields(next), skip_all)] #[tracing::instrument(fields(next), skip_all)]
pub async fn advance_next( pub async fn advance_next(
@ -157,18 +141,15 @@ pub async fn advance_next(
dev_commit_history: Vec<forge::Commit>, dev_commit_history: Vec<forge::Commit>,
repo_details: config::RepoDetails, repo_details: config::RepoDetails,
repo_config: config::RepoConfig, repo_config: config::RepoConfig,
addr: Addr<RepoActor>,
git: Git, git: Git,
) { ) {
let next_commit = find_next_commit_on_dev(next, dev_commit_history); let next_commit = find_next_commit_on_dev(next, dev_commit_history);
let Some(commit) = next_commit else { let Some(commit) = next_commit else {
warn!("No commits to advance next to"); warn!("No commits to advance next to");
revalidate_positions(addr).await;
return; return;
}; };
if let Some(problem) = validate_commit_message(commit.message()) { if let Some(problem) = validate_commit_message(commit.message()) {
warn!("Can't advance next to commit '{}': {}", commit, problem); warn!("Can't advance next to commit '{}': {}", commit, problem);
revalidate_positions(addr).await;
return; return;
} }
info!("Advancing next to commit '{}'", commit); info!("Advancing next to commit '{}'", commit);
@ -180,7 +161,6 @@ pub async fn advance_next(
) { ) {
warn!(?err, "Failed") warn!(?err, "Failed")
}; };
revalidate_positions(addr).await;
} }
#[tracing::instrument] #[tracing::instrument]

View file

@ -8,7 +8,7 @@ use kxio::network::Network;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::server::{ use crate::server::{
config::{RepoConfig, RepoDetails}, config::{RepoConfig, RepoDetails, Webhook},
forge, forge,
git::Git, git::Git,
}; };
@ -17,17 +17,24 @@ use self::webhook::WebhookId;
pub struct RepoActor { pub struct RepoActor {
details: RepoDetails, details: RepoDetails,
config: Option<RepoConfig>, // INFO: if [None] then send [StartRepo] to populate it webhook: Webhook,
webhook_id: Option<WebhookId>, // INFO: if [None] then no webhook is configured webhook_id: Option<WebhookId>, // INFO: if [None] then no webhook is configured
webhook_auth: Option<ulid::Ulid>,
net: Network, net: Network,
git: Git, git: Git,
} }
impl RepoActor { impl RepoActor {
pub(crate) const fn new(details: RepoDetails, net: Network, git: Git) -> Self { pub(crate) const fn new(
details: RepoDetails,
webhook: Webhook,
net: Network,
git: Git,
) -> Self {
Self { Self {
details, details,
config: None, webhook,
webhook_id: None, webhook_id: None,
webhook_auth: None,
net, net,
git, git,
} }
@ -72,7 +79,18 @@ impl Handler<LoadedConfig> for RepoActor {
fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result {
let config = msg.0; let config = msg.0;
info!(%self.details, %config, "Config loaded"); info!(%self.details, %config, "Config loaded");
self.config.replace(config); self.details.config.replace(config);
if self.webhook_id.is_none() {
info!("lets register the webhook...");
webhook::register(
self.details.clone(),
self.webhook.clone(),
ctx.address(),
self.net.clone(),
)
.into_actor(self)
.wait(ctx);
}
ctx.address().do_send(ValidateRepo); ctx.address().do_send(ValidateRepo);
} }
} }
@ -83,7 +101,7 @@ pub struct ValidateRepo;
impl Handler<ValidateRepo> for RepoActor { impl Handler<ValidateRepo> for RepoActor {
type Result = (); type Result = ();
fn handle(&mut self, _msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result {
if let Some(repo_config) = self.config.clone() { if let Some(repo_config) = self.details.config.clone() {
let repo_details = self.details.clone(); let repo_details = self.details.clone();
let addr = ctx.address(); let addr = ctx.address();
let net = self.net.clone(); let net = self.net.clone();
@ -106,7 +124,7 @@ pub struct StartMonitoring {
impl Handler<StartMonitoring> for RepoActor { impl Handler<StartMonitoring> for RepoActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result {
let Some(repo_config) = self.config.clone() else { let Some(repo_config) = self.details.config.clone() else {
warn!("No config loaded"); warn!("No config loaded");
return; return;
}; };
@ -116,6 +134,7 @@ impl Handler<StartMonitoring> for RepoActor {
info!(%msg.main, %msg.next, %msg.dev, next_ahead_of_main, dev_ahead_of_next, "StartMonitoring"); info!(%msg.main, %msg.next, %msg.dev, next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
let repo_details = self.details.clone(); let repo_details = self.details.clone();
let webhook = self.webhook.clone();
let addr = ctx.address(); let addr = ctx.address();
let net = self.net.clone(); let net = self.net.clone();
let git = self.git.clone(); let git = self.git.clone();
@ -130,13 +149,12 @@ impl Handler<StartMonitoring> for RepoActor {
msg.dev_commit_history, msg.dev_commit_history,
repo_details, repo_details,
repo_config, repo_config,
addr,
git, git,
) )
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);
} else if self.webhook_id.is_none() { } else if self.webhook_id.is_none() {
webhook::register(repo_details, addr, net) webhook::register(repo_details, webhook, addr, net)
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);
} }
@ -145,11 +163,12 @@ impl Handler<StartMonitoring> for RepoActor {
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct WebhookRegistered(pub WebhookId); pub struct WebhookRegistered(pub WebhookId, pub ulid::Ulid);
impl Handler<WebhookRegistered> for RepoActor { impl Handler<WebhookRegistered> for RepoActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result {
self.webhook_id.replace(msg.0); self.webhook_id.replace(msg.0);
self.webhook_auth.replace(msg.1);
} }
} }
@ -160,7 +179,7 @@ impl Handler<AdvanceMainTo> for RepoActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result {
let repo_details = self.details.clone(); let repo_details = self.details.clone();
let Some(repo_config) = self.config.clone() else { let Some(repo_config) = self.details.config.clone() else {
warn!("No config loaded"); warn!("No config loaded");
return; return;
}; };

View file

@ -3,7 +3,6 @@ use gix::trace::warn;
use tracing::info; use tracing::info;
use crate::server::{ use crate::server::{
actors::repo::ValidateRepo,
config::{self, ForgeType}, config::{self, ForgeType},
forge, forge,
}; };
@ -37,9 +36,6 @@ pub async fn check_next(
warn!("Checks have failed"); warn!("Checks have failed");
} }
} }
// TODO : (#43) sleep and restart while we don't have webhooks
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo);
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -1,12 +1,15 @@
use actix::prelude::*; use actix::prelude::*;
use kxio::network; use kxio::network::{self, json};
use tracing::{info, warn}; use tracing::{info, warn};
use std::{fmt::Display, ops::Deref}; use std::{fmt::Display, ops::Deref};
use crate::server::actors::{ use crate::server::{
repo::{RepoActor, ValidateRepo}, actors::{
webhook::WebhookMessage, repo::{RepoActor, WebhookRegistered},
webhook::WebhookMessage,
},
config::Webhook,
}; };
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
@ -37,8 +40,10 @@ pub async fn unregister(
info!(?webhook_id, "unregister webhook"); info!(?webhook_id, "unregister webhook");
let hostname = &repo_details.forge.hostname; let hostname = &repo_details.forge.hostname;
let path = repo_details.repo; let path = repo_details.repo;
use secrecy::ExposeSecret;
let token = repo_details.forge.token.expose_secret();
let url = network::NetUrl::new(format!( let url = network::NetUrl::new(format!(
"https://{hostname}/api/v1/repos/{path}/hooks/{webhook_id}" "https://{hostname}/api/v1/repos/{path}/hooks/{webhook_id}?token={token}"
)); ));
let request = network::NetRequest::new( let request = network::NetRequest::new(
network::RequestMethod::Delete, network::RequestMethod::Delete,
@ -57,13 +62,66 @@ pub async fn unregister(
} }
pub async fn register( pub async fn register(
_repo_details: crate::server::config::RepoDetails, repo_details: crate::server::config::RepoDetails,
webhook: Webhook,
addr: actix::prelude::Addr<super::RepoActor>, addr: actix::prelude::Addr<super::RepoActor>,
_net: network::Network, net: network::Network,
) { ) {
// TODO: (#15) register webhook - on success send webhook id to RepoActor let Some(repo_config) = repo_details.config else {
tokio::time::sleep(std::time::Duration::from_secs(10)).await; return;
addr.do_send(ValidateRepo); };
info!("Registering webhook");
let hostname = &repo_details.forge.hostname;
let path = repo_details.repo;
use secrecy::ExposeSecret;
let token = repo_details.forge.token.expose_secret();
let url = network::NetUrl::new(format!(
"https://{hostname}/api/v1/repos/{path}/hooks?token={token}"
));
let webhook_url = webhook.url();
let repo_alias = repo_details.name;
let headers = network::NetRequestHeaders::new().with("Content-Type", "application/json");
let authorisation = ulid::Ulid::new();
let body = json!({
"active": true,
"authorization_header": format!("Basic {}", authorisation),
"branch_filter": format!("{{{},{},{}}}", repo_config.branches().main(), repo_config.branches().next(), repo_config.branches().dev()),
"config": {
"content_type": "json",
"url": format!("{}/{}", webhook_url.as_ref(), repo_alias),
},
"events": [ "push" ],
"type": "forgejo"
});
let request = network::NetRequest::new(
network::RequestMethod::Post,
url,
headers,
network::RequestBody::Json(body),
network::ResponseType::Json,
None,
network::NetRequestLogging::None,
);
let result = net.post_json::<Hook>(request).await;
match result {
Ok(response) => {
if let Some(hook) = response.response_body() {
info!("Webhook registered");
addr.do_send(WebhookRegistered(hook.id(), authorisation));
}
}
Err(_) => warn!("Failed to register webhook"),
}
}
#[derive(Debug, serde::Deserialize)]
struct Hook {
id: i64,
}
impl Hook {
fn id(&self) -> WebhookId {
WebhookId(format!("{}", self.id))
}
} }
impl Handler<WebhookMessage> for RepoActor { impl Handler<WebhookMessage> for RepoActor {

View file

@ -26,10 +26,14 @@ impl ServerConfig {
.iter() .iter()
.map(|(name, forge)| (ForgeName(name.clone()), forge)) .map(|(name, forge)| (ForgeName(name.clone()), forge))
} }
pub const fn webhook(&self) -> &Webhook {
&self.webhook
}
} }
/// Defines the Webhook Forges should send updates to /// Defines the Webhook Forges should send updates to
#[derive(Debug, PartialEq, Eq, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
pub struct Webhook { pub struct Webhook {
url: String, url: String,
} }

View file

@ -1,6 +1,5 @@
use base64::Engine; use base64::Engine;
use kxio::network::{self, Network}; use kxio::network::{self, Network};
use secrecy::ExposeSecret;
use terrors::OneOf; use terrors::OneOf;
use tracing::{error, info}; use tracing::{error, info};
@ -34,6 +33,7 @@ pub async fn load(
let path = &details.repo; let path = &details.repo;
let filepath = ".git-next.toml"; let filepath = ".git-next.toml";
let branch = &details.branch; let branch = &details.branch;
use secrecy::ExposeSecret;
let token = details.forge.token.expose_secret(); let token = details.forge.token.expose_secret();
let url = network::NetUrl::new(format!( let url = network::NetUrl::new(format!(
"https://{hostname}/api/v1/repos/{path}/contents/{filepath}?ref={branch}&token={token}" "https://{hostname}/api/v1/repos/{path}/contents/{filepath}?ref={branch}&token={token}"
@ -128,6 +128,7 @@ pub async fn validate(
) -> Result<RepoConfig, OneOf<RepoConfigValidateErrors>> { ) -> Result<RepoConfig, OneOf<RepoConfigValidateErrors>> {
let hostname = &details.forge.hostname; let hostname = &details.forge.hostname;
let path = &details.repo; let path = &details.repo;
use secrecy::ExposeSecret;
let token = details.forge.token.expose_secret(); let token = details.forge.token.expose_secret();
let url = network::NetUrl::new(format!( let url = network::NetUrl::new(format!(
"https://{hostname}/api/v1/repos/{path}/branches?token={token}" "https://{hostname}/api/v1/repos/{path}/branches?token={token}"

View file

@ -15,7 +15,7 @@ use crate::{
filesystem::FileSystem, filesystem::FileSystem,
server::{ server::{
actors::webhook, actors::webhook,
config::{Forge, ForgeName, RepoAlias}, config::{Forge, ForgeName, RepoAlias, Webhook},
git::Git, git::Git,
}, },
}; };
@ -46,19 +46,19 @@ pub async fn start(fs: FileSystem, net: Network, git: Git) {
return; return;
}; };
info!("Starting Server..."); info!("Starting Server...");
let config = match config::ServerConfig::load(&fs) { let server_config = match config::ServerConfig::load(&fs) {
Ok(config) => config, Ok(server_config) => server_config,
Err(err) => { Err(err) => {
error!("Failed to load config file. Error: {}", err); error!("Failed to load config file. Error: {}", err);
return; return;
} }
}; };
info!("Config loaded");
let webhook_router = webhook::WebhookRouter::new().start(); let webhook_router = webhook::WebhookRouter::new().start();
config let webhook = server_config.webhook();
server_config
.forges() .forges()
.flat_map(|(forge_name, forge)| create_forge_repos(forge, forge_name, &net, &git)) .flat_map(|(forge_name, forge)| create_forge_repos(forge, forge_name, webhook, &net, &git))
.map(start_actor) .map(start_actor)
.map(|(alias, addr)| webhook::AddWebhookRecipient(alias, addr.recipient())) .map(|(alias, addr)| webhook::AddWebhookRecipient(alias, addr.recipient()))
.for_each(|msg| webhook_router.do_send(msg)); .for_each(|msg| webhook_router.do_send(msg));
@ -71,6 +71,7 @@ pub async fn start(fs: FileSystem, net: Network, git: Git) {
fn create_forge_repos( fn create_forge_repos(
forge: &Forge, forge: &Forge,
forge_name: ForgeName, forge_name: ForgeName,
webhook: &Webhook,
net: &Network, net: &Network,
git: &Git, git: &Git,
) -> Vec<(ForgeName, RepoAlias, RepoActor)> { ) -> Vec<(ForgeName, RepoAlias, RepoActor)> {
@ -80,16 +81,18 @@ fn create_forge_repos(
info!("Creating Forge"); info!("Creating Forge");
forge forge
.repos() .repos()
.map(create_actor(forge_name, forge.clone(), net, git)) .map(create_actor(forge_name, forge.clone(), webhook, net, git))
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
fn create_actor( fn create_actor(
forge_name: ForgeName, forge_name: ForgeName,
forge: config::Forge, forge: config::Forge,
webhook: &Webhook,
net: &Network, net: &Network,
git: &Git, git: &Git,
) -> impl Fn((RepoAlias, &Repo)) -> (ForgeName, RepoAlias, RepoActor) { ) -> impl Fn((RepoAlias, &Repo)) -> (ForgeName, RepoAlias, RepoActor) {
let webhook = webhook.clone();
let net = net.clone(); let net = net.clone();
let git = git.clone(); let git = git.clone();
move |(repo_name, repo)| { move |(repo_name, repo)| {
@ -98,6 +101,7 @@ fn create_actor(
info!("Creating Repo"); info!("Creating Repo");
let actor = actors::repo::RepoActor::new( let actor = actors::repo::RepoActor::new(
config::RepoDetails::new(&repo_name, repo, &forge_name, &forge), config::RepoDetails::new(&repo_name, repo, &forge_name, &forge),
webhook.clone(),
net.clone(), net.clone(),
git.clone(), git.clone(),
); );