From 08d2377404935144be6135f0471a92721604e1a0 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sat, 10 Aug 2024 20:08:07 +0100 Subject: [PATCH] fix: file_watcher runs on own thread Closes kemitix/git-next#142 --- Cargo.lock | 133 +++++++++++++++++++++++++++++++-- Cargo.toml | 1 + crates/cli/Cargo.toml | 1 + crates/cli/src/file_watcher.rs | 20 +++-- crates/cli/src/server/mod.rs | 6 +- crates/cli/src/tests.rs | 41 ++++++++++ 6 files changed, 185 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1744c46..20f6c9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -890,6 +890,21 @@ dependencies = [ "libc", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -906,12 +921,34 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -924,14 +961,22 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -988,7 +1033,7 @@ checksum = "8066dc2ef3bd0e2bfb84b4a2b0b04f216ffb97390e09fab0752bf0ba943dacc6" dependencies = [ "doc-comment", "unicase", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -1015,6 +1060,7 @@ dependencies = [ "pretty_assertions", "rand", "ratatui", + "rstest", "secrecy", "sendmail", "serde_json", @@ -1180,7 +1226,7 @@ dependencies = [ "gix-utils", "itoa", "thiserror", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -1275,7 +1321,7 @@ dependencies = [ "smallvec", "thiserror", "unicode-bom", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -1572,7 +1618,7 @@ dependencies = [ "itoa", "smallvec", "thiserror", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -1696,7 +1742,7 @@ dependencies = [ "gix-utils", "maybe-async", "thiserror", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -1728,7 +1774,7 @@ dependencies = [ "gix-validate", "memmap2", "thiserror", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -1976,6 +2022,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.26" @@ -3067,6 +3119,15 @@ dependencies = [ "yansi", ] +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit 0.21.1", +] + [[package]] name = "proc-macro2" version = "1.0.81" @@ -3260,6 +3321,12 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "reqwest" version = "0.12.4" @@ -3323,6 +3390,36 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rstest" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b423f0e62bdd61734b67cd21ff50871dfaeb9cc74f869dcd6af974fbcb19936" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e1711e7d14f74b12a58411c542185ef7fb7f2e7f8ee6e2940a883628522b42" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.60", + "unicode-ident", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -4002,7 +4099,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_edit 0.22.12", ] [[package]] @@ -4014,6 +4111,17 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap", + "toml_datetime", + "winnow 0.5.40", +] + [[package]] name = "toml_edit" version = "0.22.12" @@ -4024,7 +4132,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.7", ] [[package]] @@ -4634,6 +4742,15 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +[[package]] +name = "winnow" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "0.6.7" diff --git a/Cargo.toml b/Cargo.toml index 164c1d7..99d465e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,3 +111,4 @@ pretty_assertions = "1.4" rand = "0.8" mockall = "0.13" test-log = "0.2" +rstest = { version = "0.22", features = ["async-timeout"] } diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index affe9c0..e1ff60d 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -77,6 +77,7 @@ test-log = { workspace = true } rand = { workspace = true } pretty_assertions = { workspace = true } mockall = { workspace = true } +rstest = { workspace = true } [lints.clippy] nursery = { level = "warn", priority = -1 } diff --git a/crates/cli/src/file_watcher.rs b/crates/cli/src/file_watcher.rs index c6bfc3d..eac23b7 100644 --- a/crates/cli/src/file_watcher.rs +++ b/crates/cli/src/file_watcher.rs @@ -9,6 +9,9 @@ use tracing::error; use tracing::info; use std::path::PathBuf; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::time::Duration; #[derive(Debug, Message)] @@ -20,16 +23,22 @@ pub enum Error { #[error("io")] Io(#[from] std::io::Error), } -pub async fn watch_file(path: PathBuf, recipient: Recipient) -> Result<()> { +pub fn watch_file(path: PathBuf, recipient: Recipient) -> Result> { let (tx, rx) = std::sync::mpsc::channel(); + let shutdown = Arc::new(AtomicBool::default()); let mut handler = notify::recommended_watcher(tx).context("file watcher")?; handler .watch(&path, notify::RecursiveMode::NonRecursive) .with_context(|| format!("Watch file: {path:?}"))?; info!("Watching: {:?}", path); - async move { + let thread_shutdown = shutdown.clone(); + actix_rt::task::spawn_blocking(move || { loop { + if thread_shutdown.load(Ordering::Relaxed) { + drop(handler); + break; + } for result in rx.try_iter() { match result { Ok(event) => match event.kind { @@ -50,9 +59,8 @@ pub async fn watch_file(path: PathBuf, recipient: Recipient) -> Res } } } - actix_rt::time::sleep(Duration::from_millis(1000)).await; + std::thread::sleep(Duration::from_millis(1000)); } - } - .await; - Ok(()) + }); + Ok(shutdown) } diff --git a/crates/cli/src/server/mod.rs b/crates/cli/src/server/mod.rs index 1d517ea..8acabe4 100644 --- a/crates/cli/src/server/mod.rs +++ b/crates/cli/src/server/mod.rs @@ -17,7 +17,7 @@ use anyhow::{Context, Result}; use kxio::{fs::FileSystem, network::Network}; use tracing::info; -use std::{path::PathBuf, time::Duration}; +use std::{path::PathBuf, sync::atomic::Ordering, time::Duration}; const A_DAY: Duration = Duration::from_secs(24 * 60 * 60); @@ -62,8 +62,7 @@ pub fn start( info!("Starting File Watcher..."); #[allow(clippy::expect_used)] - watch_file("git-next-server.toml".into(), server.clone().recipient()) - .await + let fw_shutdown = watch_file("git-next-server.toml".into(), server.clone().recipient()) .expect("file watcher"); info!("Server running - Press Ctrl-C to stop..."); @@ -72,6 +71,7 @@ pub fn start( server.do_send(crate::server::actor::messages::Shutdown); actix_rt::time::sleep(std::time::Duration::from_millis(200)).await; System::current().stop(); + fw_shutdown.store(true, Ordering::Relaxed); }; let system = System::new(); Arbiter::current().spawn(execution); diff --git a/crates/cli/src/tests.rs b/crates/cli/src/tests.rs index 627c378..6248879 100644 --- a/crates/cli/src/tests.rs +++ b/crates/cli/src/tests.rs @@ -38,3 +38,44 @@ mod init { Ok(()) } } +mod file_watcher { + use std::{sync::atomic::Ordering, time::Duration}; + + use actix::{Actor, Context, Handler}; + use rstest::*; + + use crate::file_watcher::{self, FileUpdated}; + + use super::TestResult; + + #[rstest] + #[actix::test] + #[timeout(Duration::from_millis(80))] + async fn should_not_block_calling_thread() -> TestResult { + let fs = kxio::fs::temp()?; + let path = fs.base().join("file"); + fs.file_write(&path, "foo")?; + + let listener = Listener; + let l_addr = listener.start(); + let recipient = l_addr.recipient(); + + let fw_shutdown = file_watcher::watch_file(path, recipient)?; + std::thread::sleep(Duration::from_millis(10)); + fw_shutdown.store(true, Ordering::Relaxed); + + Ok(()) // was not blocked + } + + struct Listener; + impl Actor for Listener { + type Context = Context; + } + impl Handler for Listener { + type Result = (); + + fn handle(&mut self, _msg: FileUpdated, _ctx: &mut Self::Context) -> Self::Result { + // todo!() + } + } +}