diff --git a/Cargo.toml b/Cargo.toml index ab594cc..d94bb98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,9 @@ warp = "0.3" derive_more = { version = "1.0.0-beta.6", features = ["from", "display"] } terrors = "0.3" +# file watcher +inotify = "0.10" + # Actors actix = "0.13" actix-rt = "2.9" diff --git a/src/server/actors/file_watcher.rs b/src/server/actors/file_watcher.rs new file mode 100644 index 0000000..8d616f7 --- /dev/null +++ b/src/server/actors/file_watcher.rs @@ -0,0 +1,84 @@ +use actix::prelude::*; + +use actix::Recipient; +use inotify::{EventMask, Inotify, WatchMask}; +use std::{path::PathBuf, time::Duration}; +use tracing::{debug, info}; + +const CHECK_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Debug, Clone)] +pub struct WatchFile; +impl Message for WatchFile { + type Result = (); +} + +#[derive(Debug)] +pub struct FileUpdated; +impl Message for FileUpdated { + type Result = (); +} + +#[derive(Debug, derive_more::From, derive_more::Display)] +pub enum Error { + Io(std::io::Error), +} +impl std::error::Error for Error {} + +pub struct FileWatcher { + path: PathBuf, + inotify: Inotify, + recipient: Recipient, + run_interval: Option, +} +impl FileWatcher { + pub fn new(path: PathBuf, recipient: Recipient) -> Result { + let inotify = Inotify::init()?; + inotify.watches().add( + path.clone(), + WatchMask::MODIFY | WatchMask::CREATE | WatchMask::DELETE, + )?; + Ok(Self { + path, + inotify, + recipient, + run_interval: None, + }) + } +} +impl Actor for FileWatcher { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + info!("Starting file watcher actor"); + self.run_interval + .replace(ctx.run_interval(CHECK_INTERVAL, |_act, ctx| { + ctx.notify(WatchFile); + })); + } +} + +impl Handler for FileWatcher { + type Result = (); + + fn handle(&mut self, _msg: WatchFile, _ctx: &mut Self::Context) -> Self::Result { + debug!("Watching {} for activity...", self.path.display()); + let mut buffer = [0u8; 4096]; + if let Ok(mut events) = self.inotify.read_events(&mut buffer) { + if events.any(|event| event.mask.contains(EventMask::MODIFY)) { + info!("File modified"); + self.recipient.do_send(FileUpdated); + }; + } + } +} +// impl Handler for FileWatcher { +// type Result = anyhow::Result<()>; +// +// fn handle(&mut self, _msg: Stop, ctx: &mut Self::Context) -> Self::Result { +// warn!("Stopping file watcher actor"); +// self.run_interval.take(); +// ctx.stop(); +// Ok(()) +// } +// } diff --git a/src/server/actors/mod.rs b/src/server/actors/mod.rs index ab61cf2..92309df 100644 --- a/src/server/actors/mod.rs +++ b/src/server/actors/mod.rs @@ -1,3 +1,4 @@ +pub mod file_watcher; pub mod repo; pub mod server; pub mod webhook; diff --git a/src/server/actors/server.rs b/src/server/actors/server.rs index 3361217..819c532 100644 --- a/src/server/actors/server.rs +++ b/src/server/actors/server.rs @@ -7,8 +7,9 @@ use tracing::{error, info}; use crate::server::{ actors::{ + file_watcher::FileUpdated, repo::{CloneRepo, RepoActor}, - webhook::{AddWebhookRecipient, WebhookActor, WebhookRouter}, + webhook::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter}, }, config::{ ForgeConfig, ForgeName, GitDir, RepoAlias, RepoDetails, ServerConfig, ServerRepoConfig, @@ -35,17 +36,36 @@ type Result = core::result::Result; pub struct Server { generation: ServerGeneration, + webhook: Option>, fs: FileSystem, net: Network, } impl Actor for Server { type Context = Context; } +impl Handler for Server { + type Result = (); + + fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result { + let server_config = match ServerConfig::load(&self.fs) { + Ok(server_config) => server_config, + Err(err) => { + error!("Failed to load config file. Error: {}", err); + return; + } + }; + ctx.notify(server_config); + } +} impl Handler for Server { type Result = (); #[allow(clippy::cognitive_complexity)] // TODO: (#75) reduce complexity fn handle(&mut self, msg: ServerConfig, _ctx: &mut Self::Context) -> Self::Result { + if let Some(webhook) = self.webhook.take() { + webhook.do_send(ShutdownWebhook); + } + self.generation.inc(); let server_config = msg; // Server Storage let dir = server_config.storage().path(); @@ -83,13 +103,16 @@ impl Handler for Server { .for_each(|msg| webhook_router.do_send(msg)); } - WebhookActor::new(webhook_router.recipient()).start(); + let webhook = WebhookActor::new(webhook_router.recipient()).start(); + self.webhook.replace(webhook); } } impl Server { - pub const fn new(generation: ServerGeneration, fs: FileSystem, net: Network) -> Self { + pub fn new(fs: FileSystem, net: Network) -> Self { + let generation = ServerGeneration::new(); Self { generation, + webhook: None, fs, net, } diff --git a/src/server/actors/webhook/mod.rs b/src/server/actors/webhook/mod.rs index f90d271..37f256d 100644 --- a/src/server/actors/webhook/mod.rs +++ b/src/server/actors/webhook/mod.rs @@ -37,3 +37,17 @@ impl Actor for WebhookActor { self.spawn_handle.replace(spawn_handle); } } + +#[derive(Debug)] +pub struct ShutdownWebhook; +impl Message for ShutdownWebhook { + type Result = (); +} +impl Handler for WebhookActor { + type Result = (); + + fn handle(&mut self, _msg: ShutdownWebhook, ctx: &mut Self::Context) -> Self::Result { + self.spawn_handle.take(); + ctx.stop(); + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 418d211..20b0e23 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -13,7 +13,10 @@ use tracing::{error, info, level_filters::LevelFilter}; use crate::{ fs::FileSystem, - server::{actors::server::Server, types::ServerGeneration}, + server::actors::{ + file_watcher::{self, FileUpdated}, + server::Server, + }, }; pub fn init(fs: FileSystem) { @@ -40,22 +43,21 @@ pub fn init(fs: FileSystem) { pub async fn start(fs: FileSystem, net: Network) { init_logging(); - let generation = ServerGeneration::new(); - { - let span = tracing::info_span!("Server", %generation); - let _guard = span.enter(); - info!("Starting Server..."); - let server_config = match config::ServerConfig::load(&fs) { - Ok(server_config) => server_config, - Err(err) => { - error!("Failed to load config file. Error: {}", err); - return; - } - }; - let server = Server::new(generation, fs.clone(), net.clone()).start(); - server.do_send(server_config); - } + info!("Starting Server..."); + let server = Server::new(fs.clone(), net.clone()).start(); + server.do_send(FileUpdated); + + info!("Starting File Watcher..."); + let fw = match file_watcher::FileWatcher::new("git-next-server.toml".into(), server.recipient()) + { + Ok(fw) => fw, + Err(err) => { + error!(?err, "Failed to start file watcher"); + return; + } + }; + fw.start(); info!("Server running - Press Ctrl-C to stop..."); let _ = actix_rt::signal::ctrl_c().await; diff --git a/src/server/types.rs b/src/server/types.rs index dd7b0b3..681e267 100644 --- a/src/server/types.rs +++ b/src/server/types.rs @@ -42,8 +42,8 @@ impl ServerGeneration { pub fn new() -> Self { Self::default() } - pub const fn next(&self) -> Self { - Self(self.0 + 1) + pub fn inc(&mut self) { + self.0 += 1 } } impl std::fmt::Display for ServerGeneration {