feat(server): avoid duplicate messages being passed for repo actor

This commit is contained in:
Paul Campbell 2024-05-05 08:17:32 +01:00
parent 2c5f497be2
commit da5bc69508
8 changed files with 104 additions and 39 deletions

View file

@ -17,6 +17,7 @@ pub async fn advance_next(
repo_config: config::RepoConfig, repo_config: config::RepoConfig,
forge: gitforge::Forge, forge: gitforge::Forge,
addr: Addr<super::RepoActor>, addr: Addr<super::RepoActor>,
message_token: super::MessageToken,
) { ) {
let next_commit = find_next_commit_on_dev(next, dev_commit_history); let next_commit = find_next_commit_on_dev(next, dev_commit_history);
let Some(commit) = next_commit else { let Some(commit) = next_commit else {
@ -36,7 +37,7 @@ pub async fn advance_next(
warn!(?err, "Failed") warn!(?err, "Failed")
} }
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
addr.do_send(ValidateRepo) addr.do_send(ValidateRepo { message_token })
} }
#[tracing::instrument] #[tracing::instrument]
@ -78,6 +79,7 @@ pub async fn advance_main(
repo_config: config::RepoConfig, repo_config: config::RepoConfig,
forge: gitforge::Forge, forge: gitforge::Forge,
addr: Addr<RepoActor>, addr: Addr<RepoActor>,
message_token: super::MessageToken,
) { ) {
info!("Advancing main to next"); info!("Advancing main to next");
if let Err(err) = forge.branch_reset( if let Err(err) = forge.branch_reset(
@ -87,7 +89,7 @@ pub async fn advance_main(
) { ) {
warn!(?err, "Failed") warn!(?err, "Failed")
}; };
addr.do_send(ValidateRepo) addr.do_send(ValidateRepo { message_token })
} }
#[cfg(test)] #[cfg(test)]

View file

