diff --git a/src/server/actors/repo/branch.rs b/src/server/actors/repo/branch.rs index bf650fd..aa14181 100644 --- a/src/server/actors/repo/branch.rs +++ b/src/server/actors/repo/branch.rs @@ -17,6 +17,7 @@ pub async fn advance_next( repo_config: config::RepoConfig, forge: gitforge::Forge, addr: Addr, + message_token: super::MessageToken, ) { let next_commit = find_next_commit_on_dev(next, dev_commit_history); let Some(commit) = next_commit else { @@ -36,7 +37,7 @@ pub async fn advance_next( warn!(?err, "Failed") } tokio::time::sleep(Duration::from_secs(10)).await; - addr.do_send(ValidateRepo) + addr.do_send(ValidateRepo { message_token }) } #[tracing::instrument] @@ -78,6 +79,7 @@ pub async fn advance_main( repo_config: config::RepoConfig, forge: gitforge::Forge, addr: Addr, + message_token: super::MessageToken, ) { info!("Advancing main to next"); if let Err(err) = forge.branch_reset( @@ -87,7 +89,7 @@ pub async fn advance_main( ) { warn!(?err, "Failed") }; - addr.do_send(ValidateRepo) + addr.do_send(ValidateRepo { message_token }) } #[cfg(test)] diff --git a/src/server/actors/repo/mod.rs b/src/server/actors/repo/mod.rs index e7efb33..557460f 100644 --- a/src/server/actors/repo/mod.rs +++ b/src/server/actors/repo/mod.rs @@ -11,11 +11,13 @@ use crate::server::{ actors::repo::webhook::WebhookAuth, config::{RepoConfig, RepoDetails, Webhook}, gitforge, + types::MessageToken, }; use self::webhook::WebhookId; pub struct RepoActor { + message_token: MessageToken, span: tracing::Span, details: RepoDetails, webhook: Webhook, @@ -40,6 +42,7 @@ impl RepoActor { }; debug!(?forge, "new"); Self { + message_token: MessageToken::new(), span, details, webhook, @@ -137,25 +140,52 @@ impl Handler for RepoActor { .into_actor(self) .wait(ctx); } - ctx.address().do_send(ValidateRepo); + ctx.address().do_send(ValidateRepo { + message_token: self.message_token, + }); } } #[derive(Message)] #[rtype(result = "()")] -pub struct ValidateRepo; +pub struct ValidateRepo { + message_token: MessageToken, +} +impl ValidateRepo { + pub const fn new(message_token: MessageToken) -> Self { + Self { message_token } + } +} impl Handler for RepoActor { type Result = (); - #[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.details))] - fn handle(&mut self, _msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result { + #[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.details, token = %msg.message_token))] + fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result { + match msg.message_token { + message_token if self.message_token < message_token => { + info!(%message_token, "New message token"); + self.message_token = msg.message_token; + } + message_token if self.message_token > message_token => { + info!("Dropping message from previous generation"); + return; // message is expired + } + _ => { + // do nothing + } + } info!("Message Received"); if let Some(repo_config) = self.details.repo_config.clone() { let forge = self.forge.clone(); let addr = ctx.address(); - async move { forge.branches_validate_positions(repo_config, addr).await } - .in_current_span() - .into_actor(self) - .wait(ctx); + let message_token = self.message_token; + async move { + forge + .branches_validate_positions(repo_config, addr, message_token) + .await + } + .in_current_span() + .into_actor(self) + .wait(ctx); } } } @@ -170,7 +200,7 @@ pub struct StartMonitoring { } impl Handler for RepoActor { type Result = (); - #[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all, fields(repo = %self.details, main = %msg.main, next= %msg.next, dev = %msg.dev))] + #[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all, fields(token = %self.message_token, repo = %self.details, main = %msg.main, next= %msg.next, dev = %msg.dev))] fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result { info!("Message Received"); let Some(repo_config) = self.details.repo_config.clone() else { @@ -186,15 +216,22 @@ impl Handler for RepoActor { let forge = self.forge.clone(); if next_ahead_of_main { - status::check_next(msg.next, addr, forge) + status::check_next(msg.next, addr, forge, self.message_token) .in_current_span() .into_actor(self) .wait(ctx); } else if dev_ahead_of_next { - branch::advance_next(msg.next, msg.dev_commit_history, repo_config, forge, addr) - .in_current_span() - .into_actor(self) - .wait(ctx); + branch::advance_next( + msg.next, + msg.dev_commit_history, + repo_config, + forge, + addr, + self.message_token, + ) + .in_current_span() + .into_actor(self) + .wait(ctx); } } } @@ -226,7 +263,7 @@ impl Handler for RepoActor { }; let forge = self.forge.clone(); let addr = ctx.address(); - branch::advance_main(msg.0, repo_config, forge, addr) + branch::advance_main(msg.0, repo_config, forge, addr, self.message_token) .in_current_span() .into_actor(self) .wait(ctx); diff --git a/src/server/actors/repo/status.rs b/src/server/actors/repo/status.rs index 6c47fae..f0b76d9 100644 --- a/src/server/actors/repo/status.rs +++ b/src/server/actors/repo/status.rs @@ -2,7 +2,7 @@ use actix::prelude::*; use gix::trace::warn; use tracing::info; -use crate::server::{actors::repo::ValidateRepo, gitforge}; +use crate::server::{actors::repo::ValidateRepo, gitforge, types::MessageToken}; use super::AdvanceMainTo; @@ -10,6 +10,7 @@ pub async fn check_next( next: gitforge::Commit, addr: Addr, forge: gitforge::Forge, + message_token: MessageToken, ) { // get the status - pass, fail, pending (all others map to fail, e.g. error) let status = forge.commit_status(&next).await; @@ -20,7 +21,7 @@ pub async fn check_next( } gitforge::CommitStatus::Pending => { tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; - addr.do_send(ValidateRepo); + addr.do_send(ValidateRepo { message_token }); } gitforge::CommitStatus::Fail => { warn!("Checks have failed"); diff --git a/src/server/actors/repo/webhook.rs b/src/server/actors/repo/webhook.rs index 0dcc914..cc1787a 100644 --- a/src/server/actors/repo/webhook.rs +++ b/src/server/actors/repo/webhook.rs @@ -204,13 +204,16 @@ impl Handler for RepoActor { type Result = (); #[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity + #[tracing::instrument(name = "RepoActor::WebhookMessage", skip_all, fields(token = %self.message_token, repo = %self.details))] fn handle(&mut self, msg: WebhookMessage, ctx: &mut Self::Context) -> Self::Result { if msg.authorisation() != self.webhook_auth { return; // invalid auth } let id = msg.id(); + let span = tracing::info_span!("handle", %id); + let _guard = span.enter(); let body = msg.body(); - debug!(?id, "RepoActor received message"); + debug!(%id, "RepoActor received message"); match serde_json::from_str::(body) { Err(err) => debug!(?err, %body, "Not a 'push'"), Ok(push) => { @@ -225,10 +228,9 @@ impl Handler for RepoActor { Branch::Main => { if self.last_main_commit == Some(push.commit()) { info!( - ?id, - "Ignoring - already aware of branch '{}' at commit '{}'", - config.branches().main(), - push.commit() + branch = %config.branches().main(), + commit = %push.commit(), + "Ignoring - already aware of branch at commit", ); return; } @@ -237,10 +239,9 @@ impl Handler for RepoActor { Branch::Next => { if self.last_next_commit == Some(push.commit()) { info!( - ?id, - "Ignoring - already aware of branch '{}' at commit '{}'", - config.branches().next(), - push.commit() + branch = %config.branches().next(), + commit = %push.commit(), + "Ignoring - already aware of branch at commit", ); return; } @@ -249,22 +250,23 @@ impl Handler for RepoActor { Branch::Dev => { if self.last_dev_commit == Some(push.commit()) { info!( - ?id, - "Ignoring - already aware of branch '{}' at commit '{}'", - config.branches().dev(), - push.commit() + branch = %config.branches().dev(), + commit = %push.commit(), + "Ignoring - already aware of branch at commit", ); return; } self.last_dev_commit.replace(push.commit()) } }; + let message_token = self.message_token.next(); info!( + token = %message_token, ?branch, commit = %push.commit(), - "New commit for branch - assessing branch positions" + "New commit" ); - ctx.address().do_send(ValidateRepo); + ctx.address().do_send(ValidateRepo { message_token }); } } } diff --git a/src/server/gitforge/forgejo/mod.rs b/src/server/gitforge/forgejo/mod.rs index 078eec6..c8d683a 100644 --- a/src/server/gitforge/forgejo/mod.rs +++ b/src/server/gitforge/forgejo/mod.rs @@ -13,7 +13,7 @@ use crate::server::{ actors::repo::{RepoActor, StartMonitoring, ValidateRepo}, config::{BranchName, GitDir, RepoConfig, RepoDetails}, gitforge::{self, forgejo::branch::ValidatedPositions, RepoCloneError}, - types::GitRef, + types::{GitRef, MessageToken}, }; struct ForgeJo; @@ -45,7 +45,12 @@ impl super::ForgeLike for ForgeJoEnv { file::contents_get(&self.repo_details, &self.net, branch, file_path).await } - async fn branches_validate_positions(&self, repo_config: RepoConfig, addr: Addr) { + async fn branches_validate_positions( + &self, + repo_config: RepoConfig, + addr: Addr, + message_token: MessageToken, + ) { match branch::validate_positions(self, repo_config).await { Ok(ValidatedPositions { main, @@ -63,7 +68,7 @@ impl super::ForgeLike for ForgeJoEnv { Err(err) => { warn!("{}", err); tokio::time::sleep(Duration::from_secs(10)).await; - addr.do_send(ValidateRepo); + addr.do_send(ValidateRepo::new(message_token)); } } } diff --git a/src/server/gitforge/mock_forge.rs b/src/server/gitforge/mock_forge.rs index 09f406d..d44f1e3 100644 --- a/src/server/gitforge/mock_forge.rs +++ b/src/server/gitforge/mock_forge.rs @@ -2,7 +2,7 @@ use crate::server::{ actors::repo::RepoActor, config::{BranchName, GitDir, RepoConfig}, gitforge::{self, RepoCloneError}, - types::GitRef, + types::{GitRef, MessageToken}, }; struct MockForge; @@ -35,6 +35,7 @@ impl super::ForgeLike for MockForgeEnv { &self, _repo_config: RepoConfig, _addr: actix::prelude::Addr, + _message_token: MessageToken, ) { todo!() } diff --git a/src/server/gitforge/mod.rs b/src/server/gitforge/mod.rs index e69e8cb..ba5136d 100644 --- a/src/server/gitforge/mod.rs +++ b/src/server/gitforge/mod.rs @@ -18,7 +18,7 @@ pub use errors::*; use crate::server::{ config::{BranchName, GitDir, RepoConfig, RepoDetails}, - types::GitRef, + types::{GitRef, MessageToken}, }; #[async_trait::async_trait] @@ -41,6 +41,7 @@ pub trait ForgeLike { &self, repo_config: RepoConfig, addr: actix::prelude::Addr, + message_token: MessageToken, ); /// Moves a branch to a new commit. diff --git a/src/server/types.rs b/src/server/types.rs index beba8f4..58da629 100644 --- a/src/server/types.rs +++ b/src/server/types.rs @@ -19,3 +19,19 @@ impl Display for GitRef { write!(f, "{}", self.0) } } + +#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct MessageToken(u32); +impl MessageToken { + pub fn new() -> Self { + Self::default() + } + pub const fn next(&self) -> Self { + Self(self.0 + 1) + } +} +impl std::fmt::Display for MessageToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +}