Paul Campbell
83ce95776e
Remove the async wrapper for sending messages as they were never being delivered.
125 lines
4.3 KiB
Rust
125 lines
4.3 KiB
Rust
//
|
|
use actix::prelude::*;
|
|
use derive_more::Deref as _;
|
|
use tracing::Instrument as _;
|
|
|
|
use crate::{self as actor, messages::MessageToken};
|
|
use git_next_git as git;
|
|
|
|
impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
|
|
type Result = ();
|
|
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.deref()))]
|
|
fn handle(
|
|
&mut self,
|
|
msg: actor::messages::ValidateRepo,
|
|
ctx: &mut Self::Context,
|
|
) -> Self::Result {
|
|
actor::logger(&self.log, "start: ValidateRepo");
|
|
|
|
// Message Token - make sure we are only triggered for the latest/current token
|
|
match self.token_status(msg.unwrap()) {
|
|
TokenStatus::Current => {} // do nothing
|
|
TokenStatus::Expired => {
|
|
actor::logger(
|
|
&self.log,
|
|
format!("discarded: old message token: {}", self.message_token),
|
|
);
|
|
return; // message is expired
|
|
}
|
|
TokenStatus::New(message_token) => {
|
|
self.message_token = message_token;
|
|
actor::logger(
|
|
&self.log,
|
|
format!("new message token: {}", self.message_token),
|
|
);
|
|
}
|
|
}
|
|
actor::logger(&self.log, format!("accepted token: {}", self.message_token));
|
|
|
|
// Repository positions
|
|
let Some(ref open_repository) = self.open_repository else {
|
|
actor::logger(&self.log, "no open repository");
|
|
return;
|
|
};
|
|
actor::logger(&self.log, "have open repository");
|
|
let Some(repo_config) = self.repo_details.repo_config.clone() else {
|
|
actor::logger(&self.log, "no repo config");
|
|
return;
|
|
};
|
|
actor::logger(&self.log, "have repo config");
|
|
|
|
match git::validation::positions::validate_positions(
|
|
&**open_repository,
|
|
&self.repo_details,
|
|
repo_config,
|
|
) {
|
|
Ok(git::validation::positions::Positions {
|
|
main,
|
|
next,
|
|
dev,
|
|
dev_commit_history,
|
|
}) => {
|
|
tracing::debug!(%main, %next, %dev, "positions");
|
|
if next != main {
|
|
actor::do_send(
|
|
ctx.address(),
|
|
actor::messages::CheckCIStatus::new(next),
|
|
&self.log,
|
|
);
|
|
} else if next != dev {
|
|
actor::do_send(
|
|
ctx.address(),
|
|
actor::messages::AdvanceNext::new((next, dev_commit_history)),
|
|
&self.log,
|
|
)
|
|
} else {
|
|
// do nothing
|
|
}
|
|
}
|
|
Err(git::validation::positions::Error::Retryable(message)) => {
|
|
actor::logger(&self.log, message);
|
|
let addr = ctx.address();
|
|
let message_token = self.message_token;
|
|
let sleep_duration = self.sleep_duration;
|
|
let log = self.log.clone();
|
|
async move {
|
|
tracing::debug!("sleeping before retrying...");
|
|
actor::logger(&log, "before sleep");
|
|
tokio::time::sleep(sleep_duration).await;
|
|
actor::logger(&log, "after sleep");
|
|
actor::do_send(
|
|
addr,
|
|
actor::messages::ValidateRepo::new(message_token),
|
|
&log,
|
|
);
|
|
}
|
|
.in_current_span()
|
|
.into_actor(self)
|
|
.wait(ctx);
|
|
}
|
|
Err(git::validation::positions::Error::NonRetryable(message)) => {
|
|
actor::logger(&self.log, message);
|
|
}
|
|
}
|
|
|
|
tracing::debug!("Handler: ValidateRepo: finish");
|
|
}
|
|
}
|
|
|
|
enum TokenStatus {
|
|
Current,
|
|
Expired,
|
|
New(MessageToken),
|
|
}
|
|
impl actor::RepoActor {
|
|
fn token_status(&self, new: MessageToken) -> TokenStatus {
|
|
let current = &self.message_token;
|
|
if &new > current {
|
|
return TokenStatus::New(new);
|
|
}
|
|
if current > &new {
|
|
return TokenStatus::Expired;
|
|
}
|
|
TokenStatus::Current
|
|
}
|
|
}
|