refactor: merge server-actor crate into cli crate
All checks were successful
Rust / build (push) Successful in 2m24s
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful

This commit is contained in:
Paul Campbell 2024-07-27 08:27:04 +01:00
parent 1427284c2a
commit a679abeafc
17 changed files with 276 additions and 286 deletions

View file

@ -15,6 +15,9 @@ categories = { workspace = true }
git-next-core = { workspace = true }
git-next-file-watcher-actor = { workspace = true }
git-next-server-actor = { workspace = true }
git-next-forge = { workspace = true }
git-next-repo-actor = { workspace = true }
git-next-webhook-actor = { workspace = true }
# CLI parsing
clap = { workspace = true }
@ -31,10 +34,21 @@ tracing-subscriber = { workspace = true }
actix = { workspace = true }
actix-rt = { workspace = true }
# boilerplate
derive_more = { workspace = true }
derive-with = { workspace = true }
# Webhooks
serde = { workspace = true }
serde_json = { workspace = true }
ulid = { workspace = true }
time = { workspace = true }
secrecy = { workspace = true }
standardwebhooks = { workspace = true }
[dev-dependencies]
# Testing
assert2 = { workspace = true }
secrecy = { workspace = true }
test-log = { workspace = true }
[lints.clippy]

View file

@ -520,7 +520,9 @@ The following diagram shows the dependency between the crates that make up `git-
stateDiagram-v2
cli --> core
cli --> server_actor
cli --> forge
cli --> repo_actor
cli --> webhook_actor
cli --> file_watcher_actor
forge --> core
@ -534,12 +536,6 @@ stateDiagram-v2
repo_actor --> core
repo_actor --> forge
server_actor --> core
server_actor --> forge
server_actor --> repo_actor
server_actor --> file_watcher_actor
server_actor --> webhook_actor
webhook_actor --> core
webhook_actor --> repo_actor
```

View file

