fix: file_watcher runs on own thread

Closes kemitix/git-next#142
This commit is contained in:
Paul Campbell 2024-08-10 20:08:07 +01:00
parent e34c6e0ef6
commit 08d2377404
6 changed files with 185 additions and 17 deletions

133
Cargo.lock generated
View file

@ -890,6 +890,21 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.30" version = "0.3.30"
@ -906,12 +921,34 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.30" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.30" version = "0.3.30"
@ -924,14 +961,22 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.30" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
@ -988,7 +1033,7 @@ checksum = "8066dc2ef3bd0e2bfb84b4a2b0b04f216ffb97390e09fab0752bf0ba943dacc6"
dependencies = [ dependencies = [
"doc-comment", "doc-comment",
"unicase", "unicase",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -1015,6 +1060,7 @@ dependencies = [
"pretty_assertions", "pretty_assertions",
"rand", "rand",
"ratatui", "ratatui",
"rstest",
"secrecy", "secrecy",
"sendmail", "sendmail",
"serde_json", "serde_json",
@ -1180,7 +1226,7 @@ dependencies = [
"gix-utils", "gix-utils",
"itoa", "itoa",
"thiserror", "thiserror",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -1275,7 +1321,7 @@ dependencies = [
"smallvec", "smallvec",
"thiserror", "thiserror",
"unicode-bom", "unicode-bom",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -1572,7 +1618,7 @@ dependencies = [
"itoa", "itoa",
"smallvec", "smallvec",
"thiserror", "thiserror",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -1696,7 +1742,7 @@ dependencies = [
"gix-utils", "gix-utils",
"maybe-async", "maybe-async",
"thiserror", "thiserror",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -1728,7 +1774,7 @@ dependencies = [
"gix-validate", "gix-validate",
"memmap2", "memmap2",
"thiserror", "thiserror",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -1976,6 +2022,12 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.26" version = "0.3.26"
@ -3067,6 +3119,15 @@ dependencies = [
"yansi", "yansi",
] ]
[[package]]
name = "proc-macro-crate"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
dependencies = [
"toml_edit 0.21.1",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.81" version = "1.0.81"
@ -3260,6 +3321,12 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
name = "relative-path"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.12.4" version = "0.12.4"
@ -3323,6 +3390,36 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "rstest"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b423f0e62bdd61734b67cd21ff50871dfaeb9cc74f869dcd6af974fbcb19936"
dependencies = [
"futures",
"futures-timer",
"rstest_macros",
"rustc_version",
]
[[package]]
name = "rstest_macros"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5e1711e7d14f74b12a58411c542185ef7fb7f2e7f8ee6e2940a883628522b42"
dependencies = [
"cfg-if",
"glob",
"proc-macro-crate",
"proc-macro2",
"quote",
"regex",
"relative-path",
"rustc_version",
"syn 2.0.60",
"unicode-ident",
]
[[package]] [[package]]
name = "rust-argon2" name = "rust-argon2"
version = "0.8.3" version = "0.8.3"
@ -4002,7 +4099,7 @@ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"toml_edit", "toml_edit 0.22.12",
] ]
[[package]] [[package]]
@ -4014,6 +4111,17 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "toml_edit"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [
"indexmap",
"toml_datetime",
"winnow 0.5.40",
]
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
version = "0.22.12" version = "0.22.12"
@ -4024,7 +4132,7 @@ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
"winnow", "winnow 0.6.7",
] ]
[[package]] [[package]]
@ -4634,6 +4742,15 @@ version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
[[package]]
name = "winnow"
version = "0.5.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "winnow" name = "winnow"
version = "0.6.7" version = "0.6.7"

View file

@ -111,3 +111,4 @@ pretty_assertions = "1.4"
rand = "0.8" rand = "0.8"
mockall = "0.13" mockall = "0.13"
test-log = "0.2" test-log = "0.2"
rstest = { version = "0.22", features = ["async-timeout"] }

View file

@ -77,6 +77,7 @@ test-log = { workspace = true }
rand = { workspace = true } rand = { workspace = true }
pretty_assertions = { workspace = true } pretty_assertions = { workspace = true }
mockall = { workspace = true } mockall = { workspace = true }
rstest = { workspace = true }
[lints.clippy] [lints.clippy]
nursery = { level = "warn", priority = -1 } nursery = { level = "warn", priority = -1 }

View file

@ -9,6 +9,9 @@ use tracing::error;
use tracing::info; use tracing::info;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
#[derive(Debug, Message)] #[derive(Debug, Message)]
@ -20,16 +23,22 @@ pub enum Error {
#[error("io")] #[error("io")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
} }
pub async fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) -> Result<()> { pub fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) -> Result<Arc<AtomicBool>> {
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
let shutdown = Arc::new(AtomicBool::default());
let mut handler = notify::recommended_watcher(tx).context("file watcher")?; let mut handler = notify::recommended_watcher(tx).context("file watcher")?;
handler handler
.watch(&path, notify::RecursiveMode::NonRecursive) .watch(&path, notify::RecursiveMode::NonRecursive)
.with_context(|| format!("Watch file: {path:?}"))?; .with_context(|| format!("Watch file: {path:?}"))?;
info!("Watching: {:?}", path); info!("Watching: {:?}", path);
async move { let thread_shutdown = shutdown.clone();
actix_rt::task::spawn_blocking(move || {
loop { loop {
if thread_shutdown.load(Ordering::Relaxed) {
drop(handler);
break;
}
for result in rx.try_iter() { for result in rx.try_iter() {
match result { match result {
Ok(event) => match event.kind { Ok(event) => match event.kind {
@ -50,9 +59,8 @@ pub async fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) -> Res
} }
} }
} }
actix_rt::time::sleep(Duration::from_millis(1000)).await; std::thread::sleep(Duration::from_millis(1000));
} }
} });
.await; Ok(shutdown)
Ok(())
} }

