refactor: split messages and handlers for server-actor
All checks were successful
Rust / build (push) Successful in 1m46s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful

This commit is contained in:
Paul Campbell 2024-07-11 19:10:04 +01:00
parent 7578ab3144
commit 681b2c4c10
6 changed files with 127 additions and 94 deletions

View file

@ -0,0 +1,21 @@
//-
use actix::prelude::*;
use git_next_config::server::ServerConfig;
use git_next_file_watcher_actor::FileUpdated;
use crate::{messages::ReceiveServerConfig, Server};
impl Handler<FileUpdated> for Server {
type Result = ();
fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result {
let server_config = match ServerConfig::load(&self.fs) {
Ok(server_config) => server_config,
Err(err) => {
tracing::error!("Failed to load config file. Error: {}", err);
return;
}
};
self.do_send(ReceiveServerConfig::new(server_config), ctx);
}
}

View file

@ -0,0 +1,3 @@
mod file_updated;
mod receive_server_config;
mod receive_valid_server_config;

View file

@ -0,0 +1,38 @@
use actix::prelude::*;
use crate::{
messages::{ReceiveServerConfig, ReceiveValidServerConfig, ValidServerConfig},
Server,
};
impl Handler<ReceiveServerConfig> for Server {
type Result = ();
#[allow(clippy::cognitive_complexity)]
fn handle(&mut self, msg: ReceiveServerConfig, ctx: &mut Self::Context) -> Self::Result {
tracing::info!("recieved server config");
let Ok(socket_addr) = msg.http() else {
tracing::error!("Unable to parse http.addr");
return;
};
let Some(server_storage) = self.server_storage(&msg) else {
tracing::error!("Server storage not available");
return;
};
if msg.webhook().base_url().ends_with('/') {
tracing::error!("webhook.url must not end with a '/'");
return;
}
self.do_send(
ReceiveValidServerConfig::new(ValidServerConfig::new(
msg.unwrap(),
socket_addr,
server_storage,
)),
ctx,
);
}
}

View file

@ -0,0 +1,39 @@
use actix::prelude::*;
use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter};
use crate::{
messages::{ReceiveValidServerConfig, ValidServerConfig},
Server,
};
impl Handler<ReceiveValidServerConfig> for Server {
type Result = ();
fn handle(&mut self, msg: ReceiveValidServerConfig, _ctx: &mut Self::Context) -> Self::Result {
let ValidServerConfig {
server_config,
socket_address,
server_storage,
} = msg.unwrap();
if let Some(webhook) = self.webhook.take() {
webhook.do_send(ShutdownWebhook);
}
self.generation.inc();
// Webhook Server
tracing::info!("Starting Webhook Server...");
let webhook_router = WebhookRouter::default().start();
let webhook = server_config.webhook();
// Forge Actors
for (forge_alias, forge_config) in server_config.forges() {
self.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook)
.into_iter()
.map(|a| self.start_actor(a))
.map(|(repo_alias, addr)| {
AddWebhookRecipient::new(forge_alias.clone(), repo_alias, addr.recipient())
})
.for_each(|msg| webhook_router.do_send(msg));
}
let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook.replace(webhook);
}
}

View file

@ -2,41 +2,27 @@
#[cfg(test)]
mod tests;
mod handlers;
mod messages;
use actix::prelude::*;
use derive_more::Constructor;
use git_next_actor_macros::message;
use git_next_config as config;
use git_next_config::server::{ServerConfig, ServerStorage, Webhook};
use git_next_config::{ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig};
use git_next_file_watcher_actor::FileUpdated;
use git_next_git::{Generation, RepoDetails};
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor as webhook;
use kxio::{fs::FileSystem, network::Network};
use std::{
net::SocketAddr,
path::PathBuf,
sync::{Arc, RwLock},
};
use tracing::{error, info};
use webhook::{AddWebhookRecipient, ShutdownWebhook, WebhookActor, WebhookRouter};
use webhook::WebhookActor;
pub use git_next_git::repository::{factory::real as repository_factory, RepositoryFactory};
message!(ReceiveServerConfig: ServerConfig: "Notification of newly loaded server configuration.
This message will prompt the `git-next` server to stop and restart all repo-actors.
Contains the new server configuration.");
#[derive(Clone, Debug, PartialEq, Eq, Constructor)]
pub struct ValidServerConfig {
server_config: ServerConfig,
socket_address: SocketAddr,
server_storage: ServerStorage,
}
message!(ReceiveValidServerConfig: ValidServerConfig: "Notification of validated server configuration.");
use crate::messages::ReceiveServerConfig;
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
@ -70,82 +56,7 @@ pub struct Server {
impl Actor for Server {
type Context = Context<Self>;
}
impl Handler<FileUpdated> for Server {
type Result = ();
fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result {
let server_config = match ServerConfig::load(&self.fs) {
Ok(server_config) => server_config,
Err(err) => {
error!("Failed to load config file. Error: {}", err);
return;
}
};
self.do_send(ReceiveServerConfig::new(server_config), ctx);
}
}
impl Handler<ReceiveServerConfig> for Server {
type Result = ();
#[allow(clippy::cognitive_complexity)]
fn handle(&mut self, msg: ReceiveServerConfig, ctx: &mut Self::Context) -> Self::Result {
tracing::info!("recieved server config");
let Ok(socket_addr) = msg.http() else {
error!("Unable to parse http.addr");
return;
};
let Some(server_storage) = self.server_storage(&msg) else {
error!("Server storage not available");
return;
};
if msg.webhook().base_url().ends_with('/') {
error!("webhook.url must not end with a '/'");
return;
}
self.do_send(
ReceiveValidServerConfig::new(ValidServerConfig::new(
msg.0,
socket_addr,
server_storage,
)),
ctx,
);
}
}
impl Handler<ReceiveValidServerConfig> for Server {
type Result = ();
fn handle(&mut self, msg: ReceiveValidServerConfig, _ctx: &mut Self::Context) -> Self::Result {
let ValidServerConfig {
server_config,
socket_address,
server_storage,
} = msg.0;
if let Some(webhook) = self.webhook.take() {
webhook.do_send(ShutdownWebhook);
}
self.generation.inc();
// Webhook Server
info!("Starting Webhook Server...");
let webhook_router = WebhookRouter::default().start();
let webhook = server_config.webhook();
// Forge Actors
for (forge_alias, forge_config) in server_config.forges() {
self.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook)
.into_iter()
.map(|a| self.start_actor(a))
.map(|(repo_alias, addr)| {
AddWebhookRecipient::new(forge_alias.clone(), repo_alias, addr.recipient())
})
.for_each(|msg| webhook_router.do_send(msg));
}
let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook.replace(webhook);
}
}
impl Server {
pub fn new(
fs: FileSystem,

View file

@ -0,0 +1,21 @@
//-
use derive_more::Constructor;
use git_next_actor_macros::message;
use git_next_config::server::{ServerConfig, ServerStorage};
use std::net::SocketAddr;
// receive server config
message!(ReceiveServerConfig: ServerConfig: "Notification of newly loaded server configuration.
This message will prompt the `git-next` server to stop and restart all repo-actors.
Contains the new server configuration.");
// receive valid server config
#[derive(Clone, Debug, PartialEq, Eq, Constructor)]
pub struct ValidServerConfig {
pub server_config: ServerConfig,
pub socket_address: SocketAddr,
pub server_storage: ServerStorage,
}
message!(ReceiveValidServerConfig: ValidServerConfig: "Notification of validated server configuration.");