Compare commits

...

2 commits

Author SHA1 Message Date
2ceca879c8 fix(repo): avoid blocking threads when pausing
All checks were successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
2024-08-30 22:41:51 +01:00
3084117d1d fix(tui): improve reliability of status updates
Some checks failed
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
Rust / build (push) Failing after 5m6s
2024-08-30 22:41:51 +01:00
9 changed files with 69 additions and 50 deletions

View file

@ -1,7 +1,7 @@
// //
use actix::prelude::*; use actix::prelude::*;
use tracing::warn; use tracing::{warn, Instrument};
use crate::{ use crate::{
repo::{ repo::{
@ -34,11 +34,12 @@ impl Handler<AdvanceNext> for RepoActor {
let addr = ctx.address(); let addr = ctx.address();
let (commit, force) = find_next_commit_on_dev(&next, &main, &dev_commit_history); let (commit, force) = find_next_commit_on_dev(&next, &main, &dev_commit_history);
if let Some(commit) = &commit {
self.update_tui(RepoUpdate::AdvancingNext { self.update_tui(RepoUpdate::AdvancingNext {
commit: commit.clone(), commit: commit.clone(),
force: force.clone(), force: force.clone(),
}); });
};
match advance_next( match advance_next(
commit, commit,
force, force,
@ -49,10 +50,20 @@ impl Handler<AdvanceNext> for RepoActor {
) { ) {
Ok(message_token) => { Ok(message_token) => {
// pause to allow any CI checks to be started // pause to allow any CI checks to be started
std::thread::sleep(self.sleep_duration); let sleep_duration = self.sleep_duration;
do_send(&addr, ValidateRepo::new(message_token), self.log.as_ref()); let log = self.log.clone();
async move {
actix_rt::time::sleep(sleep_duration).await;
do_send(&addr, ValidateRepo::new(message_token), log.as_ref());
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
Err(err) => {
warn!("advance next: {err}");
self.alert_tui(err.to_string());
} }
Err(err) => warn!("advance next: {err}"),
} }
} }
} }

View file

@ -2,11 +2,11 @@
use actix::prelude::*; use actix::prelude::*;
use git_next_core::git::{forge::commit::Status, graph, UserNotification}; use git_next_core::git::{forge::commit::Status, graph, UserNotification};
use tracing::debug; use tracing::{debug, Instrument};
use crate::{ use crate::{
repo::{ repo::{
delay_send, do_send, logger, do_send, logger,
messages::{AdvanceMain, ReceiveCIStatus, ValidateRepo}, messages::{AdvanceMain, ReceiveCIStatus, ValidateRepo},
notify_user, RepoActor, notify_user, RepoActor,
}, },
@ -37,8 +37,14 @@ impl Handler<ReceiveCIStatus> for RepoActor {
do_send(&addr, AdvanceMain::new(next), self.log.as_ref()); do_send(&addr, AdvanceMain::new(next), self.log.as_ref());
} }
Status::Pending => { Status::Pending => {
std::thread::sleep(sleep_duration); let log = self.log.clone();
do_send(&addr, ValidateRepo::new(message_token), self.log.as_ref()); async move {
actix_rt::time::sleep(sleep_duration).await;
do_send(&addr, ValidateRepo::new(message_token), log.as_ref());
}
.in_current_span()
.into_actor(self)
.wait(ctx);
} }
Status::Fail => { Status::Fail => {
tracing::warn!("Checks have failed"); tracing::warn!("Checks have failed");
@ -53,12 +59,16 @@ impl Handler<ReceiveCIStatus> for RepoActor {
}, },
log.as_ref(), log.as_ref(),
); );
delay_send( async move {
&addr, debug!("sleeping before retrying...");
sleep_duration, logger(log.as_ref(), "before sleep");
ValidateRepo::new(message_token), actix_rt::time::sleep(sleep_duration).await;
self.log.as_ref(), logger(log.as_ref(), "after sleep");
); do_send(&addr, ValidateRepo::new(message_token), log.as_ref());
}
.in_current_span()
.into_actor(self)
.wait(ctx);
} }
} }
} }

View file