@ -4,7 +4,7 @@ use actix::prelude::*;
use git_next_core::server::ServerConfig;
use git_next_file_watcher_actor::FileUpdated;
use crate::{messages::ReceiveServerConfig, ServerActor};
use crate::server::actor::{messages::ReceiveServerConfig, ServerActor};
impl Handler<FileUpdated> for ServerActor {
type Result = ();

View file

@ -5,7 +5,7 @@ use secrecy::ExposeSecret;
use standardwebhooks::Webhook;
use tracing::Instrument;
use crate::ServerActor;
use crate::server::actor::ServerActor;
use git_next_core::server::{self, Notification, NotificationType};
use git_next_repo_actor::messages::NotifyUser;

View file

@ -1,6 +1,6 @@
use actix::prelude::*;
use crate::{
use crate::server::actor::{
messages::{ReceiveServerConfig, ReceiveValidServerConfig, ValidServerConfig},
ServerActor,
};

View file

@ -1,7 +1,7 @@
use actix::prelude::*;
use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter};
use crate::{
use crate::server::actor::{
messages::{ReceiveValidServerConfig, ValidServerConfig},
ServerActor,
};

View file

@ -2,7 +2,7 @@
use actix::prelude::*;
use git_next_webhook_actor::ShutdownWebhook;
use crate::{messages::Shutdown, ServerActor};
use crate::server::actor::{messages::Shutdown, ServerActor};
impl Handler<Shutdown> for ServerActor {
type Result = ();

View file

@ -0,0 +1,244 @@
//
use actix::prelude::*;
#[cfg(test)]
mod tests;
mod handlers;
pub mod messages;
use git_next_core::{
git::{repository::factory::RepositoryFactory, Generation, RepoDetails},
server::{self, InboundWebhook, ServerConfig, ServerStorage},
ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig, StoragePathType,
};
use git_next_repo_actor::messages::NotifyUser;
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor::WebhookActor;
use kxio::{fs::FileSystem, network::Network};
use std::{
collections::BTreeMap,
path::PathBuf,
sync::{Arc, RwLock},
};
use tracing::{error, info};
use messages::ReceiveServerConfig;
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
#[display("Failed to create data directories")]
FailedToCreateDataDirectory(kxio::fs::Error),
#[display("The forge data path is not a directory: {path:?}")]
ForgeDirIsNotDirectory {
path: PathBuf,
},
Config(server::Error),
Io(std::io::Error),
}
type Result<T> = core::result::Result<T, Error>;
#[derive(derive_with::With)]
#[with(message_log)]
pub struct ServerActor {
server_config: Option<ServerConfig>,
generation: Generation,
webhook_actor_addr: Option<Addr<WebhookActor>>,
fs: FileSystem,
net: Network,
repository_factory: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
// testing
message_log: Option<Arc<RwLock<Vec<String>>>>,
}
impl Actor for ServerActor {
type Context = Context<Self>;
}
impl ServerActor {
pub fn new(
fs: FileSystem,
net: Network,
repo: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
) -> Self {
let generation = Generation::default();
Self {
server_config: None,
generation,
webhook_actor_addr: None,
fs,
net,
repository_factory: repo,
sleep_duration,
repo_actors: BTreeMap::new(),
message_log: None,
}
}
fn create_forge_data_directories(
&self,
server_config: &ServerConfig,
server_dir: &std::path::Path,
) -> Result<()> {
for (forge_name, _forge_config) in server_config.forges() {
let forge_dir: PathBuf = (&forge_name).into();
let path = server_dir.join(&forge_dir);
if self.fs.path_exists(&path)? {
if !self.fs.path_is_dir(&path)? {
return Err(Error::ForgeDirIsNotDirectory { path });
}
} else {
info!(%forge_name, ?path, "creating storage");
self.fs.dir_create_all(&path)?;
}
}
Ok(())
}
fn create_forge_repos(
&self,
forge_config: &ForgeConfig,
forge_name: ForgeAlias,
server_storage: &ServerStorage,
webhook: &InboundWebhook,
notify_user_recipient: Recipient<NotifyUser>,
) -> Vec<(ForgeAlias, RepoAlias, RepoActor)> {
let span =
tracing::info_span!("create_forge_repos", name = %forge_name, config = %forge_config);
let _guard = span.enter();
info!("Creating Forge");
let mut repos = vec![];
let creator = self.create_actor(forge_name, forge_config.clone(), server_storage, webhook);
for (repo_alias, server_repo_config) in forge_config.repos() {
let forge_repo = creator((
repo_alias,
server_repo_config,
notify_user_recipient.clone(),
));
info!(
alias = %forge_repo.1,
"Created Repo"
);
repos.push(forge_repo);
}
repos
}
fn create_actor(
&self,
forge_name: ForgeAlias,
forge_config: ForgeConfig,
server_storage: &ServerStorage,
webhook: &InboundWebhook,
) -> impl Fn(
(RepoAlias, &ServerRepoConfig, Recipient<NotifyUser>),
) -> (ForgeAlias, RepoAlias, RepoActor) {
let server_storage = server_storage.clone();
let webhook = webhook.clone();
let net = self.net.clone();
let repository_factory = self.repository_factory.duplicate();
let generation = self.generation;
let sleep_duration = self.sleep_duration;
// let notify_user_recipient = server_addr.recipient();
move |(repo_alias, server_repo_config, notify_user_recipient)| {
let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config);
let _guard = span.enter();
info!("Creating Repo");
let gitdir = server_repo_config.gitdir().map_or_else(
|| {
GitDir::new(
server_storage
.path()
.join(forge_name.to_string())
.join(repo_alias.to_string()),
StoragePathType::Internal,
)
},
|gitdir| gitdir,
);
// INFO: can't canonicalise gitdir as the path needs to exist to do that and we may not
// have cloned the repo yet
let repo_details = RepoDetails::new(
generation,
&repo_alias,
server_repo_config,
&forge_name,
&forge_config,
gitdir,
);
let forge = git_next_forge::Forge::create(repo_details.clone(), net.clone());
info!("Starting Repo Actor");
let actor = RepoActor::new(
repo_details,
forge,
webhook.clone(),
generation,
net.clone(),
repository_factory.duplicate(),
sleep_duration,
Some(notify_user_recipient),
);
(forge_name.clone(), repo_alias, actor)
}
}
fn start_actor(
&self,
actor: (ForgeAlias, RepoAlias, RepoActor),
) -> (RepoAlias, Addr<RepoActor>) {
let (forge_name, repo_alias, actor) = actor;
let span = tracing::info_span!("start_actor", forge = %forge_name, repo = %repo_alias);
let _guard = span.enter();
let addr = actor.start();
addr.do_send(CloneRepo);
info!("Started");
(repo_alias, addr)
}
fn server_storage(&self, server_config: &ReceiveServerConfig) -> Option<ServerStorage> {
let server_storage = server_config.storage().clone();
let dir = server_storage.path();
if !dir.exists() {
if let Err(err) = self.fs.dir_create(dir) {
error!(?err, ?dir, "Failed to create server storage");
return None;
}
}
let Ok(canon) = dir.canonicalize() else {
error!(?dir, "Failed to confirm server storage");
return None;
};
if let Err(err) = self.create_forge_data_directories(server_config, &canon) {
error!(?err, "Failure creating forge storage");
return None;
}
Some(server_storage)
}
fn do_send<M>(&mut self, msg: M, _ctx: &mut <Self as actix::Actor>::Context)
where
M: actix::Message + Send + 'static + std::fmt::Debug,
Self: actix::Handler<M>,
<M as actix::Message>::Result: Send,
{
if let Some(message_log) = &self.message_log {
let log_message = format!("send: {:?}", msg);
if let Ok(mut log) = message_log.write() {
log.push(log_message);
}
}
#[cfg(not(test))]
_ctx.address().do_send(msg);
tracing::info!("sent");
}
}

View file

@ -1,7 +1,7 @@
//
use actix::prelude::*;
use crate::{tests::given, ReceiveServerConfig, ServerActor};
use crate::server::actor::{tests::given, ReceiveServerConfig, ServerActor};
use git_next_core::{
git,
server::{Http, InboundWebhook, Notification, ServerConfig, ServerStorage},
@ -45,7 +45,7 @@ async fn when_webhook_url_has_trailing_slash_should_not_send() {
server_storage,
repos,
)));
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
actix_rt::time::sleep(std::time::Duration::from_millis(1)).await;
//then
// INFO: assert that ReceiveValidServerConfig is NOT sent

