git-next/crates/cli/src/file_watcher.rs
Paul Campbell b7aa231925
All checks were successful
Rust / build (map[name:nightly]) (push) Successful in 7m32s
Rust / build (map[name:stable]) (push) Successful in 14m44s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
Release Please / Release-plz (push) Successful in 1m14s
feat: switch to kameo actor system (dropping actix)
2024-11-29 09:33:36 +00:00

99 lines
3.1 KiB
Rust

//
use std::{path::PathBuf, sync::mpsc::Receiver};
use anyhow::Context;
use kameo::{mailbox::unbounded::UnboundedMailbox, message::Message, Actor};
use notify::{event::ModifyKind, RecommendedWatcher, Watcher};
use tracing::{error, info};
use git_next_core::message;
use crate::{
default_on_actor_link_died, default_on_actor_panic, default_on_actor_stop, on_actor_start,
publish, tell, MessageBus,
};
// Message published on the actor's `file_updates_bus` when a data-modification
// event is received for the watched file (see the `Watch` handler below).
message!(
FileUpdated,
"Notification that watched file has been updated"
);
message!(Watch, "Watch for the next event on the file");
/// Actor that watches a single path and publishes a [`FileUpdated`] message
/// when a data modification of that path is reported.
pub struct FileWatcherActor {
/// Bus on which [`FileUpdated`] is published.
file_updates_bus: MessageBus<FileUpdated>,
/// Path handed to the watcher (watched non-recursively).
file: PathBuf,
/// Receiving end of the channel the watcher writes its events to.
/// `None` until the actor's start hook has run.
event_receiver: Option<Receiver<Result<notify::Event, notify::Error>>>,
/// The watcher itself; `None` until the actor's start hook has run.
/// NOTE(review): presumably stored so the watcher is not dropped (and
/// watching stopped) while the actor is alive - confirm against `notify`.
watcher: Option<RecommendedWatcher>,
}
impl FileWatcherActor {
    /// Builds an actor for `file` that will publish on `file_updates_bus`.
    ///
    /// No watcher is created here: both the watcher and its event channel
    /// start out as `None` and are filled in by the actor's start hook.
    pub const fn new(file_updates_bus: MessageBus<FileUpdated>, file: PathBuf) -> Self {
        // Nothing is being watched until the actor has started.
        let event_receiver = None;
        let watcher = None;
        Self {
            file_updates_bus,
            file,
            event_receiver,
            watcher,
        }
    }
}
impl Actor for FileWatcherActor {
    type Mailbox = UnboundedMailbox<Self>;

    // Start hook: wire a std channel to a freshly created watcher, register
    // the file with it (non-recursively), keep both halves on the actor and
    // send the first `Watch` message to ourselves.
    on_actor_start!(this, actor_ref, {
        let (event_sender, events) = std::sync::mpsc::channel();
        this.event_receiver = Some(events);

        let mut fs_watcher = notify::recommended_watcher(event_sender).context("file watcher")?;
        fs_watcher
            .watch(&this.file, notify::RecursiveMode::NonRecursive)
            .with_context(|| format!("Watching: {:?}", this.file))?;
        this.watcher = Some(fs_watcher);

        tell!("file_watcher", actor_ref, Watch)?;
        Ok(())
    });

    default_on_actor_panic!(this, actor_ref, err);
    default_on_actor_link_died!(this, actor_ref, id, reason);
    default_on_actor_stop!(this, actor_ref, reason);
}
impl Message<Watch> for FileWatcherActor {
type Reply = color_eyre::Result<()>;
async fn handle(
&mut self,
msg: Watch,
ctx: kameo::message::Context<'_, Self, Self::Reply>,
) -> Self::Reply {
if let Some(rx) = &self.event_receiver {
while let Ok(result) = rx.recv() {
match result {
Ok(event) => match event.kind {
notify::EventKind::Modify(ModifyKind::Data(_)) => {
info!("===================================================================================");
info!(?event, "File modified");
publish!("file_updates_bus", self.file_updates_bus, FileUpdated)?;
break;
}
notify::EventKind::Modify(_)
| notify::EventKind::Create(_)
| notify::EventKind::Remove(_)
| notify::EventKind::Any
| notify::EventKind::Access(_)
| notify::EventKind::Other => { /* do nothing */ }
},
Err(err) => {
error!(?err, "Watching file: {:?}", self.file);
}
}
}
}
tell!("file_watcher", ctx.actor_ref(), msg)?;
Ok(())
}
}