View file

@ -17,7 +17,7 @@ use anyhow::{Context, Result};
use kxio::{fs::FileSystem, network::Network}; use kxio::{fs::FileSystem, network::Network};
use tracing::info; use tracing::info;
use std::{path::PathBuf, time::Duration}; use std::{path::PathBuf, sync::atomic::Ordering, time::Duration};
const A_DAY: Duration = Duration::from_secs(24 * 60 * 60); const A_DAY: Duration = Duration::from_secs(24 * 60 * 60);
@ -62,8 +62,7 @@ pub fn start(
info!("Starting File Watcher..."); info!("Starting File Watcher...");
#[allow(clippy::expect_used)] #[allow(clippy::expect_used)]
watch_file("git-next-server.toml".into(), server.clone().recipient()) let fw_shutdown = watch_file("git-next-server.toml".into(), server.clone().recipient())
.await
.expect("file watcher"); .expect("file watcher");
info!("Server running - Press Ctrl-C to stop..."); info!("Server running - Press Ctrl-C to stop...");
@ -72,6 +71,7 @@ pub fn start(
server.do_send(crate::server::actor::messages::Shutdown); server.do_send(crate::server::actor::messages::Shutdown);
actix_rt::time::sleep(std::time::Duration::from_millis(200)).await; actix_rt::time::sleep(std::time::Duration::from_millis(200)).await;
System::current().stop(); System::current().stop();
fw_shutdown.store(true, Ordering::Relaxed);
}; };
let system = System::new(); let system = System::new();
Arbiter::current().spawn(execution); Arbiter::current().spawn(execution);

View file

@ -38,3 +38,44 @@ mod init {
Ok(()) Ok(())
} }
} }
mod file_watcher {
use std::{sync::atomic::Ordering, time::Duration};
use actix::{Actor, Context, Handler};
use rstest::*;
use crate::file_watcher::{self, FileUpdated};
use super::TestResult;
#[rstest]
#[actix::test]
#[timeout(Duration::from_millis(80))]
async fn should_not_block_calling_thread() -> TestResult {
let fs = kxio::fs::temp()?;
let path = fs.base().join("file");
fs.file_write(&path, "foo")?;
let listener = Listener;
let l_addr = listener.start();
let recipient = l_addr.recipient();
let fw_shutdown = file_watcher::watch_file(path, recipient)?;
std::thread::sleep(Duration::from_millis(10));
fw_shutdown.store(true, Ordering::Relaxed);
Ok(()) // was not blocked
}
struct Listener;
impl Actor for Listener {
type Context = Context<Self>;
}
impl Handler<FileUpdated> for Listener {
type Result = ();
fn handle(&mut self, _msg: FileUpdated, _ctx: &mut Self::Context) -> Self::Result {
// todo!()
}
}
}