fix: shutdown properly on file parse error

Closes kemitix/git-next#152
This commit is contained in:
Paul Campbell 2024-09-02 09:14:08 +01:00
parent 181ec8eb0f
commit b4a4631a1d
7 changed files with 84 additions and 19 deletions

View file

@ -22,7 +22,11 @@ impl Forge {
ForgeType::ForgeJo => Box::new(ForgeJo::new(repo_details, net)), ForgeType::ForgeJo => Box::new(ForgeJo::new(repo_details, net)),
#[cfg(feature = "github")] #[cfg(feature = "github")]
ForgeType::GitHub => Box::new(Github::new(repo_details, net)), ForgeType::GitHub => Box::new(Github::new(repo_details, net)),
_ => unreachable!(), _ => {
drop(repo_details);
drop(net);
unreachable!();
}
} }
} }
} }

View file

@ -3,4 +3,5 @@ mod receive_app_config;
mod receive_valid_app_config; mod receive_valid_app_config;
mod server_update; mod server_update;
mod shutdown; mod shutdown;
mod shutdown_trigger;
mod subscribe_updates; mod subscribe_updates;

View file

@ -0,0 +1,12 @@
//
use actix::Handler;
use crate::server::{actor::messages::ShutdownTrigger, ServerActor};
impl Handler<ShutdownTrigger> for ServerActor {
type Result = ();
fn handle(&mut self, msg: ShutdownTrigger, _ctx: &mut Self::Context) -> Self::Result {
self.shutdown_trigger.replace(msg.peel());
}
}

View file

@ -99,3 +99,13 @@ message!(
Recipient<ServerUpdate>, Recipient<ServerUpdate>,
"Subscribe to receive updates from the server" "Subscribe to receive updates from the server"
); );
/// Sends a channel to be used to shutdown the server
#[derive(Message, Constructor)]
#[rtype(result = "()")]
pub struct ShutdownTrigger(std::sync::mpsc::Sender<String>);
impl ShutdownTrigger {
pub fn peel(self) -> std::sync::mpsc::Sender<String> {
self.0
}
}

View file

