feat(server): restart server when config file is updated

Closes kemitix/git-next#59
This commit is contained in:
Paul Campbell 2024-05-07 19:32:15 +01:00
parent 7e79f4877a
commit 6d147efe32
7 changed files with 148 additions and 21 deletions

View file

@ -58,6 +58,9 @@ warp = "0.3"
derive_more = { version = "1.0.0-beta.6", features = ["from", "display"] } derive_more = { version = "1.0.0-beta.6", features = ["from", "display"] }
terrors = "0.3" terrors = "0.3"
# file watcher
inotify = "0.10"
# Actors # Actors
actix = "0.13" actix = "0.13"
actix-rt = "2.9" actix-rt = "2.9"

View file

@ -0,0 +1,84 @@
use actix::prelude::*;
use actix::Recipient;
use inotify::{EventMask, Inotify, WatchMask};
use std::{path::PathBuf, time::Duration};
use tracing::{debug, info};
const CHECK_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Debug, Clone)]
pub struct WatchFile;
impl Message for WatchFile {
type Result = ();
}
#[derive(Debug)]
pub struct FileUpdated;
impl Message for FileUpdated {
type Result = ();
}
#[derive(Debug, derive_more::From, derive_more::Display)]
pub enum Error {
Io(std::io::Error),
}
impl std::error::Error for Error {}
pub struct FileWatcher {
path: PathBuf,
inotify: Inotify,
recipient: Recipient<FileUpdated>,
run_interval: Option<SpawnHandle>,
}
impl FileWatcher {
pub fn new(path: PathBuf, recipient: Recipient<FileUpdated>) -> Result<Self, Error> {
let inotify = Inotify::init()?;
inotify.watches().add(
path.clone(),
WatchMask::MODIFY | WatchMask::CREATE | WatchMask::DELETE,
)?;
Ok(Self {
path,
inotify,
recipient,
run_interval: None,
})
}
}
impl Actor for FileWatcher {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Starting file watcher actor");
self.run_interval
.replace(ctx.run_interval(CHECK_INTERVAL, |_act, ctx| {
ctx.notify(WatchFile);
}));
}
}
impl Handler<WatchFile> for FileWatcher {
type Result = ();
fn handle(&mut self, _msg: WatchFile, _ctx: &mut Self::Context) -> Self::Result {
debug!("Watching {} for activity...", self.path.display());
let mut buffer = [0u8; 4096];
if let Ok(mut events) = self.inotify.read_events(&mut buffer) {
if events.any(|event| event.mask.contains(EventMask::MODIFY)) {
info!("File modified");
self.recipient.do_send(FileUpdated);
};
}
}
}
// impl Handler<Stop> for FileWatcher {
// type Result = anyhow::Result<()>;
//
// fn handle(&mut self, _msg: Stop, ctx: &mut Self::Context) -> Self::Result {
// warn!("Stopping file watcher actor");
// self.run_interval.take();
// ctx.stop();
// Ok(())
// }
// }

View file

@ -1,3 +1,4 @@
pub mod file_watcher;
pub mod repo; pub mod repo;
pub mod server; pub mod server;
pub mod webhook; pub mod webhook;

View file

