WIP: feat: support non-linux platforms
All checks were successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful

Closes kemitix/git-next#108
This commit is contained in:
Paul Campbell 2024-07-28 15:37:58 +01:00
parent 22faa851dc
commit 6fccd1ff82
5 changed files with 45 additions and 65 deletions

View file

@ -91,7 +91,7 @@ thiserror = "1.0"
pike = "0.1" pike = "0.1"
# file watcher # file watcher
inotify = "0.10" notify = "6.1"
# Actors # Actors
actix = "0.13" actix = "0.13"

View file

@ -65,7 +65,7 @@ bytes = { workspace = true }
warp = { workspace = true } warp = { workspace = true }
# file watcher (linux) # file watcher (linux)
inotify = { workspace = true } notify = { workspace = true }
[dev-dependencies] [dev-dependencies]
# Testing # Testing

View file

@ -2,15 +2,13 @@
use actix::prelude::*; use actix::prelude::*;
use actix::Recipient; use actix::Recipient;
use inotify::{EventMask, Inotify, WatchMask}; use notify::event::ModifyKind;
use notify::Watcher;
use tracing::error;
use tracing::info;
use std::{path::PathBuf, time::Duration}; use std::path::PathBuf;
use std::time::Duration;
const CHECK_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Debug, Clone, Message)]
#[rtype(result = "()")]
pub struct WatchFile;
#[derive(Debug, Message)] #[derive(Debug, Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
@ -21,50 +19,40 @@ pub enum Error {
#[error("io")] #[error("io")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
} }
pub async fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) {
let (tx, rx) = std::sync::mpsc::channel();
pub struct FileWatcher { #[allow(clippy::expect_used)]
inotify: Inotify, let mut handler = notify::recommended_watcher(tx).expect("file watcher");
recipient: Recipient<FileUpdated>, #[allow(clippy::expect_used)]
run_interval: Option<SpawnHandle>, handler
} .watch(&path, notify::RecursiveMode::NonRecursive)
impl FileWatcher { .expect("watch file");
pub fn new(path: PathBuf, recipient: Recipient<FileUpdated>) -> Result<Self, Error> { info!("Watching: {:?}", path);
let inotify = Inotify::init()?; async move {
inotify.watches().add( loop {
path, for result in rx.try_iter() {
WatchMask::MODIFY | WatchMask::CREATE | WatchMask::DELETE | WatchMask::ATTRIB, match result {
)?; Ok(event) => match event.kind {
Ok(Self { notify::EventKind::Modify(ModifyKind::Data(_)) => {
inotify,
recipient,
run_interval: None,
})
}
}
impl Actor for FileWatcher {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
tracing::info!("Starting file watcher actor");
self.run_interval
.replace(ctx.run_interval(CHECK_INTERVAL, |_act, ctx| {
ctx.notify(WatchFile);
}));
}
}
impl Handler<WatchFile> for FileWatcher {
type Result = ();
fn handle(&mut self, _msg: WatchFile, _ctx: &mut Self::Context) -> Self::Result {
let mut buffer = [0u8; 4096];
if let Ok(mut events) = self.inotify.read_events(&mut buffer) {
if events.any(|event| {
event.mask.contains(EventMask::MODIFY) || event.mask.contains(EventMask::ATTRIB)
}) {
tracing::info!("File modified"); tracing::info!("File modified");
self.recipient.do_send(FileUpdated); recipient.do_send(FileUpdated);
}; break;
}
notify::EventKind::Modify(_)
| notify::EventKind::Create(_)
| notify::EventKind::Remove(_)
| notify::EventKind::Any
| notify::EventKind::Access(_)
| notify::EventKind::Other => { /* do nothing */ }
},
Err(err) => {
error!(?err, "Watching file: {path:?}");
} }
} }
} }
actix_rt::time::sleep(Duration::from_millis(1000)).await;
}
}
.await;
}

View file

@ -243,6 +243,5 @@ impl ServerActor {
} }
#[cfg(not(test))] #[cfg(not(test))]
_ctx.address().do_send(msg); _ctx.address().do_send(msg);
tracing::info!("sent");
} }
} }

View file

@ -6,12 +6,12 @@ mod tests;
use actix::prelude::*; use actix::prelude::*;
use crate::file_watcher::{FileUpdated, FileWatcher}; use crate::file_watcher::{watch_file, FileUpdated};
use actor::ServerActor; use actor::ServerActor;
use git_next_core::git::RepositoryFactory; use git_next_core::git::RepositoryFactory;
use kxio::{fs::FileSystem, network::Network}; use kxio::{fs::FileSystem, network::Network};
use tracing::{error, info, level_filters::LevelFilter}; use tracing::{info, level_filters::LevelFilter};
use std::path::PathBuf; use std::path::PathBuf;
@ -45,20 +45,13 @@ pub fn start(
) { ) {
init_logging(); init_logging();
info!("Starting Server...");
let execution = async move { let execution = async move {
info!("Starting Server...");
let server = ServerActor::new(fs.clone(), net.clone(), repo, sleep_duration).start(); let server = ServerActor::new(fs.clone(), net.clone(), repo, sleep_duration).start();
server.do_send(FileUpdated); server.do_send(FileUpdated);
info!("Starting File Watcher..."); info!("Starting File Watcher...");
let fw = match FileWatcher::new("git-next-server.toml".into(), server.clone().recipient()) { watch_file("git-next-server.toml".into(), server.clone().recipient()).await;
Ok(fw) => fw,
Err(err) => {
error!(?err, "Failed to start file watcher");
return;
}
};
fw.start();
info!("Server running - Press Ctrl-C to stop..."); info!("Server running - Press Ctrl-C to stop...");
let _ = actix_rt::signal::ctrl_c().await; let _ = actix_rt::signal::ctrl_c().await;