@ -1,7 +1,7 @@
// //
use actix::prelude::*; use actix::prelude::*;
use tracing::{debug, instrument, Instrument as _}; use tracing::{info, instrument, Instrument as _};
use crate::{ use crate::{
repo::{ repo::{
@ -13,6 +13,7 @@ use crate::{
}; };
use git_next_core::git::{ use git_next_core::git::{
push::Force,
validation::positions::{validate, Error, Positions}, validation::positions::{validate, Error, Positions},
UserNotification, UserNotification,
}; };
@ -74,11 +75,18 @@ impl Handler<ValidateRepo> for RepoActor {
}, },
git_log, git_log,
)) => { )) => {
debug!(%main, %next, %dev, "positions"); info!(%main, %next, %dev, "positions");
self.update_tui_log(git_log); self.update_tui_log(git_log);
if next_is_valid && next != main { if next_is_valid && next != main {
info!("Checking CI");
self.update_tui(RepoUpdate::CheckingCI);
do_send(&ctx.address(), CheckCIStatus::new(next), self.log.as_ref()); do_send(&ctx.address(), CheckCIStatus::new(next), self.log.as_ref());
} else if next != dev { } else if next != dev {
info!("Advance next");
self.update_tui(RepoUpdate::AdvancingNext {
commit: next.clone(),
force: Force::No,
});
do_send( do_send(
&ctx.address(), &ctx.address(),
AdvanceNext::new(AdvanceNextPayload { AdvanceNext::new(AdvanceNextPayload {
@ -89,10 +97,12 @@ impl Handler<ValidateRepo> for RepoActor {
self.log.as_ref(), self.log.as_ref(),
); );
} else { } else {
info!("do nothing");
self.update_tui(RepoUpdate::Okay { main, next, dev }); self.update_tui(RepoUpdate::Okay { main, next, dev });
} }
} }
Err(Error::Retryable(message)) => { Err(Error::Retryable(message)) => {
info!(?message, "Retryable");
self.alert_tui(format!("[retryable: {message}]")); self.alert_tui(format!("[retryable: {message}]"));
logger(self.log.as_ref(), message); logger(self.log.as_ref(), message);
let addr = ctx.address(); let addr = ctx.address();
@ -100,7 +110,7 @@ impl Handler<ValidateRepo> for RepoActor {
let sleep_duration = self.sleep_duration; let sleep_duration = self.sleep_duration;
let log = self.log.clone(); let log = self.log.clone();
async move { async move {
debug!("sleeping before retrying..."); info!("sleeping before retrying...");
logger(log.as_ref(), "before sleep"); logger(log.as_ref(), "before sleep");
actix_rt::time::sleep(sleep_duration).await; actix_rt::time::sleep(sleep_duration).await;
logger(log.as_ref(), "after sleep"); logger(log.as_ref(), "after sleep");
@ -111,6 +121,7 @@ impl Handler<ValidateRepo> for RepoActor {
.wait(ctx); .wait(ctx);
} }
Err(Error::UserIntervention(user_notification)) => { Err(Error::UserIntervention(user_notification)) => {
info!(?user_notification, "User Intervention");
self.alert_tui(format!("[USER INTERVENTION: {user_notification}]")); self.alert_tui(format!("[USER INTERVENTION: {user_notification}]"));
if let UserNotification::CICheckFailed { log, .. } if let UserNotification::CICheckFailed { log, .. }
| UserNotification::DevNotBasedOnMain { log, .. } = &user_notification | UserNotification::DevNotBasedOnMain { log, .. } = &user_notification
@ -124,6 +135,7 @@ impl Handler<ValidateRepo> for RepoActor {
); );
} }
Err(Error::NonRetryable(message)) => { Err(Error::NonRetryable(message)) => {
info!(?message, "NonRetryable");
self.alert_tui(format!("[Error: {message}]")); self.alert_tui(format!("[Error: {message}]"));
logger(self.log.as_ref(), message); logger(self.log.as_ref(), message);
} }

View file

@ -7,7 +7,6 @@ use crate::{
}; };
use derive_more::Deref; use derive_more::Deref;
use kxio::network::Network; use kxio::network::Network;
use std::time::Duration;
use tracing::{info, instrument, warn, Instrument}; use tracing::{info, instrument, warn, Instrument};
use git_next_core::{ use git_next_core::{
@ -172,19 +171,6 @@ impl Actor for RepoActor {
} }
} }
pub fn delay_send<M>(addr: &Addr<RepoActor>, delay: Duration, msg: M, log: Option<&ActorLog>)
where
M: actix::Message + Send + 'static + std::fmt::Debug,
RepoActor: actix::Handler<M>,
<M as actix::Message>::Result: Send,
{
let log_message = format!("send-after-delay: {msg:?}");
tracing::debug!(log_message);
logger(log, log_message);
std::thread::sleep(delay);
do_send(addr, msg, log);
}
pub fn do_send<M>(addr: &Addr<RepoActor>, msg: M, log: Option<&ActorLog>) pub fn do_send<M>(addr: &Addr<RepoActor>, msg: M, log: Option<&ActorLog>)
where where
M: actix::Message + Send + 'static + std::fmt::Debug, M: actix::Message + Send + 'static + std::fmt::Debug,
@ -192,7 +178,7 @@ where
<M as actix::Message>::Result: Send, <M as actix::Message>::Result: Send,
{ {
let log_message = format!("send: {msg:?}"); let log_message = format!("send: {msg:?}");
tracing::debug!(log_message); info!(log_message);
logger(log, log_message); logger(log, log_message);
if cfg!(not(test)) { if cfg!(not(test)) {
// #[cfg(not(test))] // #[cfg(not(test))]

View file

@ -1,9 +1,11 @@
use std::time::Duration;
use crate::repo::messages::AdvanceNextPayload; use crate::repo::messages::AdvanceNextPayload;
// //
use super::*; use super::*;
#[actix::test] #[test_log::test(actix::test)]
async fn should_fetch_then_push_then_revalidate() -> TestResult { async fn should_fetch_then_push_then_revalidate() -> TestResult {
//given //given
let fs = given::a_filesystem(); let fs = given::a_filesystem();
@ -41,14 +43,11 @@ async fn should_fetch_then_push_then_revalidate() -> TestResult {
}, },
)) ))
.await?; .await?;
actix_rt::time::sleep(Duration::from_millis(1)).await;
System::current().stop(); System::current().stop();
//then //then
tracing::debug!(?log, ""); tracing::debug!(?log, "");
log.read().map_err(|e| e.to_string()).map(|l| { log.require_message_containing("send: ValidateRepo")?;
assert!(l
.iter()
.any(|message| message.contains("send: ValidateRepo")));
})?;
Ok(()) Ok(())
} }

View file

@ -1,3 +1,5 @@
use std::time::Duration;
// //
use super::*; use super::*;
@ -79,7 +81,7 @@ async fn when_fail_should_recheck_after_delay() -> TestResult {
git::forge::commit::Status::Fail, git::forge::commit::Status::Fail,
))) )))
.await?; .await?;
actix_rt::time::sleep(std::time::Duration::from_millis(2)).await; actix_rt::time::sleep(Duration::from_millis(20)).await;
System::current().stop(); System::current().stop();
//then //then