@ -1,6 +1,6 @@
// //
use actix::prelude::*; use actix::prelude::*;
use messages::{ReceiveAppConfig, ServerUpdate}; use messages::{ReceiveAppConfig, ServerUpdate, Shutdown};
use tracing::error; use tracing::error;
#[cfg(test)] #[cfg(test)]
@ -58,6 +58,7 @@ pub struct ServerActor {
sleep_duration: std::time::Duration, sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>, repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
shutdown_trigger: Option<std::sync::mpsc::Sender<String>>,
subscribers: Vec<Recipient<ServerUpdate>>, subscribers: Vec<Recipient<ServerUpdate>>,
// testing // testing
@ -84,6 +85,7 @@ impl ServerActor {
net, net,
alerts, alerts,
repository_factory: repo, repository_factory: repo,
shutdown_trigger: None,
subscribers: Vec::default(), subscribers: Vec::default(),
sleep_duration, sleep_duration,
repo_actors: BTreeMap::new(), repo_actors: BTreeMap::new(),
@ -228,10 +230,15 @@ impl ServerActor {
} }
/// Attempts to gracefully shutdown the server before stopping the system. /// Attempts to gracefully shutdown the server before stopping the system.
fn abort(&self, ctx: &<Self as actix::Actor>::Context, message: impl Into<String>) { fn abort(&mut self, ctx: &<Self as actix::Actor>::Context, message: impl Into<String>) {
tracing::error!("Aborting: {}", message.into());
self.do_send(crate::server::actor::messages::Shutdown, ctx); self.do_send(crate::server::actor::messages::Shutdown, ctx);
System::current().stop_with_code(1); if let Some(t) = self.shutdown_trigger.take() {
let _ = t.send(message.into());
} else {
error!("{}", message.into());
self.do_send(Shutdown, ctx);
// System::current().stop_with_code(1);
}
} }
fn do_send<M>(&self, msg: M, ctx: &<Self as actix::Actor>::Context) fn do_send<M>(&self, msg: M, ctx: &<Self as actix::Actor>::Context)

View file

@ -6,6 +6,7 @@ mod tests;
use actix::prelude::*; use actix::prelude::*;
use actix_rt::signal; use actix_rt::signal;
use actor::messages::ShutdownTrigger;
use crate::{ use crate::{
alerts::{AlertsActor, History}, alerts::{AlertsActor, History},
@ -23,7 +24,7 @@ use tracing::{error, info};
use std::{ use std::{
path::PathBuf, path::PathBuf,
sync::{atomic::Ordering, Arc, RwLock}, sync::{atomic::Ordering, mpsc::channel, Arc, RwLock},
time::Duration, time::Duration,
}; };
@ -61,8 +62,10 @@ pub fn start(
init_logging(); init_logging();
} }
let file_watcher_err_channel: Arc<RwLock<Option<anyhow::Error>>> = Arc::new(RwLock::new(None)); let shutdown_message_holder: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
let file_watcher_err_channel_exec = file_watcher_err_channel.clone(); let shutdown_message_holder_exec = shutdown_message_holder.clone();
let file_watcher_err_holder: Arc<RwLock<Option<anyhow::Error>>> = Arc::new(RwLock::new(None));
let file_watcher_err_holder_exec = file_watcher_err_holder.clone();
let execution = async move { let execution = async move {
info!("Starting Alert Dispatcher..."); info!("Starting Alert Dispatcher...");
let alerts_addr = AlertsActor::new(None, History::new(A_DAY), net.clone()).start(); let alerts_addr = AlertsActor::new(None, History::new(A_DAY), net.clone()).start();
@ -80,37 +83,51 @@ pub fn start(
server.do_send(crate::server::actor::messages::Shutdown); server.do_send(crate::server::actor::messages::Shutdown);
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await; actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;
System::current().stop(); System::current().stop();
let _ = file_watcher_err_channel_exec let _ = file_watcher_err_holder_exec
.write() .write()
.map(|mut o| o.replace(err)); .map(|mut o| o.replace(err));
return; return;
} }
}; };
let (tx_shutdown, rx_shutdown) = channel::<String>();
if ui { if ui {
#[cfg(feature = "tui")] #[cfg(feature = "tui")]
{ {
use crate::server::actor::messages::SubscribeToUpdates; use crate::server::actor::messages::SubscribeToUpdates;
use crate::tui; use crate::tui;
use std::sync::mpsc::channel;
let (tx_shutdown, rx_shutdown) = channel::<()>(); let (tx_shutdown, rx_shutdown) = channel::<String>();
let tui_addr = tui::Tui::new(tx_shutdown).start(); let tui_addr = tui::Tui::new(tx_shutdown.clone()).start();
server.do_send(SubscribeToUpdates::new(tui_addr.clone().recipient())); server.do_send(SubscribeToUpdates::new(tui_addr.clone().recipient()));
server.do_send(ShutdownTrigger::new(tx_shutdown));
server.do_send(FileUpdated); // update file after ui subscription in place server.do_send(FileUpdated); // update file after ui subscription in place
loop { loop {
let _ = tui_addr.send(tui::Tick).await; let _ = tui_addr.send(tui::Tick).await;
if rx_shutdown.try_recv().is_ok() { if let Ok(message) = rx_shutdown.try_recv() {
let _ = shutdown_message_holder_exec
.write()
.map(|mut o| o.replace(message));
break; break;
} }
actix_rt::time::sleep(Duration::from_millis(16)).await; actix_rt::time::sleep(Duration::from_millis(16)).await;
} }
} }
} else { } else {
server.do_send(ShutdownTrigger::new(tx_shutdown.clone()));
server.do_send(FileUpdated); server.do_send(FileUpdated);
info!("Server running - Press Ctrl-C to stop..."); info!("Server running - Press Ctrl-C to stop...");
let _ = signal::ctrl_c().await;
info!("Ctrl-C received, shutting down..."); actix_rt::spawn(async move {
let _ = signal::ctrl_c().await;
info!("Ctrl-C received, shutting down...");
let _ = tx_shutdown.send(String::new());
});
if let Ok(message) = rx_shutdown.try_recv() {
let _ = shutdown_message_holder_exec
.write()
.map(|mut o| o.replace(message));
}
} }
// shutdown // shutdown
@ -124,10 +141,24 @@ pub fn start(
Arbiter::current().spawn(execution); Arbiter::current().spawn(execution);
system.run()?; system.run()?;
// check for error from server thread
#[allow(clippy::unwrap_used)]
if let Some(err) = &*shutdown_message_holder.read().unwrap() {
#[cfg(feature = "tui")]
if ui {
ratatui::restore();
eprintln!("Server: {err:?}");
}
error!(?err, "server");
return Err(color_eyre::eyre::eyre!(format!("{err}")));
}
// check for error from file watcher thread // check for error from file watcher thread
#[allow(clippy::unwrap_used)] #[allow(clippy::unwrap_used)]
if let Some(err) = &*file_watcher_err_channel.read().unwrap() { if let Some(err) = &*file_watcher_err_holder.read().unwrap() {
#[cfg(feature = "tui")]
if ui { if ui {
ratatui::restore();
eprintln!("File Watcher: {err:?}"); eprintln!("File Watcher: {err:?}");
} }
error!(?err, "file watcher"); error!(?err, "file watcher");

View file

@ -18,7 +18,7 @@ use tui_scrollview::ScrollViewState;
#[derive(Debug)] #[derive(Debug)]
pub struct Tui { pub struct Tui {
terminal: Option<DefaultTerminal>, terminal: Option<DefaultTerminal>,
signal_shutdown: Sender<()>, signal_shutdown: Sender<String>,
pub state: State, pub state: State,
scroll_view_state: ScrollViewState, scroll_view_state: ScrollViewState,
} }
@ -33,7 +33,7 @@ impl Actor for Tui {
} }
} }
impl Tui { impl Tui {
pub fn new(signal_shutdown: Sender<()>) -> Self { pub fn new(signal_shutdown: Sender<String>) -> Self {
Self { Self {
terminal: None, terminal: None,
signal_shutdown, signal_shutdown,
@ -68,7 +68,7 @@ impl Tui {
match key.code { match key.code {
KeyCode::Char('q') => { KeyCode::Char('q') => {
ctx.stop(); ctx.stop();
if let Err(err) = self.signal_shutdown.send(()) { if let Err(err) = self.signal_shutdown.send(String::new()) {
tracing::error!(?err, "Failed to signal shutdown"); tracing::error!(?err, "Failed to signal shutdown");
} }
} }