git-next/crates/server-actor/src/lib.rs

246 lines
7.8 KiB
Rust

//
use actix::prelude::*;
#[cfg(test)]
mod tests;
mod handlers;
pub mod messages;
use git_next_core::{
server::{self, InboundWebhook, ServerConfig, ServerStorage},
ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig, StoragePathType,
};
use git_next_git::{repository::factory::RepositoryFactory, Generation, RepoDetails};
use git_next_repo_actor::messages::NotifyUser;
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor::WebhookActor;
use kxio::{fs::FileSystem, network::Network};
use std::{
collections::BTreeMap,
path::PathBuf,
sync::{Arc, RwLock},
};
use tracing::{error, info};
use messages::ReceiveServerConfig;
/// Errors produced while the server actor prepares its storage.
///
/// `derive_more::From` generates the conversions that let `?` lift the wrapped
/// error types into this enum.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
/// A `kxio` filesystem operation failed. The forge data directory checks in this
/// file propagate their filesystem errors with `?`, which presumably lands them in
/// this variant — confirm the error type returned by `kxio::fs::FileSystem`.
#[display("Failed to create data directories")]
FailedToCreateDataDirectory(kxio::fs::Error),
/// The path reserved for a forge's data already exists but is not a directory.
#[display("The forge data path is not a directory: {path:?}")]
ForgeDirIsNotDirectory {
// The offending path, as joined onto the server storage directory.
path: PathBuf,
},
/// Wraps a `git_next_core::server::Error`.
Config(server::Error),
/// Wraps a standard library I/O error.
Io(std::io::Error),
}
/// Module-local result alias whose error type is this module's [`Error`].
type Result<T> = core::result::Result<T, Error>;
/// Actor that owns the server-wide state and builds a `RepoActor` for every
/// repository configured under each forge.
///
/// NOTE(review): `#[with(message_log)]` presumably makes `derive_with` generate a
/// `with_message_log` builder method used by tests — confirm against the
/// `derive_with` documentation.
#[derive(derive_with::With)]
#[with(message_log)]
pub struct ServerActor {
// Most recently stored server configuration; `None` until one is set.
server_config: Option<ServerConfig>,
// Generation value copied into every `RepoDetails` and `RepoActor` created here.
generation: Generation,
// Address of the webhook actor; starts as `None`.
webhook_actor_addr: Option<Addr<WebhookActor>>,
// Filesystem handle used to check for and create storage directories.
fs: FileSystem,
// Network handle cloned into each forge and repo actor.
net: Network,
// Factory duplicated for each repo actor that is created.
repository_factory: Box<dyn RepositoryFactory>,
// Duration handed to each `RepoActor` on construction.
sleep_duration: std::time::Duration,
// Addresses of repo actors, keyed by forge alias and repo alias.
repo_actors: BTreeMap<(ForgeAlias, RepoAlias), Addr<RepoActor>>,
// testing
// When set, `do_send` appends a "send: ..." entry for every message it handles.
message_log: Option<Arc<RwLock<Vec<String>>>>,
}
/// Makes `ServerActor` an actix actor that runs in the standard `Context`.
impl Actor for ServerActor {
type Context = Context<Self>;
}
impl ServerActor {
/// Creates a server actor with no configuration loaded yet.
///
/// The actor starts with a default [`Generation`], no webhook actor address, no
/// repo actors and no message log.
///
/// * `fs` - filesystem handle used for storage directory management
/// * `net` - network handle passed on to forges and repo actors
/// * `repo` - repository factory that is duplicated for each repo actor
/// * `sleep_duration` - duration handed to each repo actor
pub fn new(
    fs: FileSystem,
    net: Network,
    repo: Box<dyn RepositoryFactory>,
    sleep_duration: std::time::Duration,
) -> Self {
    Self {
        // supplied by the caller
        fs,
        net,
        repository_factory: repo,
        sleep_duration,
        // initial state
        generation: Generation::default(),
        repo_actors: BTreeMap::new(),
        server_config: None,
        webhook_actor_addr: None,
        message_log: None,
    }
}
/// Makes sure a data directory exists beneath `server_dir` for every forge in
/// `server_config`.
///
/// A missing directory is created with `dir_create_all`. A path that already
/// exists is accepted only if it is a directory.
///
/// # Errors
///
/// * [`Error::ForgeDirIsNotDirectory`] when a forge path exists but is not a directory.
/// * Any error from the filesystem calls, converted into [`Error`] by `?`.
fn create_forge_data_directories(
    &self,
    server_config: &ServerConfig,
    server_dir: &std::path::Path,
) -> Result<()> {
    for (forge_name, _) in server_config.forges() {
        let relative: PathBuf = (&forge_name).into();
        let path = server_dir.join(&relative);
        if !self.fs.path_exists(&path)? {
            // nothing there yet: create it and move on to the next forge
            info!(%forge_name, ?path, "creating storage");
            self.fs.dir_create_all(&path)?;
            continue;
        }
        if !self.fs.path_is_dir(&path)? {
            return Err(Error::ForgeDirIsNotDirectory { path });
        }
    }
    Ok(())
}
/// Builds (but does not start) a `RepoActor` for every repo configured in the forge.
///
/// Each returned tuple holds the forge alias, the repo alias and the actor.
/// Every actor receives its own clone of `notify_user_recipient`.
fn create_forge_repos(
    &self,
    forge_config: &ForgeConfig,
    forge_name: ForgeAlias,
    server_storage: &ServerStorage,
    webhook: &InboundWebhook,
    notify_user_recipient: Recipient<NotifyUser>,
) -> Vec<(ForgeAlias, RepoAlias, RepoActor)> {
    let _guard =
        tracing::info_span!("create_forge_repos", name = %forge_name, config = %forge_config)
            .entered();
    info!("Creating Forge");
    // One factory closure per forge; it is invoked once for each repo below.
    let creator = self.create_actor(forge_name, forge_config.clone(), server_storage, webhook);
    forge_config
        .repos()
        .into_iter()
        .map(|(repo_alias, server_repo_config)| {
            let forge_repo = creator((
                repo_alias,
                server_repo_config,
                notify_user_recipient.clone(),
            ));
            info!(
                alias = %forge_repo.1,
                "Created Repo"
            );
            forge_repo
        })
        .collect()
}
/// Returns a factory closure that builds the `RepoActor` for one repo of the given forge.
///
/// The closure owns copies of everything it needs from `self` and the arguments.
/// It takes a `(repo alias, repo config, notification recipient)` tuple and
/// returns `(forge alias, repo alias, actor)`. The actor is not started here.
fn create_actor(
    &self,
    forge_name: ForgeAlias,
    forge_config: ForgeConfig,
    server_storage: &ServerStorage,
    webhook: &InboundWebhook,
) -> impl Fn(
    (RepoAlias, &ServerRepoConfig, Recipient<NotifyUser>),
) -> (ForgeAlias, RepoAlias, RepoActor) {
    // Take owned copies up front so the closure can be `move`.
    let server_storage = server_storage.clone();
    let webhook = webhook.clone();
    let net = self.net.clone();
    let repository_factory = self.repository_factory.duplicate();
    let generation = self.generation;
    let sleep_duration = self.sleep_duration;
    move |(repo_alias, server_repo_config, notify_user_recipient)| {
        let _guard = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config)
            .entered();
        info!("Creating Repo");
        // Use the configured gitdir when there is one, otherwise fall back to
        // `<server storage>/<forge>/<repo>` as internally managed storage.
        let gitdir = server_repo_config.gitdir().unwrap_or_else(|| {
            let internal_path = server_storage
                .path()
                .join(forge_name.to_string())
                .join(repo_alias.to_string());
            GitDir::new(internal_path, StoragePathType::Internal)
        });
        // INFO: the gitdir can't be canonicalised here: that requires the path to exist,
        // and the repo may not have been cloned yet
        let repo_details = RepoDetails::new(
            generation,
            &repo_alias,
            server_repo_config,
            &forge_name,
            &forge_config,
            gitdir,
        );
        let forge = git_next_forge::Forge::create(repo_details.clone(), net.clone());
        info!("Starting Repo Actor");
        let repo_actor = RepoActor::new(
            repo_details,
            forge,
            webhook.clone(),
            generation,
            net.clone(),
            repository_factory.duplicate(),
            sleep_duration,
            Some(notify_user_recipient),
        );
        (forge_name.clone(), repo_alias, repo_actor)
    }
}
/// Starts a prepared repo actor and immediately sends it a `CloneRepo` message.
///
/// Returns the repo alias together with the address of the running actor.
fn start_actor(
    &self,
    actor: (ForgeAlias, RepoAlias, RepoActor),
) -> (RepoAlias, Addr<RepoActor>) {
    let (forge_name, repo_alias, repo_actor) = actor;
    let _guard =
        tracing::info_span!("start_actor", forge = %forge_name, repo = %repo_alias).entered();
    let addr = repo_actor.start();
    addr.do_send(CloneRepo);
    info!("Started");
    (repo_alias, addr)
}
fn server_storage(&self, server_config: &ReceiveServerConfig) -> Option<ServerStorage> {
let server_storage = server_config.storage().clone();
let dir = server_storage.path();
if !dir.exists() {
if let Err(err) = self.fs.dir_create(dir) {
error!(?err, ?dir, "Failed to create server storage");
return None;
}
}
let Ok(canon) = dir.canonicalize() else {
error!(?dir, "Failed to confirm server storage");
return None;
};
if let Err(err) = self.create_forge_data_directories(server_config, &canon) {
error!(?err, "Failure creating forge storage");
return None;
}
Some(server_storage)
}
/// Sends `msg` to this actor's own address, recording it in the message log when one is set.
///
/// When `message_log` is present, a `"send: <msg debug>"` entry is appended to it.
/// In test builds the message is only logged and never delivered, because the
/// actual send is compiled out with `#[cfg(not(test))]`.
fn do_send<M>(&mut self, msg: M, _ctx: &mut <Self as actix::Actor>::Context)
where
    M: actix::Message + Send + 'static + std::fmt::Debug,
    Self: actix::Handler<M>,
    <M as actix::Message>::Result: Send,
{
    tracing::info!(?msg, "send");
    if let Some(message_log) = self.message_log.as_ref() {
        let log_message = format!("send: {msg:?}");
        tracing::debug!(log_message);
        // If the lock cannot be acquired the entry is skipped.
        if let Ok(mut entries) = message_log.write() {
            entries.push(log_message);
        }
    }
    #[cfg(not(test))]
    _ctx.address().do_send(msg);
    tracing::info!("sent");
}
}