View file

@ -70,7 +70,7 @@ pub enum RepoUpdate {
}, },
CheckingCI, CheckingCI,
AdvancingNext { AdvancingNext {
commit: Option<git::Commit>, commit: git::Commit,
force: git::push::Force, force: git::push::Force,
}, },
AdvancingMain { AdvancingMain {

View file

@ -47,10 +47,9 @@ impl Handler<ServerUpdate> for Tui {
RepoUpdate::CheckingCI => { RepoUpdate::CheckingCI => {
repo_state.update_message("Checking CI status"); repo_state.update_message("Checking CI status");
} }
RepoUpdate::AdvancingNext { RepoUpdate::AdvancingNext { commit, force: _ } => {
commit: _, repo_state.update_message(format!("advancing next to {commit}"));
force: _, }
} => (),
RepoUpdate::AdvancingMain { commit } => { RepoUpdate::AdvancingMain { commit } => {
repo_state.update_message(format!("advancing main to {commit}")); repo_state.update_message(format!("advancing main to {commit}"));
} }

View file

@ -233,12 +233,12 @@ impl RepoState {
#[tracing::instrument] #[tracing::instrument]
pub fn update_message(&mut self, msg: impl Into<String> + std::fmt::Debug) { pub fn update_message(&mut self, msg: impl Into<String> + std::fmt::Debug) {
tracing::info!("new tui message");
let msg: String = msg.into(); let msg: String = msg.into();
match self { match self {
Self::Identified { message, .. } Self::Identified { message, .. }
| Self::Configured { message, .. } | Self::Configured { message, .. }
| Self::Ready { message, .. } => { | Self::Ready { message, .. } => {
info!(?msg, "updating ui");
*message = msg; *message = msg;
} }
} }