@ -7,8 +7,9 @@ use tracing::{error, info};
use crate::server::{ use crate::server::{
actors::{ actors::{
file_watcher::FileUpdated,
repo::{CloneRepo, RepoActor}, repo::{CloneRepo, RepoActor},
webhook::{AddWebhookRecipient, WebhookActor, WebhookRouter}, webhook::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter},
}, },
config::{ config::{
ForgeConfig, ForgeName, GitDir, RepoAlias, RepoDetails, ServerConfig, ServerRepoConfig, ForgeConfig, ForgeName, GitDir, RepoAlias, RepoDetails, ServerConfig, ServerRepoConfig,
@ -35,17 +36,36 @@ type Result<T> = core::result::Result<T, Error>;
pub struct Server { pub struct Server {
generation: ServerGeneration, generation: ServerGeneration,
webhook: Option<Addr<WebhookActor>>,
fs: FileSystem, fs: FileSystem,
net: Network, net: Network,
} }
impl Actor for Server { impl Actor for Server {
type Context = Context<Self>; type Context = Context<Self>;
} }
impl Handler<FileUpdated> for Server {
type Result = ();
fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result {
let server_config = match ServerConfig::load(&self.fs) {
Ok(server_config) => server_config,
Err(err) => {
error!("Failed to load config file. Error: {}", err);
return;
}
};
ctx.notify(server_config);
}
}
impl Handler<ServerConfig> for Server { impl Handler<ServerConfig> for Server {
type Result = (); type Result = ();
#[allow(clippy::cognitive_complexity)] // TODO: (#75) reduce complexity #[allow(clippy::cognitive_complexity)] // TODO: (#75) reduce complexity
fn handle(&mut self, msg: ServerConfig, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ServerConfig, _ctx: &mut Self::Context) -> Self::Result {
if let Some(webhook) = self.webhook.take() {
webhook.do_send(ShutdownWebhook);
}
self.generation.inc();
let server_config = msg; let server_config = msg;
// Server Storage // Server Storage
let dir = server_config.storage().path(); let dir = server_config.storage().path();
@ -83,13 +103,16 @@ impl Handler<ServerConfig> for Server {
.for_each(|msg| webhook_router.do_send(msg)); .for_each(|msg| webhook_router.do_send(msg));
} }
WebhookActor::new(webhook_router.recipient()).start(); let webhook = WebhookActor::new(webhook_router.recipient()).start();
self.webhook.replace(webhook);
} }
} }
impl Server { impl Server {
pub const fn new(generation: ServerGeneration, fs: FileSystem, net: Network) -> Self { pub fn new(fs: FileSystem, net: Network) -> Self {
let generation = ServerGeneration::new();
Self { Self {
generation, generation,
webhook: None,
fs, fs,
net, net,
} }

View file

@ -37,3 +37,17 @@ impl Actor for WebhookActor {
self.spawn_handle.replace(spawn_handle); self.spawn_handle.replace(spawn_handle);
} }
} }
#[derive(Debug)]
pub struct ShutdownWebhook;
impl Message for ShutdownWebhook {
type Result = ();
}
impl Handler<ShutdownWebhook> for WebhookActor {
type Result = ();
fn handle(&mut self, _msg: ShutdownWebhook, ctx: &mut Self::Context) -> Self::Result {
self.spawn_handle.take();
ctx.stop();
}
}

View file

@ -13,7 +13,10 @@ use tracing::{error, info, level_filters::LevelFilter};
use crate::{ use crate::{
fs::FileSystem, fs::FileSystem,
server::{actors::server::Server, types::ServerGeneration}, server::actors::{
file_watcher::{self, FileUpdated},
server::Server,
},
}; };
pub fn init(fs: FileSystem) { pub fn init(fs: FileSystem) {
@ -40,22 +43,21 @@ pub fn init(fs: FileSystem) {
pub async fn start(fs: FileSystem, net: Network) { pub async fn start(fs: FileSystem, net: Network) {
init_logging(); init_logging();
let generation = ServerGeneration::new();
{
let span = tracing::info_span!("Server", %generation);
let _guard = span.enter();
info!("Starting Server...");
let server_config = match config::ServerConfig::load(&fs) {
Ok(server_config) => server_config,
Err(err) => {
error!("Failed to load config file. Error: {}", err);
return;
}
};
let server = Server::new(generation, fs.clone(), net.clone()).start(); info!("Starting Server...");
server.do_send(server_config); let server = Server::new(fs.clone(), net.clone()).start();
} server.do_send(FileUpdated);
info!("Starting File Watcher...");
let fw = match file_watcher::FileWatcher::new("git-next-server.toml".into(), server.recipient())
{
Ok(fw) => fw,
Err(err) => {
error!(?err, "Failed to start file watcher");
return;
}
};
fw.start();
info!("Server running - Press Ctrl-C to stop..."); info!("Server running - Press Ctrl-C to stop...");
let _ = actix_rt::signal::ctrl_c().await; let _ = actix_rt::signal::ctrl_c().await;

View file

@ -42,8 +42,8 @@ impl ServerGeneration {
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
pub const fn next(&self) -> Self { pub fn inc(&mut self) {
Self(self.0 + 1) self.0 += 1
} }
} }
impl std::fmt::Display for ServerGeneration { impl std::fmt::Display for ServerGeneration {