From 42a2b01ca7dd960272f6c3559e0f3be978528d06 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Mon, 2 Sep 2024 09:14:08 +0100 Subject: [PATCH] WIP: shutdown on file parse error --- crates/cli/src/server/actor/handlers/mod.rs | 1 + .../server/actor/handlers/shutdown_trigger.rs | 12 +++++ crates/cli/src/server/actor/messages.rs | 10 ++++ crates/cli/src/server/actor/mod.rs | 15 ++++-- crates/cli/src/server/mod.rs | 50 +++++++++++++++---- crates/cli/src/tui/actor/mod.rs | 6 +-- 6 files changed, 77 insertions(+), 17 deletions(-) create mode 100644 crates/cli/src/server/actor/handlers/shutdown_trigger.rs diff --git a/crates/cli/src/server/actor/handlers/mod.rs b/crates/cli/src/server/actor/handlers/mod.rs index a5e6358..4a5aea3 100644 --- a/crates/cli/src/server/actor/handlers/mod.rs +++ b/crates/cli/src/server/actor/handlers/mod.rs @@ -3,4 +3,5 @@ mod receive_app_config; mod receive_valid_app_config; mod server_update; mod shutdown; +mod shutdown_trigger; mod subscribe_updates; diff --git a/crates/cli/src/server/actor/handlers/shutdown_trigger.rs b/crates/cli/src/server/actor/handlers/shutdown_trigger.rs new file mode 100644 index 0000000..7469583 --- /dev/null +++ b/crates/cli/src/server/actor/handlers/shutdown_trigger.rs @@ -0,0 +1,12 @@ +// +use actix::Handler; + +use crate::server::{actor::messages::ShutdownTrigger, ServerActor}; + +impl Handler for ServerActor { + type Result = (); + + fn handle(&mut self, msg: ShutdownTrigger, _ctx: &mut Self::Context) -> Self::Result { + self.shutdown_trigger.replace(msg.peel()); + } +} diff --git a/crates/cli/src/server/actor/messages.rs b/crates/cli/src/server/actor/messages.rs index ed5503c..a6ba308 100644 --- a/crates/cli/src/server/actor/messages.rs +++ b/crates/cli/src/server/actor/messages.rs @@ -99,3 +99,13 @@ message!( Recipient, "Subscribe to receive updates from the server" ); + +/// Sends a channel to be used to shutdown the server +#[derive(Message, Constructor)] +#[rtype(result = "()")] +pub struct ShutdownTrigger(std::sync::mpsc::Sender); +impl ShutdownTrigger { + pub fn peel(self) -> std::sync::mpsc::Sender { + self.0 + } +} diff --git a/crates/cli/src/server/actor/mod.rs b/crates/cli/src/server/actor/mod.rs index 74eefab..b7d0b0e 100644 --- a/crates/cli/src/server/actor/mod.rs +++ b/crates/cli/src/server/actor/mod.rs @@ -1,6 +1,6 @@ // use actix::prelude::*; -use messages::{ReceiveAppConfig, ServerUpdate}; +use messages::{ReceiveAppConfig, ServerUpdate, Shutdown}; use tracing::error; #[cfg(test)] @@ -58,6 +58,7 @@ pub struct ServerActor { sleep_duration: std::time::Duration, repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr>, + shutdown_trigger: Option>, subscribers: Vec>, // testing @@ -84,6 +85,7 @@ impl ServerActor { net, alerts, repository_factory: repo, + shutdown_trigger: None, subscribers: Vec::default(), sleep_duration, repo_actors: BTreeMap::new(), @@ -228,10 +230,15 @@ impl ServerActor { } /// Attempts to gracefully shutdown the server before stopping the system. - fn abort(&self, ctx: &::Context, message: impl Into) { - tracing::error!("Aborting: {}", message.into()); + fn abort(&mut self, ctx: &::Context, message: impl Into) { self.do_send(crate::server::actor::messages::Shutdown, ctx); - System::current().stop_with_code(1); + if let Some(t) = self.shutdown_trigger.take() { + let _ = t.send(message.into()); + } else { + error!("{}", message.into()); + self.do_send(Shutdown, ctx); + // System::current().stop_with_code(1); + } } fn do_send(&self, msg: M, ctx: &::Context) diff --git a/crates/cli/src/server/mod.rs b/crates/cli/src/server/mod.rs index c63a781..a05be17 100644 --- a/crates/cli/src/server/mod.rs +++ b/crates/cli/src/server/mod.rs @@ -6,6 +6,7 @@ mod tests; use actix::prelude::*; use actix_rt::signal; +use actor::messages::ShutdownTrigger; use crate::{ alerts::{AlertsActor, History}, @@ -61,8 +62,10 @@ pub fn start( init_logging(); } - let file_watcher_err_channel: Arc>> = Arc::new(RwLock::new(None)); - let file_watcher_err_channel_exec = file_watcher_err_channel.clone(); + let shutdown_message_holder: Arc>> = Arc::new(RwLock::new(None)); + let shutdown_message_holder_exec = shutdown_message_holder.clone(); + let file_watcher_err_holder: Arc>> = Arc::new(RwLock::new(None)); + let file_watcher_err_holder_exec = file_watcher_err_holder.clone(); let execution = async move { info!("Starting Alert Dispatcher..."); let alerts_addr = AlertsActor::new(None, History::new(A_DAY), net.clone()).start(); @@ -80,37 +83,52 @@ pub fn start( server.do_send(crate::server::actor::messages::Shutdown); actix_rt::time::sleep(std::time::Duration::from_millis(10)).await; System::current().stop(); - let _ = file_watcher_err_channel_exec + let _ = file_watcher_err_holder_exec .write() .map(|mut o| o.replace(err)); return; } }; + use std::sync::mpsc::channel; + let (tx_shutdown, rx_shutdown) = channel::(); if ui { #[cfg(feature = "tui")] { use crate::server::actor::messages::SubscribeToUpdates; use crate::tui; - use std::sync::mpsc::channel; - let (tx_shutdown, rx_shutdown) = channel::<()>(); - let tui_addr = tui::Tui::new(tx_shutdown).start(); + let (tx_shutdown, rx_shutdown) = channel::(); + let tui_addr = tui::Tui::new(tx_shutdown.clone()).start(); server.do_send(SubscribeToUpdates::new(tui_addr.clone().recipient())); + server.do_send(ShutdownTrigger::new(tx_shutdown)); server.do_send(FileUpdated); // update file after ui subscription in place loop { let _ = tui_addr.send(tui::Tick).await; - if rx_shutdown.try_recv().is_ok() { + if let Ok(message) = rx_shutdown.try_recv() { + let _ = shutdown_message_holder_exec + .write() + .map(|mut o| o.replace(message)); break; } actix_rt::time::sleep(Duration::from_millis(16)).await; } } } else { + server.do_send(ShutdownTrigger::new(tx_shutdown.clone())); server.do_send(FileUpdated); info!("Server running - Press Ctrl-C to stop..."); - let _ = signal::ctrl_c().await; - info!("Ctrl-C received, shutting down..."); + + actix_rt::spawn(async move { + let _ = signal::ctrl_c().await; + info!("Ctrl-C received, shutting down..."); + let _ = tx_shutdown.send(String::new()); + }); + if let Ok(message) = rx_shutdown.try_recv() { + let _ = shutdown_message_holder_exec + .write() + .map(|mut o| o.replace(message)); + } } // shutdown @@ -124,10 +142,22 @@ pub fn start( Arbiter::current().spawn(execution); system.run()?; + // check for error from server thread + #[allow(clippy::unwrap_used)] + if let Some(err) = &*shutdown_message_holder.read().unwrap() { + if ui { + ratatui::restore(); + eprintln!("Server: {err:?}"); + } + error!(?err, "server"); + return Err(color_eyre::eyre::eyre!(format!("{err}"))); + } + // check for error from file watcher thread #[allow(clippy::unwrap_used)] - if let Some(err) = &*file_watcher_err_channel.read().unwrap() { + if let Some(err) = &*file_watcher_err_holder.read().unwrap() { if ui { + ratatui::restore(); eprintln!("File Watcher: {err:?}"); } error!(?err, "file watcher"); diff --git a/crates/cli/src/tui/actor/mod.rs b/crates/cli/src/tui/actor/mod.rs index a156031..848c8f8 100644 --- a/crates/cli/src/tui/actor/mod.rs +++ b/crates/cli/src/tui/actor/mod.rs @@ -18,7 +18,7 @@ use tui_scrollview::ScrollViewState; #[derive(Debug)] pub struct Tui { terminal: Option, - signal_shutdown: Sender<()>, + signal_shutdown: Sender, pub state: State, scroll_view_state: ScrollViewState, } @@ -33,7 +33,7 @@ impl Actor for Tui { } } impl Tui { - pub fn new(signal_shutdown: Sender<()>) -> Self { + pub fn new(signal_shutdown: Sender) -> Self { Self { terminal: None, signal_shutdown, @@ -68,7 +68,7 @@ impl Tui { match key.code { KeyCode::Char('q') => { ctx.stop(); - if let Err(err) = self.signal_shutdown.send(()) { + if let Err(err) = self.signal_shutdown.send(String::new()) { tracing::error!(?err, "Failed to signal shutdown"); } }