// // Root actor for all other actors - supervises them all use std::time::Duration; use color_eyre::Result; use derive_more::derive::Constructor; use git_next_core::{git::RepositoryFactory, s}; use kameo::{ actor::{pubsub::PubSub, ActorRef}, mailbox::unbounded::UnboundedMailbox, message::{Context, Message}, Actor, }; use kxio::{fs::FileSystem, net::Net}; #[cfg(feature = "tui")] use crate::tui::Tui; use crate::{ alerts::{AlertsActor, History}, base_actor::BaseActor, default_on_actor_panic, default_on_actor_start, file_watcher::{FileUpdated, FileWatcherActor}, on_actor_link_died, on_actor_stop, publish, server::{actor::messages::ServerUpdate, ServerActor}, subscribe, MessageBus, }; #[derive(Debug)] pub struct RootActor { ui: bool, fs: FileSystem, net: Net, sleep_duration: std::time::Duration, tx_shutdown: tokio::sync::mpsc::Sender, alerts_actor_ref: Option>, server_updates_bus: Option>, server_actor_ref: Option>, file_updates_bus: Option>, file_watcher_actor_ref: Option>, #[cfg(feature = "tui")] tui_actor_ref: Option>, } impl RootActor { pub const fn new( ui: bool, fs: FileSystem, net: Net, sleep_duration: std::time::Duration, tx_shutdown: tokio::sync::mpsc::Sender, ) -> Self { Self { ui, fs, net, sleep_duration, tx_shutdown, alerts_actor_ref: None, server_updates_bus: None, server_actor_ref: None, file_updates_bus: None, file_watcher_actor_ref: None, #[cfg(feature = "tui")] tui_actor_ref: None, } } } const A_DAY: Duration = Duration::from_secs(24 * 60 * 60); type RootContext<'a> = Context<'a, RootActor, Result<()>>; #[derive(Constructor, Debug)] pub struct Start { repo: Box, } impl Message for RootActor { type Reply = Result<()>; async fn handle(&mut self, msg: Start, ctx: RootContext<'_>) -> Self::Reply { let alerts_actor_ref = self.start_alerts_actor(&ctx).await?; let server_updates_bus = self.start_server_updates_bus(&ctx).await; let file_updates_bus = self.start_file_updates_bus(&ctx).await; self.start_server_actor( &ctx, alerts_actor_ref, server_updates_bus.clone(), file_updates_bus.clone(), msg.repo, ) .await?; self.start_file_watcher_actor(&ctx, file_updates_bus.clone()) .await; #[cfg(feature = "tui")] if self.ui { self.start_tui_actor(&ctx, server_updates_bus).await?; } // trigger initial config file to load publish!(file_updates_bus, FileUpdated)?; Ok(()) } } impl RootActor { async fn start_alerts_actor(&mut self, ctx: &RootContext<'_>) -> Result> { let actor_ref = AlertsActor::new(None, History::new(A_DAY), self.net.clone()) .spawn(ctx.actor_ref()) .await; self.alerts_actor_ref.replace(actor_ref.clone()); Ok(actor_ref) } async fn start_file_watcher_actor( &mut self, ctx: &RootContext<'_>, file_updates_bus: MessageBus, ) -> ActorRef { let actor_ref = FileWatcherActor::new( file_updates_bus, self.fs.base().join("git-next-server.toml"), ) .spawn_in_thread(ctx.actor_ref()) .await; self.file_watcher_actor_ref.replace(actor_ref.clone()); actor_ref } async fn start_server_updates_bus( &mut self, ctx: &RootContext<'_>, ) -> MessageBus { let actor_ref = PubSub::::new().spawn(ctx.actor_ref()).await; self.server_updates_bus.replace(actor_ref.clone()); actor_ref } async fn start_file_updates_bus(&mut self, ctx: &RootContext<'_>) -> MessageBus { let actor_ref = PubSub::::new().spawn(ctx.actor_ref()).await; self.file_updates_bus.replace(actor_ref.clone()); actor_ref } async fn start_server_actor( &mut self, ctx: &RootContext<'_>, alerts_actor_ref: ActorRef, server_updates_bus: MessageBus, file_updates_bus: MessageBus, repo: Box, ) -> Result> { let actor_ref = ServerActor::new( self.ui, self.fs.clone(), self.net.clone(), alerts_actor_ref, server_updates_bus, repo, self.sleep_duration, ) .spawn(ctx.actor_ref()) .await; subscribe!(file_updates_bus, "server", actor_ref.clone())?; self.server_actor_ref.replace(actor_ref.clone()); Ok(actor_ref) } #[cfg(feature = "tui")] async fn start_tui_actor( &mut self, ctx: &RootContext<'_>, server_updates_bus: MessageBus, ) -> Result> { let actor_ref = Tui::new().spawn_in_thread(ctx.actor_ref()).await; self.tui_actor_ref.replace(actor_ref.clone()); subscribe!(server_updates_bus, actor_ref.clone())?; Ok(actor_ref) } } impl Actor for RootActor { type Mailbox = UnboundedMailbox; default_on_actor_start!(this, actor_ref); default_on_actor_panic!(this, actor_ref, err); on_actor_link_died!(this, actor_ref, id, reason, { this.tx_shutdown.send(s!("link died")).await?; match &reason { kameo::error::ActorStopReason::Normal => Ok(None), kameo::error::ActorStopReason::Killed | kameo::error::ActorStopReason::Panicked(_) | kameo::error::ActorStopReason::LinkDied { .. } => { Ok(Some(kameo::error::ActorStopReason::LinkDied { id, reason: Box::new(reason), })) } } }); on_actor_stop!(this, actor_ref, reason, { #[allow(clippy::expect_used)] this.tx_shutdown .send(s!("stopping")) .await .expect("send shutdown"); Ok(()) }); }