View file

@ -1,12 +1,14 @@
//
mod actor;
#[cfg(test)]
mod tests;
use actix::prelude::*;
use actor::ServerActor;
use git_next_core::git::RepositoryFactory;
use git_next_file_watcher_actor::{FileUpdated, FileWatcher};
use git_next_server_actor::ServerActor;
use kxio::{fs::FileSystem, network::Network};
use tracing::{error, info, level_filters::LevelFilter};
@ -61,7 +63,7 @@ pub fn start(
info!("Server running - Press Ctrl-C to stop...");
let _ = actix_rt::signal::ctrl_c().await;
info!("Ctrl-C received, shutting down...");
server.do_send(git_next_server_actor::messages::Shutdown);
server.do_send(crate::server::actor::messages::Shutdown);
actix_rt::time::sleep(std::time::Duration::from_millis(200)).await;
System::current().stop();
};

View file

@ -7,32 +7,7 @@ repository = { workspace = true }
description = "Server actor for git-next, the trunk-based development manager"
[dependencies]
git-next-core = { workspace = true }
git-next-forge = { workspace = true }
git-next-repo-actor = { workspace = true }
git-next-file-watcher-actor = { workspace = true }
git-next-webhook-actor = { workspace = true }
# logging
tracing = { workspace = true }
# fs/network
kxio = { workspace = true }
# boilerplate
derive_more = { workspace = true }
derive-with = { workspace = true }
# Actors
actix = { workspace = true }
# Webhooks
serde = { workspace = true }
serde_json = { workspace = true }
ulid = { workspace = true }
time = { workspace = true }
secrecy = { workspace = true }
standardwebhooks = { workspace = true }
[dev-dependencies]
# Testing

View file

