fix: shutdown properly on file parse error
Some checks failed
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
Rust / build (push) Failing after 1m23s
ci/woodpecker/push/push-next Pipeline was successful

Closes kemitix/git-next#152
This commit is contained in:
Paul Campbell 2024-09-02 09:14:08 +01:00
parent 181ec8eb0f
commit 6364a1610d
6 changed files with 77 additions and 18 deletions

View file

@ -3,4 +3,5 @@ mod receive_app_config;
mod receive_valid_app_config;
mod server_update;
mod shutdown;
mod shutdown_trigger;
mod subscribe_updates;

View file

@ -0,0 +1,12 @@
//
use actix::Handler;
use crate::server::{actor::messages::ShutdownTrigger, ServerActor};
impl Handler<ShutdownTrigger> for ServerActor {
type Result = ();
fn handle(&mut self, msg: ShutdownTrigger, _ctx: &mut Self::Context) -> Self::Result {
self.shutdown_trigger.replace(msg.peel());
}
}

View file

@ -99,3 +99,13 @@ message!(
Recipient<ServerUpdate>,
"Subscribe to receive updates from the server"
);
/// Sends a channel to be used to shutdown the server
#[derive(Message, Constructor)]
#[rtype(result = "()")]
pub struct ShutdownTrigger(std::sync::mpsc::Sender<String>);
impl ShutdownTrigger {
pub fn peel(self) -> std::sync::mpsc::Sender<String> {
self.0
}
}

View file

