Compare commits
2 commits
89975894a4
...
7b64e300b6
Author | SHA1 | Date | |
---|---|---|---|
7b64e300b6 | |||
f6bc2e1283 |
8 changed files with 27 additions and 25 deletions
|
@ -2,6 +2,7 @@
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
|
|
||||||
use actix::Recipient;
|
use actix::Recipient;
|
||||||
|
use anyhow::{Context, Result};
|
||||||
use notify::event::ModifyKind;
|
use notify::event::ModifyKind;
|
||||||
use notify::Watcher;
|
use notify::Watcher;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
@ -19,15 +20,13 @@ pub enum Error {
|
||||||
#[error("io")]
|
#[error("io")]
|
||||||
Io(#[from] std::io::Error),
|
Io(#[from] std::io::Error),
|
||||||
}
|
}
|
||||||
pub async fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) {
|
pub async fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) -> Result<()> {
|
||||||
let (tx, rx) = std::sync::mpsc::channel();
|
let (tx, rx) = std::sync::mpsc::channel();
|
||||||
|
|
||||||
#[allow(clippy::expect_used)]
|
let mut handler = notify::recommended_watcher(tx).context("file watcher")?;
|
||||||
let mut handler = notify::recommended_watcher(tx).expect("file watcher");
|
|
||||||
#[allow(clippy::expect_used)]
|
|
||||||
handler
|
handler
|
||||||
.watch(&path, notify::RecursiveMode::NonRecursive)
|
.watch(&path, notify::RecursiveMode::NonRecursive)
|
||||||
.expect("watch file");
|
.with_context(|| format!("Watch file: {path:?}"))?;
|
||||||
info!("Watching: {:?}", path);
|
info!("Watching: {:?}", path);
|
||||||
async move {
|
async move {
|
||||||
loop {
|
loop {
|
||||||
|
@ -55,4 +54,5 @@ pub async fn watch_file(path: PathBuf, recipient: Recipient<FileUpdated>) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.await;
|
.await;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,13 +12,9 @@ impl Handler<FileUpdated> for ServerActor {
|
||||||
type Result = ();
|
type Result = ();
|
||||||
|
|
||||||
fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result {
|
||||||
let server_config = match ServerConfig::load(&self.fs) {
|
match ServerConfig::load(&self.fs) {
|
||||||
Ok(server_config) => server_config,
|
Ok(server_config) => self.do_send(ReceiveServerConfig::new(server_config), ctx),
|
||||||
Err(err) => {
|
Err(err) => self.abort(ctx, format!("Failed to load config file. Error: {}", err)),
|
||||||
tracing::error!("Failed to load config file. Error: {}", err);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
self.do_send(ReceiveServerConfig::new(server_config), ctx);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,18 +12,15 @@ impl Handler<ReceiveServerConfig> for ServerActor {
|
||||||
fn handle(&mut self, msg: ReceiveServerConfig, ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: ReceiveServerConfig, ctx: &mut Self::Context) -> Self::Result {
|
||||||
tracing::info!("recieved server config");
|
tracing::info!("recieved server config");
|
||||||
let Ok(socket_addr) = msg.http() else {
|
let Ok(socket_addr) = msg.http() else {
|
||||||
tracing::error!("Unable to parse http.addr");
|
return self.abort(ctx, "Unable to parse http.addr");
|
||||||
return;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some(server_storage) = self.server_storage(&msg) else {
|
let Some(server_storage) = self.server_storage(&msg) else {
|
||||||
tracing::error!("Server storage not available");
|
return self.abort(ctx, "Server storage not available");
|
||||||
return;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if msg.inbound_webhook().base_url().ends_with('/') {
|
if msg.inbound_webhook().base_url().ends_with('/') {
|
||||||
tracing::error!("webhook.url must not end with a '/'");
|
return self.abort(ctx, "webhook.url must not end with a '/'");
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.do_send(
|
self.do_send(
|
||||||
|
|
|
@ -229,6 +229,13 @@ impl ServerActor {
|
||||||
Some(server_storage)
|
Some(server_storage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to gracefully shutdown the server before stopping the system.
|
||||||
|
fn abort(&mut self, ctx: &mut <Self as actix::Actor>::Context, message: impl Into<String>) {
|
||||||
|
tracing::error!("Aborting: {}", message.into());
|
||||||
|
self.do_send(crate::server::actor::messages::Shutdown, ctx);
|
||||||
|
System::current().stop_with_code(1);
|
||||||
|
}
|
||||||
|
|
||||||
fn do_send<M>(&mut self, msg: M, _ctx: &mut <Self as actix::Actor>::Context)
|
fn do_send<M>(&mut self, msg: M, _ctx: &mut <Self as actix::Actor>::Context)
|
||||||
where
|
where
|
||||||
M: actix::Message + Send + 'static + std::fmt::Debug,
|
M: actix::Message + Send + 'static + std::fmt::Debug,
|
||||||
|
|
|
@ -52,5 +52,5 @@ async fn when_webhook_url_has_trailing_slash_should_not_send() {
|
||||||
tracing::debug!(?message_log, "");
|
tracing::debug!(?message_log, "");
|
||||||
assert!(message_log.read().iter().any(|log| !log
|
assert!(message_log.read().iter().any(|log| !log
|
||||||
.iter()
|
.iter()
|
||||||
.any(|line| line != "send: ReceiveValidServerConfig")));
|
.any(|line| line == "send: ReceiveValidServerConfig")));
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,10 @@ pub fn start(
|
||||||
server.do_send(FileUpdated);
|
server.do_send(FileUpdated);
|
||||||
|
|
||||||
info!("Starting File Watcher...");
|
info!("Starting File Watcher...");
|
||||||
watch_file("git-next-server.toml".into(), server.clone().recipient()).await;
|
#[allow(clippy::expect_used)]
|
||||||
|
watch_file("git-next-server.toml".into(), server.clone().recipient())
|
||||||
|
.await
|
||||||
|
.expect("file watcher");
|
||||||
|
|
||||||
info!("Server running - Press Ctrl-C to stop...");
|
info!("Server running - Press Ctrl-C to stop...");
|
||||||
let _ = actix_rt::signal::ctrl_c().await;
|
let _ = actix_rt::signal::ctrl_c().await;
|
||||||
|
|
|
@ -133,7 +133,7 @@ impl super::OpenRepositoryLike for RealOpenRepository {
|
||||||
) -> Result<Vec<git::Commit>, git::commit::log::Error> {
|
) -> Result<Vec<git::Commit>, git::commit::log::Error> {
|
||||||
let limit = match find_commits.is_empty() {
|
let limit = match find_commits.is_empty() {
|
||||||
true => 1,
|
true => 1,
|
||||||
false => 50,
|
false => 25,
|
||||||
};
|
};
|
||||||
self.0
|
self.0
|
||||||
.read()
|
.read()
|
||||||
|
|
|
@ -19,18 +19,17 @@ fn should_return_single_item_in_commit_log_when_not_searching() -> TestResult {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
// assumes running in the git-next repo which should have main, next and dev as remote branches
|
// assumes running in the git-next repo which should have main, next and dev as remote branches
|
||||||
fn should_return_capacity_50_in_commit_log_when_searching_for_garbage() -> TestResult {
|
fn should_return_capacity_25_in_commit_log_when_searching_for_garbage() -> TestResult {
|
||||||
let_assert!(Ok(fs) = kxio::fs::temp());
|
let_assert!(Ok(fs) = kxio::fs::temp());
|
||||||
let branch_name = given::a_branch_name();
|
let branch_name = given::a_branch_name();
|
||||||
let gitdir = GitDir::new(fs.base().to_path_buf(), StoragePathType::Internal);
|
let gitdir = GitDir::new(fs.base().to_path_buf(), StoragePathType::Internal);
|
||||||
let test_repository = git::repository::test(fs.clone());
|
let test_repository = git::repository::test(fs.clone());
|
||||||
let_assert!(Ok(open_repository) = test_repository.open(&gitdir));
|
let_assert!(Ok(open_repository) = test_repository.open(&gitdir));
|
||||||
for _ in [0; 60] {
|
for _ in [0; 25] {
|
||||||
// create 60 commits
|
|
||||||
then::create_a_commit_on_branch(&fs, &gitdir, &branch_name)?;
|
then::create_a_commit_on_branch(&fs, &gitdir, &branch_name)?;
|
||||||
}
|
}
|
||||||
let_assert!(Ok(result) = open_repository.commit_log(&branch_name, &[given::a_commit()]));
|
let_assert!(Ok(result) = open_repository.commit_log(&branch_name, &[given::a_commit()]));
|
||||||
assert_eq!(result.len(), 50);
|
assert_eq!(result.len(), 25);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue