git-next/crates/cli/src/root.rs
Paul Campbell b7aa231925
All checks were successful
Rust / build (map[name:nightly]) (push) Successful in 7m32s
Rust / build (map[name:stable]) (push) Successful in 14m44s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
Release Please / Release-plz (push) Successful in 1m14s
feat: switch to kameo actor system (dropping actix)
2024-11-29 09:33:36 +00:00

221 lines
6.4 KiB
Rust

//
// Root actor for all other actors - supervises them all
use std::time::Duration;
use color_eyre::Result;
use derive_more::derive::Constructor;
use git_next_core::{git::RepositoryFactory, s};
use kameo::{
actor::{pubsub::PubSub, ActorRef},
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
Actor,
};
use kxio::{fs::FileSystem, net::Net};
#[cfg(feature = "tui")]
use crate::tui::Tui;
use crate::{
alerts::{AlertsActor, History},
base_actor::BaseActor,
default_on_actor_panic, default_on_actor_start,
file_watcher::{FileUpdated, FileWatcherActor},
on_actor_link_died, on_actor_stop, publish,
server::{actor::messages::ServerUpdate, ServerActor},
subscribe, MessageBus,
};
#[derive(Debug)]
pub struct RootActor {
ui: bool,
fs: FileSystem,
net: Net,
sleep_duration: std::time::Duration,
tx_shutdown: tokio::sync::mpsc::Sender<String>,
alerts_actor_ref: Option<ActorRef<AlertsActor>>,
server_updates_bus: Option<MessageBus<ServerUpdate>>,
server_actor_ref: Option<ActorRef<ServerActor>>,
file_updates_bus: Option<MessageBus<FileUpdated>>,
file_watcher_actor_ref: Option<ActorRef<FileWatcherActor>>,
#[cfg(feature = "tui")]
tui_actor_ref: Option<ActorRef<Tui>>,
}
impl RootActor {
pub const fn new(
ui: bool,
fs: FileSystem,
net: Net,
sleep_duration: std::time::Duration,
tx_shutdown: tokio::sync::mpsc::Sender<String>,
) -> Self {
Self {
ui,
fs,
net,
sleep_duration,
tx_shutdown,
alerts_actor_ref: None,
server_updates_bus: None,
server_actor_ref: None,
file_updates_bus: None,
file_watcher_actor_ref: None,
#[cfg(feature = "tui")]
tui_actor_ref: None,
}
}
}
/// Twenty-four hours (86 400 seconds); passed to `History::new` when the alerts actor is built.
const A_DAY: Duration = Duration::from_secs(86_400);
/// Handler context for messages sent to [`RootActor`] whose reply type is `Result<()>`.
type RootContext<'ctx> = Context<'ctx, RootActor, Result<()>>;
/// Message asking the [`RootActor`] to spawn its child actors and message buses.
#[derive(Debug, Constructor)]
pub struct Start {
    /// Repository factory that is handed on to the server actor when it is created.
    repo: Box<dyn RepositoryFactory>,
}
impl Message<Start> for RootActor {
type Reply = Result<()>;
async fn handle(&mut self, msg: Start, ctx: RootContext<'_>) -> Self::Reply {
let alerts_actor_ref = self.start_alerts_actor(&ctx).await?;
let server_updates_bus = self.start_server_updates_bus(&ctx).await;
let file_updates_bus = self.start_file_updates_bus(&ctx).await;
self.start_server_actor(
&ctx,
alerts_actor_ref,
server_updates_bus.clone(),
file_updates_bus.clone(),
msg.repo,
)
.await?;
self.start_file_watcher_actor(&ctx, file_updates_bus.clone())
.await;
#[cfg(feature = "tui")]
if self.ui {
self.start_tui_actor(&ctx, server_updates_bus).await?;
}
// trigger initial config file to load
publish!(file_updates_bus, FileUpdated)?;
Ok(())
}
}
impl RootActor {
    /// Spawns the alerts actor with this actor's reference and records its handle.
    ///
    /// The actor is constructed with `None`, a `History` built from [`A_DAY`], and a clone of
    /// the network handle. This method itself never produces an `Err`.
    async fn start_alerts_actor(&mut self, ctx: &RootContext<'_>) -> Result<ActorRef<AlertsActor>> {
        let history = History::new(A_DAY);
        let alerts = AlertsActor::new(None, history, self.net.clone())
            .spawn(ctx.actor_ref())
            .await;
        self.alerts_actor_ref = Some(alerts.clone());
        Ok(alerts)
    }