@ -7,3 +7,5 @@ development workflows where each commit must pass CI before being included in
the main branch.
See [git-next](https://crates.io/crates/git-next) for more information.
N.B. this crate has been merged into [git-next](https://crates.io/git-next).

View file

@ -1,244 +1 @@
//
use actix::prelude::*;
#[cfg(test)]
mod tests;
mod handlers;
pub mod messages;
use git_next_core::{
git::{repository::factory::RepositoryFactory, Generation, RepoDetails},
server::{self, InboundWebhook, ServerConfig, ServerStorage},
ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig, StoragePathType,
};
use git_next_repo_actor::messages::NotifyUser;
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor::WebhookActor;
use kxio::{fs::FileSystem, network::Network};
use std::{
collections::BTreeMap,
path::PathBuf,
sync::{Arc, RwLock},
};
use tracing::{error, info};
use messages::ReceiveServerConfig;
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
#[display("Failed to create data directories")]
FailedToCreateDataDirectory(kxio::fs::Error),
#[display("The forge data path is not a directory: {path:?}")]
ForgeDirIsNotDirectory {
path: PathBuf,
},
Config(server::Error),
Io(std::io::Error),
}
type Result<T> = core::result::Result<T, Error>;
#[derive(derive_with::With)]
#[with(message_log)]
pub struct ServerActor {
server_config: Option<ServerConfig>,
generation: Generation,
webhook_actor_addr: Option<Addr<WebhookActor>>,
fs: FileSystem,
net: Network,
repository_factory: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
// testing
message_log: Option<Arc<RwLock<Vec<String>>>>,
}
impl Actor for ServerActor {
type Context = Context<Self>;
}
impl ServerActor {
pub fn new(
fs: FileSystem,
net: Network,
repo: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
) -> Self {
let generation = Generation::default();
Self {
server_config: None,
generation,
webhook_actor_addr: None,
fs,
net,
repository_factory: repo,
sleep_duration,
repo_actors: BTreeMap::new(),
message_log: None,
}
}
fn create_forge_data_directories(
&self,
server_config: &ServerConfig,
server_dir: &std::path::Path,
) -> Result<()> {
for (forge_name, _forge_config) in server_config.forges() {
let forge_dir: PathBuf = (&forge_name).into();
let path = server_dir.join(&forge_dir);
if self.fs.path_exists(&path)? {
if !self.fs.path_is_dir(&path)? {
return Err(Error::ForgeDirIsNotDirectory { path });
}
} else {
info!(%forge_name, ?path, "creating storage");
self.fs.dir_create_all(&path)?;
}
}
Ok(())
}
fn create_forge_repos(
&self,
forge_config: &ForgeConfig,
forge_name: ForgeAlias,
server_storage: &ServerStorage,
webhook: &InboundWebhook,
notify_user_recipient: Recipient<NotifyUser>,
) -> Vec<(ForgeAlias, RepoAlias, RepoActor)> {
let span =
tracing::info_span!("create_forge_repos", name = %forge_name, config = %forge_config);
let _guard = span.enter();
info!("Creating Forge");
let mut repos = vec![];
let creator = self.create_actor(forge_name, forge_config.clone(), server_storage, webhook);
for (repo_alias, server_repo_config) in forge_config.repos() {
let forge_repo = creator((
repo_alias,
server_repo_config,
notify_user_recipient.clone(),
));
info!(
alias = %forge_repo.1,
"Created Repo"
);
repos.push(forge_repo);
}
repos
}
fn create_actor(
&self,
forge_name: ForgeAlias,
forge_config: ForgeConfig,
server_storage: &ServerStorage,
webhook: &InboundWebhook,
) -> impl Fn(
(RepoAlias, &ServerRepoConfig, Recipient<NotifyUser>),
) -> (ForgeAlias, RepoAlias, RepoActor) {
let server_storage = server_storage.clone();
let webhook = webhook.clone();
let net = self.net.clone();
let repository_factory = self.repository_factory.duplicate();
let generation = self.generation;
let sleep_duration = self.sleep_duration;
// let notify_user_recipient = server_addr.recipient();
move |(repo_alias, server_repo_config, notify_user_recipient)| {
let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config);
let _guard = span.enter();
info!("Creating Repo");
let gitdir = server_repo_config.gitdir().map_or_else(
|| {
GitDir::new(
server_storage
.path()
.join(forge_name.to_string())
.join(repo_alias.to_string()),
StoragePathType::Internal,
)
},
|gitdir| gitdir,
);
// INFO: can't canonicalise gitdir as the path needs to exist to do that and we may not
// have cloned the repo yet
let repo_details = RepoDetails::new(
generation,
&repo_alias,
server_repo_config,
&forge_name,
&forge_config,
gitdir,
);
let forge = git_next_forge::Forge::create(repo_details.clone(), net.clone());
info!("Starting Repo Actor");
let actor = RepoActor::new(
repo_details,
forge,
webhook.clone(),
generation,
net.clone(),
repository_factory.duplicate(),
sleep_duration,
Some(notify_user_recipient),
);
(forge_name.clone(), repo_alias, actor)
}
}
fn start_actor(
&self,
actor: (ForgeAlias, RepoAlias, RepoActor),
) -> (RepoAlias, Addr<RepoActor>) {
let (forge_name, repo_alias, actor) = actor;
let span = tracing::info_span!("start_actor", forge = %forge_name, repo = %repo_alias);
let _guard = span.enter();
let addr = actor.start();
addr.do_send(CloneRepo);
info!("Started");
(repo_alias, addr)
}
fn server_storage(&self, server_config: &ReceiveServerConfig) -> Option<ServerStorage> {
let server_storage = server_config.storage().clone();
let dir = server_storage.path();
if !dir.exists() {
if let Err(err) = self.fs.dir_create(dir) {
error!(?err, ?dir, "Failed to create server storage");
return None;
}
}
let Ok(canon) = dir.canonicalize() else {
error!(?dir, "Failed to confirm server storage");
return None;
};
if let Err(err) = self.create_forge_data_directories(server_config, &canon) {
error!(?err, "Failure creating forge storage");
return None;
}
Some(server_storage)
}
fn do_send<M>(&mut self, msg: M, _ctx: &mut <Self as actix::Actor>::Context)
where
M: actix::Message + Send + 'static + std::fmt::Debug,
Self: actix::Handler<M>,
<M as actix::Message>::Result: Send,
{
if let Some(message_log) = &self.message_log {
let log_message = format!("send: {:?}", msg);
if let Ok(mut log) = message_log.write() {
log.push(log_message);
}
}
#[cfg(not(test))]
_ctx.address().do_send(msg);
tracing::info!("sent");
}
}
// moved to /crates/cli/src/server/actor