// #[cfg(test)] mod tests; use actix::prelude::*; use derive_more::Constructor; use git_next_actor_macros::message; use git_next_config as config; use git_next_config::server::{ServerConfig, ServerStorage, Webhook}; use git_next_config::{ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig}; use git_next_file_watcher_actor::FileUpdated; use git_next_git::{Generation, RepoDetails}; use git_next_repo_actor::{messages::CloneRepo, RepoActor}; use git_next_webhook_actor as webhook; use kxio::{fs::FileSystem, network::Network}; use std::{ net::SocketAddr, path::PathBuf, sync::{Arc, RwLock}, }; use tracing::{error, info}; use webhook::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter}; pub use git_next_git::repository::{factory::real as repository_factory, RepositoryFactory}; message!(ReceiveServerConfig: ServerConfig: "Notification of newly loaded server configuration. This message will prompt the `git-next` server to stop and restart all repo-actors. Contains the new server configuration."); #[derive(Clone, Debug, PartialEq, Eq, Constructor)] pub struct ValidServerConfig { server_config: ServerConfig, socket_address: SocketAddr, server_storage: ServerStorage, } message!(ReceiveValidServerConfig: ValidServerConfig: "Notification of validated server configuration."); #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { #[display("Failed to create data directories")] FailedToCreateDataDirectory(kxio::fs::Error), #[display("The forge data path is not a directory: {path:?}")] ForgeDirIsNotDirectory { path: PathBuf, }, Config(config::server::Error), Io(std::io::Error), } type Result = core::result::Result; #[derive(derive_with::With)] #[with(message_log)] pub struct Server { generation: Generation, webhook: Option>, fs: FileSystem, net: Network, repository_factory: Box, sleep_duration: std::time::Duration, // testing message_log: Option>>>, } impl Actor for Server { type Context = Context; } impl Handler for Server { type Result = (); fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result { let server_config = match ServerConfig::load(&self.fs) { Ok(server_config) => server_config, Err(err) => { error!("Failed to load config file. Error: {}", err); return; } }; self.do_send(ReceiveServerConfig::new(server_config), ctx); } } impl Handler for Server { type Result = (); #[allow(clippy::cognitive_complexity)] fn handle(&mut self, msg: ReceiveServerConfig, ctx: &mut Self::Context) -> Self::Result { tracing::info!("recieved server config"); let Ok(socket_addr) = msg.http() else { error!("Unable to parse http.addr"); return; }; let Some(server_storage) = self.server_storage(&msg) else { error!("Server storage not available"); return; }; if msg.webhook().base_url().ends_with('/') { error!("webhook.url must not end with a '/'"); return; } self.do_send( ReceiveValidServerConfig::new(ValidServerConfig::new( msg.0, socket_addr, server_storage, )), ctx, ); } } impl Handler for Server { type Result = (); fn handle(&mut self, msg: ReceiveValidServerConfig, _ctx: &mut Self::Context) -> Self::Result { let ValidServerConfig { server_config, socket_address, server_storage, } = msg.0; if let Some(webhook) = self.webhook.take() { webhook.do_send(ShutdownWebhook); } self.generation.inc(); // Webhook Server info!("Starting Webhook Server..."); let webhook_router = WebhookRouter::default().start(); let webhook = server_config.webhook(); // Forge Actors for (forge_alias, forge_config) in server_config.forges() { self.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook) .into_iter() .map(|a| self.start_actor(a)) .map(|(repo_alias, addr)| { AddWebhookRecipient::new(forge_alias.clone(), repo_alias, addr.recipient()) }) .for_each(|msg| webhook_router.do_send(msg)); } let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start(); self.webhook.replace(webhook); } } impl Server { pub fn new( fs: FileSystem, net: Network, repo: Box, sleep_duration: std::time::Duration, ) -> Self { let generation = Generation::default(); Self { generation, webhook: None, fs, net, repository_factory: repo, sleep_duration, message_log: None, } } fn create_forge_data_directories( &self, server_config: &ServerConfig, server_dir: &std::path::Path, ) -> Result<()> { for (forge_name, _forge_config) in server_config.forges() { let forge_dir: PathBuf = (&forge_name).into(); let path = server_dir.join(&forge_dir); if self.fs.path_exists(&path)? { if !self.fs.path_is_dir(&path)? { return Err(Error::ForgeDirIsNotDirectory { path }); } } else { info!(%forge_name, ?path, "creating storage"); self.fs.dir_create_all(&path)?; } } Ok(()) } fn create_forge_repos( &self, forge_config: &ForgeConfig, forge_name: ForgeAlias, server_storage: &ServerStorage, webhook: &Webhook, ) -> Vec<(ForgeAlias, RepoAlias, RepoActor)> { let span = tracing::info_span!("create_forge_repos", name = %forge_name, config = %forge_config); let _guard = span.enter(); info!("Creating Forge"); let mut repos = vec![]; let creator = self.create_actor(forge_name, forge_config.clone(), server_storage, webhook); for (repo_alias, server_repo_config) in forge_config.repos() { let forge_repo = creator((repo_alias, server_repo_config)); info!( alias = %forge_repo.1, "Created Repo" ); repos.push(forge_repo); } repos } fn create_actor( &self, forge_name: ForgeAlias, forge_config: ForgeConfig, server_storage: &ServerStorage, webhook: &Webhook, ) -> impl Fn((RepoAlias, &ServerRepoConfig)) -> (ForgeAlias, RepoAlias, RepoActor) { let server_storage = server_storage.clone(); let webhook = webhook.clone(); let net = self.net.clone(); let repository_factory = self.repository_factory.duplicate(); let generation = self.generation; let sleep_duration = self.sleep_duration; move |(repo_alias, server_repo_config)| { let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config); let _guard = span.enter(); info!("Creating Repo"); let gitdir = server_repo_config.gitdir().map_or_else( || { GitDir::new( server_storage .path() .join(forge_name.to_string()) .join(repo_alias.to_string()), config::git_dir::StoragePathType::Internal, ) }, |gitdir| gitdir, ); // INFO: can't canonicalise gitdir as the path needs to exist to do that and we may not // have cloned the repo yet let repo_details = RepoDetails::new( generation, &repo_alias, server_repo_config, &forge_name, &forge_config, gitdir, ); let forge = git_next_forge::Forge::create(repo_details.clone(), net.clone()); info!("Starting Repo Actor"); let actor = RepoActor::new( repo_details, forge, webhook.clone(), generation, net.clone(), repository_factory.duplicate(), sleep_duration, ); (forge_name.clone(), repo_alias, actor) } } fn start_actor( &self, actor: (ForgeAlias, RepoAlias, RepoActor), ) -> (RepoAlias, Addr) { let (forge_name, repo_alias, actor) = actor; let span = tracing::info_span!("start_actor", forge = %forge_name, repo = %repo_alias); let _guard = span.enter(); let addr = actor.start(); addr.do_send(CloneRepo); info!("Started"); (repo_alias, addr) } fn server_storage(&self, server_config: &ReceiveServerConfig) -> Option { let server_storage = server_config.storage().clone(); let dir = server_storage.path(); if !dir.exists() { if let Err(err) = self.fs.dir_create(dir) { error!(?err, ?dir, "Failed to create server storage"); return None; } } let Ok(canon) = dir.canonicalize() else { error!(?dir, "Failed to confirm server storage"); return None; }; if let Err(err) = self.create_forge_data_directories(server_config, &canon) { error!(?err, "Failure creating forge storage"); return None; } Some(server_storage) } fn do_send(&mut self, msg: M, _ctx: &mut ::Context) where M: actix::Message + Send + 'static + std::fmt::Debug, Self: actix::Handler, ::Result: Send, { tracing::info!(?msg, "send"); if let Some(message_log) = &self.message_log { let log_message = format!("send: {:?}", msg); tracing::debug!(log_message); if let Ok(mut log) = message_log.write() { log.push(log_message); } } #[cfg(not(test))] _ctx.address().do_send(msg); tracing::info!("sent"); } }