    /// Spawns the file watcher (via `spawn_in_thread`) for `git-next-server.toml` under the
    /// filesystem base, giving it `file_updates_bus`, and records its handle.
    async fn start_file_watcher_actor(
        &mut self,
        ctx: &RootContext<'_>,
        file_updates_bus: MessageBus<FileUpdated>,
    ) -> ActorRef<FileWatcherActor> {
        let config_path = self.fs.base().join("git-next-server.toml");
        let watcher = FileWatcherActor::new(file_updates_bus, config_path)
            .spawn_in_thread(ctx.actor_ref())
            .await;
        self.file_watcher_actor_ref = Some(watcher.clone());
        watcher
    }

    /// Spawns the pub/sub bus for `ServerUpdate` messages and records its handle.
    async fn start_server_updates_bus(
        &mut self,
        ctx: &RootContext<'_>,
    ) -> MessageBus<ServerUpdate> {
        let bus = PubSub::<ServerUpdate>::new().spawn(ctx.actor_ref()).await;
        self.server_updates_bus = Some(bus.clone());
        bus
    }

    /// Spawns the pub/sub bus for `FileUpdated` messages and records its handle.
    async fn start_file_updates_bus(&mut self, ctx: &RootContext<'_>) -> MessageBus<FileUpdated> {
        let bus = PubSub::<FileUpdated>::new().spawn(ctx.actor_ref()).await;
        self.file_updates_bus = Some(bus.clone());
        bus
    }

    /// Spawns the server actor, subscribes it to `file_updates_bus` under the name
    /// `"server"`, and records its handle.
    ///
    /// # Errors
    ///
    /// Returns the error from the subscription; in that case the handle is not recorded.
    async fn start_server_actor(
        &mut self,
        ctx: &RootContext<'_>,
        alerts_actor_ref: ActorRef<AlertsActor>,
        server_updates_bus: MessageBus<ServerUpdate>,
        file_updates_bus: MessageBus<FileUpdated>,
        repo: Box<dyn RepositoryFactory>,
    ) -> Result<ActorRef<ServerActor>> {
        let server = ServerActor::new(
            self.ui,
            self.fs.clone(),
            self.net.clone(),
            alerts_actor_ref,
            server_updates_bus,
            repo,
            self.sleep_duration,
        )
        .spawn(ctx.actor_ref())
        .await;

        // Subscribe first: the handle is only stored once the subscription succeeded.
        subscribe!(file_updates_bus, "server", server.clone())?;
        self.server_actor_ref = Some(server.clone());
        Ok(server)
    }

    /// Spawns the TUI actor (via `spawn_in_thread`), records its handle, and subscribes it
    /// to `server_updates_bus`.
    ///
    /// # Errors
    ///
    /// Returns the error from the subscription; the handle has already been recorded by then.
    #[cfg(feature = "tui")]
    async fn start_tui_actor(
        &mut self,
        ctx: &RootContext<'_>,
        server_updates_bus: MessageBus<ServerUpdate>,
    ) -> Result<ActorRef<Tui>> {
        let tui = Tui::new().spawn_in_thread(ctx.actor_ref()).await;
        self.tui_actor_ref = Some(tui.clone());
        subscribe!(server_updates_bus, tui.clone())?;
        Ok(tui)
    }
}
impl Actor for RootActor {
    type Mailbox = UnboundedMailbox<Self>;

    default_on_actor_start!(this, actor_ref);
    default_on_actor_panic!(this, actor_ref, err);

    // A linked actor died: always send a "link died" notice on the shutdown channel, then
    // decide what to return. A `Normal` stop yields `None`; every other reason is wrapped in
    // a `LinkDied` carrying the dead actor's id.
    // NOTE(review): presumably `Some(..)` makes this actor stop as well — confirm against
    // the `on_actor_link_died!` macro and kameo's `Actor::on_link_died` contract.
    on_actor_link_died!(this, actor_ref, id, reason, {
        use kameo::error::ActorStopReason;

        this.tx_shutdown.send(s!("link died")).await?;
        let escalation = match &reason {
            ActorStopReason::Normal => None,
            ActorStopReason::Killed
            | ActorStopReason::Panicked(_)
            | ActorStopReason::LinkDied { .. } => Some(ActorStopReason::LinkDied {
                id,
                reason: Box::new(reason),
            }),
        };
        Ok(escalation)
    });

    // This actor is stopping: send a "stopping" notice on the shutdown channel.
    // Panics with "send shutdown" if the notice cannot be sent.
    on_actor_stop!(this, actor_ref, reason, {
        let notice = s!("stopping");
        #[allow(clippy::expect_used)]
        this.tx_shutdown
            .send(notice)
            .await
            .expect("send shutdown");
        Ok(())
    });
}