@ -11,11 +11,13 @@ use crate::server::{
actors::repo::webhook::WebhookAuth, actors::repo::webhook::WebhookAuth,
config::{RepoConfig, RepoDetails, Webhook}, config::{RepoConfig, RepoDetails, Webhook},
gitforge, gitforge,
types::MessageToken,
}; };
use self::webhook::WebhookId; use self::webhook::WebhookId;
pub struct RepoActor { pub struct RepoActor {
message_token: MessageToken,
span: tracing::Span, span: tracing::Span,
details: RepoDetails, details: RepoDetails,
webhook: Webhook, webhook: Webhook,
@ -40,6 +42,7 @@ impl RepoActor {
}; };
debug!(?forge, "new"); debug!(?forge, "new");
Self { Self {
message_token: MessageToken::new(),
span, span,
details, details,
webhook, webhook,
@ -137,22 +140,49 @@ impl Handler<LoadedConfig> for RepoActor {
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);
} }
ctx.address().do_send(ValidateRepo); ctx.address().do_send(ValidateRepo {
message_token: self.message_token,
});
} }
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct ValidateRepo; pub struct ValidateRepo {
message_token: MessageToken,
}
impl ValidateRepo {
pub const fn new(message_token: MessageToken) -> Self {
Self { message_token }
}
}
impl Handler<ValidateRepo> for RepoActor { impl Handler<ValidateRepo> for RepoActor {
type Result = (); type Result = ();
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.details))] #[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.details, token = %msg.message_token))]
fn handle(&mut self, _msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result {
match msg.message_token {
message_token if self.message_token < message_token => {
info!(%message_token, "New message token");
self.message_token = msg.message_token;
}
message_token if self.message_token > message_token => {
info!("Dropping message from previous generation");
return; // message is expired
}
_ => {
// do nothing
}
}
info!("Message Received"); info!("Message Received");
if let Some(repo_config) = self.details.repo_config.clone() { if let Some(repo_config) = self.details.repo_config.clone() {
let forge = self.forge.clone(); let forge = self.forge.clone();
let addr = ctx.address(); let addr = ctx.address();
async move { forge.branches_validate_positions(repo_config, addr).await } let message_token = self.message_token;
async move {
forge
.branches_validate_positions(repo_config, addr, message_token)
.await
}
.in_current_span() .in_current_span()
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);
@ -170,7 +200,7 @@ pub struct StartMonitoring {
} }
impl Handler<StartMonitoring> for RepoActor { impl Handler<StartMonitoring> for RepoActor {
type Result = (); type Result = ();
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all, fields(repo = %self.details, main = %msg.main, next= %msg.next, dev = %msg.dev))] #[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all, fields(token = %self.message_token, repo = %self.details, main = %msg.main, next= %msg.next, dev = %msg.dev))]
fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result {
info!("Message Received"); info!("Message Received");
let Some(repo_config) = self.details.repo_config.clone() else { let Some(repo_config) = self.details.repo_config.clone() else {
@ -186,12 +216,19 @@ impl Handler<StartMonitoring> for RepoActor {
let forge = self.forge.clone(); let forge = self.forge.clone();
if next_ahead_of_main { if next_ahead_of_main {
status::check_next(msg.next, addr, forge) status::check_next(msg.next, addr, forge, self.message_token)
.in_current_span() .in_current_span()
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);
} else if dev_ahead_of_next { } else if dev_ahead_of_next {
branch::advance_next(msg.next, msg.dev_commit_history, repo_config, forge, addr) branch::advance_next(
msg.next,
msg.dev_commit_history,
repo_config,
forge,
addr,
self.message_token,
)
.in_current_span() .in_current_span()
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);
@ -226,7 +263,7 @@ impl Handler<AdvanceMainTo> for RepoActor {
}; };
let forge = self.forge.clone(); let forge = self.forge.clone();
let addr = ctx.address(); let addr = ctx.address();
branch::advance_main(msg.0, repo_config, forge, addr) branch::advance_main(msg.0, repo_config, forge, addr, self.message_token)
.in_current_span() .in_current_span()
.into_actor(self) .into_actor(self)
.wait(ctx); .wait(ctx);

View file

@ -2,7 +2,7 @@ use actix::prelude::*;
use gix::trace::warn; use gix::trace::warn;
use tracing::info; use tracing::info;
use crate::server::{actors::repo::ValidateRepo, gitforge}; use crate::server::{actors::repo::ValidateRepo, gitforge, types::MessageToken};
use super::AdvanceMainTo; use super::AdvanceMainTo;
@ -10,6 +10,7 @@ pub async fn check_next(
next: gitforge::Commit, next: gitforge::Commit,
addr: Addr<super::RepoActor>, addr: Addr<super::RepoActor>,
forge: gitforge::Forge, forge: gitforge::Forge,
message_token: MessageToken,
) { ) {
// get the status - pass, fail, pending (all others map to fail, e.g. error) // get the status - pass, fail, pending (all others map to fail, e.g. error)
let status = forge.commit_status(&next).await; let status = forge.commit_status(&next).await;
@ -20,7 +21,7 @@ pub async fn check_next(
} }
gitforge::CommitStatus::Pending => { gitforge::CommitStatus::Pending => {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo); addr.do_send(ValidateRepo { message_token });
} }
gitforge::CommitStatus::Fail => { gitforge::CommitStatus::Fail => {
warn!("Checks have failed"); warn!("Checks have failed");

View file

@ -204,13 +204,16 @@ impl Handler<WebhookMessage> for RepoActor {
type Result = (); type Result = ();
#[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity #[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity
#[tracing::instrument(name = "RepoActor::WebhookMessage", skip_all, fields(token = %self.message_token, repo = %self.details))]
fn handle(&mut self, msg: WebhookMessage, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: WebhookMessage, ctx: &mut Self::Context) -> Self::Result {
if msg.authorisation() != self.webhook_auth { if msg.authorisation() != self.webhook_auth {
return; // invalid auth return; // invalid auth
} }
let id = msg.id(); let id = msg.id();
let span = tracing::info_span!("handle", %id);
let _guard = span.enter();
let body = msg.body(); let body = msg.body();
debug!(?id, "RepoActor received message"); debug!(%id, "RepoActor received message");
match serde_json::from_str::<Push>(body) { match serde_json::from_str::<Push>(body) {
Err(err) => debug!(?err, %body, "Not a 'push'"), Err(err) => debug!(?err, %body, "Not a 'push'"),
Ok(push) => { Ok(push) => {
@ -225,10 +228,9 @@ impl Handler<WebhookMessage> for RepoActor {
Branch::Main => { Branch::Main => {
if self.last_main_commit == Some(push.commit()) { if self.last_main_commit == Some(push.commit()) {
info!( info!(
?id, branch = %config.branches().main(),
"Ignoring - already aware of branch '{}' at commit '{}'", commit = %push.commit(),
config.branches().main(), "Ignoring - already aware of branch at commit",
push.commit()
); );
return; return;
} }
@ -237,10 +239,9 @@ impl Handler<WebhookMessage> for RepoActor {
Branch::Next => { Branch::Next => {
if self.last_next_commit == Some(push.commit()) { if self.last_next_commit == Some(push.commit()) {
info!( info!(
?id, branch = %config.branches().next(),
"Ignoring - already aware of branch '{}' at commit '{}'", commit = %push.commit(),
config.branches().next(), "Ignoring - already aware of branch at commit",
push.commit()
); );
return; return;
} }
@ -249,22 +250,23 @@ impl Handler<WebhookMessage> for RepoActor {
Branch::Dev => { Branch::Dev => {
if self.last_dev_commit == Some(push.commit()) { if self.last_dev_commit == Some(push.commit()) {
info!( info!(
?id, branch = %config.branches().dev(),
"Ignoring - already aware of branch '{}' at commit '{}'", commit = %push.commit(),
config.branches().dev(), "Ignoring - already aware of branch at commit",
push.commit()
); );
return; return;
} }
self.last_dev_commit.replace(push.commit()) self.last_dev_commit.replace(push.commit())
} }
}; };
let message_token = self.message_token.next();
info!( info!(
token = %message_token,
?branch, ?branch,
commit = %push.commit(), commit = %push.commit(),
"New commit for branch - assessing branch positions" "New commit"
); );
ctx.address().do_send(ValidateRepo); ctx.address().do_send(ValidateRepo { message_token });
} }
} }
} }

View file

@ -13,7 +13,7 @@ use crate::server::{
actors::repo::{RepoActor, StartMonitoring, ValidateRepo}, actors::repo::{RepoActor, StartMonitoring, ValidateRepo},
config::{BranchName, GitDir, RepoConfig, RepoDetails}, config::{BranchName, GitDir, RepoConfig, RepoDetails},
gitforge::{self, forgejo::branch::ValidatedPositions, RepoCloneError}, gitforge::{self, forgejo::branch::ValidatedPositions, RepoCloneError},
types::GitRef, types::{GitRef, MessageToken},
}; };
struct ForgeJo; struct ForgeJo;
@ -45,7 +45,12 @@ impl super::ForgeLike for ForgeJoEnv {
file::contents_get(&self.repo_details, &self.net, branch, file_path).await file::contents_get(&self.repo_details, &self.net, branch, file_path).await
} }
async fn branches_validate_positions(&self, repo_config: RepoConfig, addr: Addr<RepoActor>) { async fn branches_validate_positions(
&self,
repo_config: RepoConfig,
addr: Addr<RepoActor>,
message_token: MessageToken,
) {
match branch::validate_positions(self, repo_config).await { match branch::validate_positions(self, repo_config).await {
Ok(ValidatedPositions { Ok(ValidatedPositions {
main, main,
@ -63,7 +68,7 @@ impl super::ForgeLike for ForgeJoEnv {
Err(err) => { Err(err) => {
warn!("{}", err); warn!("{}", err);
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
addr.do_send(ValidateRepo); addr.do_send(ValidateRepo::new(message_token));
} }
} }
} }

View file

@ -2,7 +2,7 @@ use crate::server::{
actors::repo::RepoActor, actors::repo::RepoActor,
config::{BranchName, GitDir, RepoConfig}, config::{BranchName, GitDir, RepoConfig},
gitforge::{self, RepoCloneError}, gitforge::{self, RepoCloneError},
types::GitRef, types::{GitRef, MessageToken},
}; };
struct MockForge; struct MockForge;
@ -35,6 +35,7 @@ impl super::ForgeLike for MockForgeEnv {
&self, &self,
_repo_config: RepoConfig, _repo_config: RepoConfig,
_addr: actix::prelude::Addr<RepoActor>, _addr: actix::prelude::Addr<RepoActor>,
_message_token: MessageToken,
) { ) {
todo!() todo!()
} }

View file

@ -18,7 +18,7 @@ pub use errors::*;
use crate::server::{ use crate::server::{
config::{BranchName, GitDir, RepoConfig, RepoDetails}, config::{BranchName, GitDir, RepoConfig, RepoDetails},
types::GitRef, types::{GitRef, MessageToken},
}; };
#[async_trait::async_trait] #[async_trait::async_trait]
@ -41,6 +41,7 @@ pub trait ForgeLike {
&self, &self,
repo_config: RepoConfig, repo_config: RepoConfig,
addr: actix::prelude::Addr<super::actors::repo::RepoActor>, addr: actix::prelude::Addr<super::actors::repo::RepoActor>,
message_token: MessageToken,
); );
/// Moves a branch to a new commit. /// Moves a branch to a new commit.

View file

@ -19,3 +19,19 @@ impl Display for GitRef {
write!(f, "{}", self.0) write!(f, "{}", self.0)
} }
} }
#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct MessageToken(u32);
impl MessageToken {
pub fn new() -> Self {
Self::default()
}
pub const fn next(&self) -> Self {
Self(self.0 + 1)
}
}
impl std::fmt::Display for MessageToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}