@ -1,6 +1,6 @@
//
use actix::prelude::*;
use messages::{ReceiveAppConfig, ServerUpdate};
use messages::{ReceiveAppConfig, ServerUpdate, Shutdown};
use tracing::error;
#[cfg(test)]
@ -58,6 +58,7 @@ pub struct ServerActor {
sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
shutdown_trigger: Option<std::sync::mpsc::Sender<String>>,
subscribers: Vec<Recipient<ServerUpdate>>,
// testing
@ -84,6 +85,7 @@ impl ServerActor {
net,
alerts,
repository_factory: repo,
shutdown_trigger: None,
subscribers: Vec::default(),
sleep_duration,
repo_actors: BTreeMap::new(),
@ -228,10 +230,15 @@ impl ServerActor {
}
/// Attempts to gracefully shutdown the server before stopping the system.
fn abort(&self, ctx: &<Self as actix::Actor>::Context, message: impl Into<String>) {
tracing::error!("Aborting: {}", message.into());
fn abort(&mut self, ctx: &<Self as actix::Actor>::Context, message: impl Into<String>) {
self.do_send(crate::server::actor::messages::Shutdown, ctx);
System::current().stop_with_code(1);
if let Some(t) = self.shutdown_trigger.take() {
let _ = t.send(message.into());
} else {
error!("{}", message.into());
self.do_send(Shutdown, ctx);
// System::current().stop_with_code(1);
}
}
fn do_send<M>(&self, msg: M, ctx: &<Self as actix::Actor>::Context)

View file

@ -6,6 +6,7 @@ mod tests;
use actix::prelude::*;
use actix_rt::signal;
use actor::messages::ShutdownTrigger;
use crate::{
alerts::{AlertsActor, History},
@ -23,7 +24,7 @@ use tracing::{error, info};
use std::{
path::PathBuf,
sync::{atomic::Ordering, Arc, RwLock},
sync::{atomic::Ordering, mpsc::channel, Arc, RwLock},
time::Duration,
};
@ -61,8 +62,10 @@ pub fn start(
init_logging();
}
let file_watcher_err_channel: Arc<RwLock<Option<anyhow::Error>>> = Arc::new(RwLock::new(None));
let file_watcher_err_channel_exec = file_watcher_err_channel.clone();
let shutdown_message_holder: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
let shutdown_message_holder_exec = shutdown_message_holder.clone();
let file_watcher_err_holder: Arc<RwLock<Option<anyhow::Error>>> = Arc::new(RwLock::new(None));
let file_watcher_err_holder_exec = file_watcher_err_holder.clone();
let execution = async move {
info!("Starting Alert Dispatcher...");
let alerts_addr = AlertsActor::new(None, History::new(A_DAY), net.clone()).start();
@ -80,37 +83,51 @@ pub fn start(
server.do_send(crate::server::actor::messages::Shutdown);
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;
System::current().stop();
let _ = file_watcher_err_channel_exec
let _ = file_watcher_err_holder_exec
.write()
.map(|mut o| o.replace(err));
return;
}
};
let (tx_shutdown, rx_shutdown) = channel::<String>();
if ui {
#[cfg(feature = "tui")]
{
use crate::server::actor::messages::SubscribeToUpdates;
use crate::tui;
use std::sync::mpsc::channel;
let (tx_shutdown, rx_shutdown) = channel::<()>();
let tui_addr = tui::Tui::new(tx_shutdown).start();
let (tx_shutdown, rx_shutdown) = channel::<String>();
let tui_addr = tui::Tui::new(tx_shutdown.clone()).start();
server.do_send(SubscribeToUpdates::new(tui_addr.clone().recipient()));
server.do_send(ShutdownTrigger::new(tx_shutdown));
server.do_send(FileUpdated); // update file after ui subscription in place
loop {
let _ = tui_addr.send(tui::Tick).await;
if rx_shutdown.try_recv().is_ok() {
if let Ok(message) = rx_shutdown.try_recv() {
let _ = shutdown_message_holder_exec
.write()
.map(|mut o| o.replace(message));
break;
}
actix_rt::time::sleep(Duration::from_millis(16)).await;
}
}
} else {
server.do_send(ShutdownTrigger::new(tx_shutdown.clone()));
server.do_send(FileUpdated);
info!("Server running - Press Ctrl-C to stop...");
let _ = signal::ctrl_c().await;
info!("Ctrl-C received, shutting down...");
actix_rt::spawn(async move {
let _ = signal::ctrl_c().await;
info!("Ctrl-C received, shutting down...");
let _ = tx_shutdown.send(String::new());
});
if let Ok(message) = rx_shutdown.try_recv() {
let _ = shutdown_message_holder_exec
.write()
.map(|mut o| o.replace(message));
}
}
// shutdown
@ -124,10 +141,22 @@ pub fn start(
Arbiter::current().spawn(execution);
system.run()?;
// check for error from server thread
#[allow(clippy::unwrap_used)]
if let Some(err) = &*shutdown_message_holder.read().unwrap() {
if ui {
ratatui::restore();
eprintln!("Server: {err:?}");
}
error!(?err, "server");
return Err(color_eyre::eyre::eyre!(format!("{err}")));
}
// check for error from file watcher thread
#[allow(clippy::unwrap_used)]
if let Some(err) = &*file_watcher_err_channel.read().unwrap() {
if let Some(err) = &*file_watcher_err_holder.read().unwrap() {
if ui {
ratatui::restore();
eprintln!("File Watcher: {err:?}");
}
error!(?err, "file watcher");

View file

@ -18,7 +18,7 @@ use tui_scrollview::ScrollViewState;
#[derive(Debug)]
pub struct Tui {
terminal: Option<DefaultTerminal>,
signal_shutdown: Sender<()>,
signal_shutdown: Sender<String>,
pub state: State,
scroll_view_state: ScrollViewState,
}
@ -33,7 +33,7 @@ impl Actor for Tui {
}
}
impl Tui {
pub fn new(signal_shutdown: Sender<()>) -> Self {
pub fn new(signal_shutdown: Sender<String>) -> Self {
Self {
terminal: None,
signal_shutdown,
@ -68,7 +68,7 @@ impl Tui {
match key.code {
KeyCode::Char('q') => {
ctx.stop();
if let Err(err) = self.signal_shutdown.send(()) {
if let Err(err) = self.signal_shutdown.send(String::new()) {
tracing::error!(?err, "Failed to signal shutdown");
}
}