WIP: refactor: repo-actor: rewrite tests using mockall
All checks were successful
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful

This commit is contained in:
Paul Campbell 2024-06-19 07:03:08 +01:00
parent 601e400300
commit 65b9c4fb1e
24 changed files with 1736 additions and 442 deletions

View file

@ -4,7 +4,7 @@
# Complete help on configuration: https://dystroy.org/bacon/config/ # Complete help on configuration: https://dystroy.org/bacon/config/
default_job = "check" default_job = "check"
reverse = true # reverse = true
[jobs.check] [jobs.check]
command = ["cargo", "check", "--color", "always"] command = ["cargo", "check", "--color", "always"]
@ -57,7 +57,7 @@ on_success = "back" # so that we don't open the browser at each change
# If you want to pass options to your program, a `--` separator # If you want to pass options to your program, a `--` separator
# will be needed. # will be needed.
[jobs.run] [jobs.run]
command = [ "cargo", "run", "--color", "always" ] command = ["cargo", "run", "--color", "always"]
need_stdout = true need_stdout = true
allow_warnings = true allow_warnings = true

View file

@ -42,7 +42,8 @@ async fn main() {
git_next_server::init(fs); git_next_server::init(fs);
} }
Server::Start => { Server::Start => {
git_next_server::start(fs, net, repo).await; let sleep_duration = std::time::Duration::from_secs(10);
git_next_server::start(fs, net, repo, sleep_duration).await;
} }
}, },
} }

View file

@ -16,6 +16,7 @@ use crate::RepoConfigSource;
serde::Serialize, serde::Serialize,
derive_more::Constructor, derive_more::Constructor,
derive_more::Display, derive_more::Display,
derive_with::With,
)] )]
#[display("{}", branches)] #[display("{}", branches)]
pub struct RepoConfig { pub struct RepoConfig {

View file

@ -1,7 +1,8 @@
use git_next_config::{ use config::{
BranchName, ForgeAlias, ForgeConfig, ForgeDetails, GitDir, RepoAlias, RepoConfig, RepoPath, BranchName, ForgeAlias, ForgeConfig, ForgeDetails, GitDir, RepoAlias, RepoConfig, RepoPath,
ServerRepoConfig, ServerRepoConfig,
}; };
use git_next_config as config;
use super::{Generation, GitRemote}; use super::{Generation, GitRemote};
@ -57,4 +58,10 @@ impl RepoDetails {
pub fn git_remote(&self) -> GitRemote { pub fn git_remote(&self) -> GitRemote {
GitRemote::new(self.forge.hostname().clone(), self.repo_path.clone()) GitRemote::new(self.forge.hostname().clone(), self.repo_path.clone())
} }
pub fn with_hostname(mut self, hostname: config::Hostname) -> Self {
let forge = self.forge;
self.forge = forge.with_hostname(hostname);
self
}
} }

View file

@ -125,7 +125,7 @@ impl std::ops::Deref for Repository {
} }
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum Direction { pub enum Direction {
/// Push local changes to the remote. /// Push local changes to the remote.
Push, Push,

View file

@ -100,13 +100,13 @@ fn is_not_based_on(commits: &[crate::commit::Commit], needle: &crate::Commit) ->
} }
fn get_commit_histories( fn get_commit_histories(
repository: &dyn git::repository::OpenRepositoryLike, open_repository: &dyn git::repository::OpenRepositoryLike,
repo_config: &config::RepoConfig, repo_config: &config::RepoConfig,
) -> git::commit::log::Result<git::commit::Histories> { ) -> git::commit::log::Result<git::commit::Histories> {
let main = (repository.commit_log(&repo_config.branches().main(), &[]))?; let main = (open_repository.commit_log(&repo_config.branches().main(), &[]))?;
let main_head = [main[0].clone()]; let main_head = [main[0].clone()];
let next = repository.commit_log(&repo_config.branches().next(), &main_head)?; let next = open_repository.commit_log(&repo_config.branches().next(), &main_head)?;
let dev = repository.commit_log(&repo_config.branches().dev(), &main_head)?; let dev = open_repository.commit_log(&repo_config.branches().dev(), &main_head)?;
let histories = git::commit::Histories { main, next, dev }; let histories = git::commit::Histories { main, next, dev };
Ok(histories) Ok(histories)
} }

View file

@ -42,12 +42,20 @@ ulid = { workspace = true }
# boilerplate # boilerplate
derive_more = { workspace = true } derive_more = { workspace = true }
thiserror = { workspace = true }
# Actors # Actors
actix = { workspace = true } actix = { workspace = true }
actix-rt = { workspace = true } actix-rt = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
[dev-dependencies]
# Testing
assert2 = { workspace = true }
rand = { workspace = true }
pretty_assertions = { workspace = true }
mockall = { workspace = true }
[lints.clippy] [lints.clippy]
nursery = { level = "warn", priority = -1 } nursery = { level = "warn", priority = -1 }
# pedantic = "warn" # pedantic = "warn"

View file

@ -1,78 +1,67 @@
// //
use actix::prelude::*; use crate::messages::MessageToken;
use git_next_config as config; use git_next_config as config;
use git_next_git as git; use git_next_git as git;
use derive_more::Display;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{MessageToken, ValidateRepo};
use std::time::Duration;
// advance next to the next commit towards the head of the dev branch // advance next to the next commit towards the head of the dev branch
#[tracing::instrument(fields(next), skip_all)] #[tracing::instrument(fields(next), skip_all)]
pub async fn advance_next( pub async fn advance_next(
next: git::Commit, next: &git::Commit,
dev_commit_history: Vec<git::Commit>, dev_commit_history: &[git::Commit],
repo_details: git::RepoDetails, repo_details: git::RepoDetails,
repo_config: config::RepoConfig, repo_config: config::RepoConfig,
open_repository: &dyn git::repository::OpenRepositoryLike, open_repository: &dyn git::repository::OpenRepositoryLike,
addr: Addr<super::RepoActor>,
message_token: MessageToken, message_token: MessageToken,
) { ) -> Result<MessageToken> {
let next_commit = find_next_commit_on_dev(next, dev_commit_history); let commit =
let Some(commit) = next_commit else { find_next_commit_on_dev(next, dev_commit_history).ok_or_else(|| Error::NextAtDev)?;
warn!("No commits to advance next to"); validate_commit_message(commit.message())?;
return;
};
if let Some(problem) = validate_commit_message(commit.message()) {
warn!("Can't advance next to commit '{}': {}", commit, problem);
return;
}
info!("Advancing next to commit '{}'", commit); info!("Advancing next to commit '{}'", commit);
if let Err(err) = git::push::reset( git::push::reset(
open_repository, open_repository,
&repo_details, &repo_details,
&repo_config.branches().next(), &repo_config.branches().next(),
&commit.into(), &commit.into(),
&git::push::Force::No, &git::push::Force::No,
) { )?;
warn!(?err, "Failed") Ok(message_token)
}
tokio::time::sleep(Duration::from_secs(10)).await;
addr.do_send(ValidateRepo { message_token })
} }
#[tracing::instrument] #[tracing::instrument]
fn validate_commit_message(message: &git::commit::Message) -> Option<String> { fn validate_commit_message(message: &git::commit::Message) -> Result<()> {
let message = &message.to_string(); let message = &message.to_string();
if message.to_ascii_lowercase().starts_with("wip") { if message.to_ascii_lowercase().starts_with("wip") {
return Some("Is Work-In-Progress".to_string()); return Err(Error::IsWorkInProgress);
} }
match git_conventional::Commit::parse(message) { match git_conventional::Commit::parse(message) {
Ok(commit) => { Ok(commit) => {
info!(?commit, "Pass"); info!(?commit, "Pass");
None Ok(())
} }
Err(err) => { Err(err) => {
warn!(?err, "Fail"); warn!(?err, "Fail");
Some(err.kind().to_string()) Err(Error::InvalidCommitMessage {
reason: err.kind().to_string(),
})
} }
} }
} }
pub fn find_next_commit_on_dev( pub fn find_next_commit_on_dev(
next: git::Commit, next: &git::Commit,
dev_commit_history: Vec<git::Commit>, dev_commit_history: &[git::Commit],
) -> Option<git::Commit> { ) -> Option<git::Commit> {
let mut next_commit: Option<git::Commit> = None; let mut next_commit: Option<&git::Commit> = None;
for commit in dev_commit_history.into_iter() { for commit in dev_commit_history.iter() {
if commit == next { if commit == next {
break; break;
}; };
next_commit.replace(commit); next_commit.replace(commit);
} }
next_commit next_commit.cloned()
} }
// advance main branch to the commit 'next' // advance main branch to the commit 'next'
@ -82,15 +71,30 @@ pub async fn advance_main(
repo_details: &git::RepoDetails, repo_details: &git::RepoDetails,
repo_config: &config::RepoConfig, repo_config: &config::RepoConfig,
open_repository: &dyn git::repository::OpenRepositoryLike, open_repository: &dyn git::repository::OpenRepositoryLike,
) { ) -> Result<()> {
info!("Advancing main to next"); info!("Advancing main to next");
if let Err(err) = git::push::reset( git::push::reset(
open_repository, open_repository,
repo_details, repo_details,
&repo_config.branches().main(), &repo_config.branches().main(),
&next.into(), &next.into(),
&git::push::Force::No, &git::push::Force::No,
) { )?;
warn!(?err, "Failed") Ok(())
}; }
pub type Result<T> = core::result::Result<T, Error>;
#[derive(Debug, thiserror::Error, Display)]
pub enum Error {
#[display("push: {}", 0)]
Push(#[from] git::push::Error),
#[display("no commits to advance next to")]
NextAtDev,
#[display("commit is a Work-in-progress")]
IsWorkInProgress,
#[display("commit message is not in conventional commit format: {reason}")]
InvalidCommitMessage { reason: String },
} }

View file

@ -0,0 +1,53 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
impl Handler<actor::messages::AdvanceMainTo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = ?msg))]
fn handle(
&mut self,
msg: actor::messages::AdvanceMainTo,
ctx: &mut Self::Context,
) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
tracing::warn!("No config loaded");
return;
};
self.open_repository.iter().for_each(|open_repository| {
let open_repository = open_repository.duplicate();
let repo_details = self.repo_details.clone();
let repo_config = repo_config.clone();
let addr = ctx.address();
let msg = msg.clone();
let message_token = self.message_token;
async move {
match actor::branch::advance_main(
msg.unwrap(),
&repo_details,
&repo_config,
&*open_repository,
)
.await
{
Err(err) => {
tracing::warn!("advance main: {err}");
}
Ok(_) => match repo_config.source() {
git_next_config::RepoConfigSource::Repo => {
addr.do_send(actor::messages::LoadConfigFromRepo)
}
git_next_config::RepoConfigSource::Server => {
addr.do_send(actor::messages::ValidateRepo::new(message_token))
}
},
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
});
}
}

View file

@ -0,0 +1,37 @@
//
use actix::prelude::*;
use crate as actor;
use git_next_git as git;
impl Handler<actor::messages::CloneRepo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details /*, gitdir = %self.repo_details.gitdir */))]
fn handle(
&mut self,
_msg: actor::messages::CloneRepo,
ctx: &mut Self::Context,
) -> Self::Result {
println!("handler clone repo start");
let gitdir = self.repo_details.gitdir.clone();
match git::repository::open(&*self.repository_factory, &self.repo_details, gitdir) {
Ok(repository) => {
println!("- open okay");
self.open_repository.replace(repository);
if self.repo_details.repo_config.is_none() {
println!("need to load config from repo");
ctx.address().do_send(actor::messages::LoadConfigFromRepo);
} else {
println!("need to validate repo");
ctx.address()
.do_send(actor::messages::ValidateRepo::new(self.message_token));
}
}
Err(err) => {
println!("err: {err:?}");
tracing::warn!("Could not open repo: {err}")
}
}
println!("handler clone repo finish");
}
}

View file

@ -0,0 +1,32 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
impl Handler<actor::messages::LoadConfigFromRepo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))]
fn handle(
&mut self,
_msg: actor::messages::LoadConfigFromRepo,
ctx: &mut Self::Context,
) -> Self::Result {
self.open_repository.iter().for_each(|open_repository| {
let open_repository = open_repository.duplicate();
let repo_details = self.repo_details.clone();
let addr = ctx.address();
async move {
match actor::load::config_from_repository(repo_details, &*open_repository).await {
Ok(repo_config) => {
addr.do_send(actor::messages::LoadedConfig::new(repo_config))
}
Err(err) => tracing::warn!(?err, "Failed to load config"),
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
});
}
}

View file

@ -0,0 +1,20 @@
//
use actix::prelude::*;
use crate as actor;
impl Handler<actor::messages::LoadedConfig> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = ?msg))]
fn handle(
&mut self,
msg: actor::messages::LoadedConfig,
ctx: &mut Self::Context,
) -> Self::Result {
let repo_config = msg.unwrap();
self.repo_details.repo_config.replace(repo_config);
ctx.address()
.do_send(actor::messages::ValidateRepo::new(self.message_token));
}
}

View file

@ -0,0 +1,8 @@
pub mod advance_to_main;
pub mod clone;
pub mod load_config_from_repo;
pub mod loaded_config;
pub mod start_monitoring;
pub mod validate_repo;
pub mod webhook_message;
pub mod webhook_registered;

View file

@ -0,0 +1,88 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
use git_next_git as git;
impl Handler<actor::messages::StartMonitoring> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all,
fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main(), next = %msg.next(), dev = %msg.dev()))
]
fn handle(
&mut self,
msg: actor::messages::StartMonitoring,
ctx: &mut Self::Context,
) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
tracing::warn!("No config loaded");
return;
};
let next_ahead_of_main = msg.main() != msg.next();
let dev_ahead_of_next = msg.next() != msg.dev();
tracing::info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
let addr = ctx.address();
let forge = self.forge.clone();
if next_ahead_of_main {
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
async move {
// get the status - pass, fail, pending (all others map to fail, e.g. error)
let status = forge.commit_status(msg.next()).await;
tracing::info!(?status, "Checking next branch");
match status {
git::forge::commit::Status::Pass => {
addr.do_send(actor::messages::AdvanceMainTo::new(msg.next().clone()));
}
git::forge::commit::Status::Pending => {
tokio::time::sleep(sleep_duration).await;
addr.do_send(actor::messages::ValidateRepo::new(message_token));
}
git::forge::commit::Status::Fail => {
tracing::warn!("Checks have failed");
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
} else if dev_ahead_of_next {
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
self.open_repository.iter().for_each(|open_repository| {
let open_repository = open_repository.duplicate();
let msg = msg.clone();
let repo_details = self.repo_details.clone();
let repo_config = repo_config.clone();
let addr = addr.clone();
async move {
match actor::branch::advance_next(
msg.next(),
msg.dev_commit_history(),
repo_details,
repo_config,
&*open_repository,
message_token,
)
.await
{
Ok(message_token) => {
// pause to allow any CI checks to be started
tokio::time::sleep(sleep_duration).await;
addr.do_send(actor::messages::ValidateRepo::new(message_token))
}
Err(err) => tracing::warn!("advance next: {err}"),
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
});
}
}
}

View file

@ -0,0 +1,99 @@
//
use actix::prelude::*;
use derive_more::Deref as _;
use tracing::Instrument as _;
use crate as actor;
use git_next_git as git;
impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.deref()))]
fn handle(
&mut self,
msg: actor::messages::ValidateRepo,
ctx: &mut Self::Context,
) -> Self::Result {
println!("handler validate repo - start");
match msg.unwrap() {
message_token if self.message_token < message_token => {
tracing::info!(%message_token, "New message token");
self.message_token = message_token;
}
message_token if self.message_token > message_token => {
tracing::info!("Dropping message from previous generation");
return; // message is expired
}
_ => {
// do nothing
}
}
if self.webhook_id.is_none() {
let forge_alias = self.repo_details.forge.forge_alias();
let repo_alias = &self.repo_details.repo_alias;
let webhook_url = self.webhook.url(forge_alias, repo_alias);
let forge = self.forge.clone();
let addr = ctx.address();
async move {
if let Err(err) =
forge
.register_webhook(&webhook_url)
.await
.and_then(|registered_webhook| {
addr.try_send(actor::messages::WebhookRegistered::from(
registered_webhook,
))
.map_err(|e| {
git::forge::webhook::Error::FailedToNotifySelf(e.to_string())
})
})
{
tracing::warn!("registering webhook: {err}");
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
self.open_repository.iter().for_each(|open_repository| {
if let Some(repo_config) = self.repo_details.repo_config.clone() {
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
let open_repository = open_repository.duplicate();
async move {
match git::validation::positions::validate_positions(
&*open_repository,
&repo_details,
repo_config,
) {
Ok(git::validation::positions::Positions {
main,
next,
dev,
dev_commit_history,
}) => {
addr.do_send(actor::messages::StartMonitoring::new(
main,
next,
dev,
dev_commit_history,
));
}
Err(err) => {
tracing::warn!("{:?}", err);
tokio::time::sleep(sleep_duration).await;
addr.do_send(actor::messages::ValidateRepo::new(message_token));
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
});
println!("handler validate repo - finish");
}
}

View file

@ -1,13 +1,13 @@
// //
use actix::prelude::*; use actix::prelude::*;
use crate::{RepoActor, ValidateRepo}; use crate as actor;
use git_next_config as config; use git_next_config as config;
use git_next_git as git; use git_next_git as git;
use tracing::{info, warn}; use tracing::{info, warn};
impl Handler<config::WebhookMessage> for RepoActor { impl Handler<config::WebhookMessage> for actor::RepoActor {
type Result = (); type Result = ();
#[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity #[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity
@ -88,6 +88,7 @@ impl Handler<config::WebhookMessage> for RepoActor {
token = %message_token, token = %message_token,
"New commit" "New commit"
); );
ctx.address().do_send(ValidateRepo { message_token }); ctx.address()
.do_send(actor::messages::ValidateRepo::new(message_token));
} }
} }

View file

@ -0,0 +1,17 @@
//
use actix::prelude::*;
use crate as actor;
impl Handler<actor::messages::WebhookRegistered> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.webhook_id()))]
fn handle(
&mut self,
msg: actor::messages::WebhookRegistered,
_ctx: &mut Self::Context,
) -> Self::Result {
self.webhook_id.replace(msg.webhook_id().clone());
self.webhook_auth.replace(msg.webhook_auth().clone());
}
}

View file

@ -1,30 +1,58 @@
mod branch; mod branch;
pub mod handlers;
mod load; mod load;
pub mod status; pub mod messages;
pub mod webhook;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use std::time::Duration;
use actix::prelude::*; use actix::prelude::*;
use config::RegisteredWebhook;
use git::validation::positions::{validate_positions, Positions};
use crate as repo_actor;
use git_next_config as config; use git_next_config as config;
use git_next_forge as forge; use git_next_forge as forge;
use git_next_git as git; use git_next_git as git;
use kxio::network::Network; use kxio::network::Network;
use tracing::{debug, info, warn, Instrument}; use tracing::{info, warn, Instrument};
#[derive(Debug, derive_more::Display)] #[derive(Debug, derive_more::Display)]
#[display("{}:{}:{}", generation, repo_details.forge.forge_alias(), repo_details.repo_alias)] #[display("{}:{}:{}", generation, repo_details.forge.forge_alias(), repo_details.repo_alias)]
/// An actor that represents a Git Repository.
///
/// When this actor is started it is sent the [CloneRepo] message.
///
/// ```mermaid
/// stateDiagram-v2
/// [*] --> CloneRepo :START
///
/// CloneRepo --> LoadConfigFromRepo
/// CloneRepo --> ValidateRepo
///
/// LoadConfigFromRepo --> LoadedConfig
///
/// ValidateRepo --> WebhookRegistered
/// ValidateRepo --> StartMonitoring
/// ValidateRepo --> ValidateRepo :SLEEP 10s
///
/// LoadedConfig --> ValidateRepo
///
/// WebhookRegistered --> [*]
///
/// StartMonitoring --> AdvanceMainTo
/// StartMonitoring --> ValidateRepo :SLEEP 10s
///
/// AdvanceMainTo --> LoadConfigFromRepo
/// AdvanceMainTo --> ValidateRepo
///
/// [*] --> WebhookMessage :WEBHOOK
///
/// WebhookMessage --> ValidateRepo
/// ```
///
pub struct RepoActor { pub struct RepoActor {
sleep_duration: std::time::Duration,
generation: git::Generation, generation: git::Generation,
message_token: MessageToken, message_token: messages::MessageToken,
repo_details: git::RepoDetails, repo_details: git::RepoDetails,
webhook: config::server::Webhook, webhook: config::server::Webhook,
webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured
@ -32,35 +60,37 @@ pub struct RepoActor {
last_main_commit: Option<git::Commit>, last_main_commit: Option<git::Commit>,
last_next_commit: Option<git::Commit>, last_next_commit: Option<git::Commit>,
last_dev_commit: Option<git::Commit>, last_dev_commit: Option<git::Commit>,
repository: Box<dyn git::repository::RepositoryFactory>, repository_factory: Box<dyn git::repository::RepositoryFactory>,
open_repository: Option<Box<dyn git::repository::OpenRepositoryLike>>, open_repository: Option<Box<dyn git::repository::OpenRepositoryLike>>,
net: Network, net: Network,
forge: forge::Forge, forge: forge::Forge,
} }
impl RepoActor { impl RepoActor {
pub fn new( pub fn new(
details: git::RepoDetails, repo_details: git::RepoDetails,
webhook: config::server::Webhook, webhook: config::server::Webhook,
generation: git::Generation, generation: git::Generation,
net: Network, net: Network,
repository: Box<dyn git::repository::RepositoryFactory>, repository_factory: Box<dyn git::repository::RepositoryFactory>,
sleep_duration: std::time::Duration,
) -> Self { ) -> Self {
let forge = forge::Forge::new(details.clone(), net.clone()); let message_token = messages::MessageToken::default();
debug!(?forge, "new"); let forge = forge::Forge::new(repo_details.clone(), net.clone());
Self { Self {
generation, generation,
message_token: MessageToken::new(), message_token,
repo_details: details, repo_details,
webhook, webhook,
webhook_id: None, webhook_id: None,
webhook_auth: None, webhook_auth: None,
last_main_commit: None, last_main_commit: None,
last_next_commit: None, last_next_commit: None,
last_dev_commit: None, last_dev_commit: None,
repository, repository_factory,
open_repository: None, open_repository: None,
net,
forge, forge,
net,
sleep_duration,
} }
} }
} }
@ -68,9 +98,11 @@ impl Actor for RepoActor {
type Context = Context<Self>; type Context = Context<Self>;
#[tracing::instrument(name = "RepoActor::stopping", skip_all, fields(repo = %self.repo_details))] #[tracing::instrument(name = "RepoActor::stopping", skip_all, fields(repo = %self.repo_details))]
fn stopping(&mut self, ctx: &mut Self::Context) -> Running { fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
eprintln!("stopping");
info!("Checking webhook"); info!("Checking webhook");
match self.webhook_id.take() { match self.webhook_id.take() {
Some(webhook_id) => { Some(webhook_id) => {
eprintln!("stopping - unregistering webhook");
info!(%webhook_id, "Unregistring webhook"); info!(%webhook_id, "Unregistring webhook");
let forge = self.forge.clone(); let forge = self.forge.clone();
async move { async move {
@ -87,260 +119,3 @@ impl Actor for RepoActor {
} }
} }
} }
#[derive(Message)]
#[rtype(result = "()")]
pub struct CloneRepo;
impl Handler<CloneRepo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details /*, gitdir = %self.repo_details.gitdir */))]
fn handle(&mut self, _msg: CloneRepo, ctx: &mut Self::Context) -> Self::Result {
let gitdir = self.repo_details.gitdir.clone();
match git::repository::open(&*self.repository, &self.repo_details, gitdir) {
Ok(repository) => {
self.open_repository.replace(repository);
if self.repo_details.repo_config.is_none() {
ctx.address().do_send(LoadConfigFromRepo);
} else {
ctx.address().do_send(ValidateRepo {
message_token: self.message_token,
});
}
}
Err(err) => warn!("Could not open repo: {err}"),
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct LoadConfigFromRepo;
impl Handler<LoadConfigFromRepo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))]
fn handle(&mut self, _msg: LoadConfigFromRepo, ctx: &mut Self::Context) -> Self::Result {
let details = self.repo_details.clone();
let addr = ctx.address();
let Some(open_repository) = &self.open_repository else {
warn!("missing open repository - can't load configuration");
return;
};
let open_repository = open_repository.duplicate();
async move { repo_actor::load::load_file(details, addr, &*open_repository).await }
.in_current_span()
.into_actor(self)
.wait(ctx)
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct LoadedConfig(git_next_config::RepoConfig);
impl Handler<LoadedConfig> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = %msg.0))]
fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result {
let repo_config = msg.0;
self.repo_details.repo_config.replace(repo_config);
ctx.address().do_send(ValidateRepo {
message_token: self.message_token,
});
}
}
#[derive(derive_more::Constructor, Message)]
#[rtype(result = "()")]
pub struct ValidateRepo {
message_token: MessageToken,
}
impl Handler<ValidateRepo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.message_token))]
fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result {
match msg.message_token {
message_token if self.message_token < message_token => {
info!(%message_token, "New message token");
self.message_token = msg.message_token;
}
message_token if self.message_token > message_token => {
info!("Dropping message from previous generation");
return; // message is expired
}
_ => {
// do nothing
}
}
if self.webhook_id.is_none() {
let forge_alias = self.repo_details.forge.forge_alias();
let repo_alias = &self.repo_details.repo_alias;
let webhook_url = self.webhook.url(forge_alias, repo_alias);
let forge = self.forge.clone();
let addr = ctx.address();
async move {
if let Err(err) =
forge
.register_webhook(&webhook_url)
.await
.and_then(|registered_webhook| {
addr.try_send(WebhookRegistered::from(registered_webhook))
.map_err(|e| {
git::forge::webhook::Error::FailedToNotifySelf(e.to_string())
})
})
{
warn!("registering webhook: {err}");
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
if let (Some(open_repository), Some(repo_config)) =
(&self.open_repository, self.repo_details.repo_config.clone())
{
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
let open_repository = open_repository.duplicate();
async move {
match validate_positions(&*open_repository, &repo_details, repo_config) {
Ok(Positions {
main,
next,
dev,
dev_commit_history,
}) => {
addr.do_send(StartMonitoring::new(main, next, dev, dev_commit_history));
}
Err(err) => {
warn!("{:?}", err);
tokio::time::sleep(Duration::from_secs(10)).await;
addr.do_send(ValidateRepo::new(message_token));
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
}
#[derive(Debug, derive_more::Constructor, Message)]
#[rtype(result = "()")]
pub struct StartMonitoring {
main: git::Commit,
next: git::Commit,
dev: git::Commit,
dev_commit_history: Vec<git::Commit>,
}
impl Handler<StartMonitoring> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all,
fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main, next= %msg.next, dev = %msg.dev))
]
fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
warn!("No config loaded");
return;
};
let next_ahead_of_main = msg.main != msg.next;
let dev_ahead_of_next = msg.next != msg.dev;
info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
let addr = ctx.address();
let forge = self.forge.clone();
if next_ahead_of_main {
status::check_next(msg.next, addr, forge, self.message_token)
.in_current_span()
.into_actor(self)
.wait(ctx);
} else if dev_ahead_of_next {
if let Some(open_repository) = &self.open_repository {
let open_repository = open_repository.duplicate();
let repo_details = self.repo_details.clone();
let message_token = self.message_token;
async move {
branch::advance_next(
msg.next,
msg.dev_commit_history,
repo_details,
repo_config,
&*open_repository,
addr,
message_token,
)
.await
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct WebhookRegistered(config::WebhookId, config::WebhookAuth);
impl From<RegisteredWebhook> for WebhookRegistered {
fn from(value: RegisteredWebhook) -> Self {
Self(value.id().clone(), value.auth().clone())
}
}
impl Handler<WebhookRegistered> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.0))]
fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result {
self.webhook_id.replace(msg.0);
self.webhook_auth.replace(msg.1);
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct AdvanceMainTo(git::Commit);
impl Handler<AdvanceMainTo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = %msg.0))]
fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
warn!("No config loaded");
return;
};
let Some(open_repository) = &self.open_repository else {
warn!("No repository opened");
return;
};
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
let open_repository = open_repository.duplicate();
async move {
branch::advance_main(msg.0, &repo_details, &repo_config, &*open_repository).await;
match repo_config.source() {
git_next_config::RepoConfigSource::Repo => addr.do_send(LoadConfigFromRepo),
git_next_config::RepoConfigSource::Server => {
addr.do_send(ValidateRepo { message_token })
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord, derive_more::Display)]
pub struct MessageToken(u32);
impl MessageToken {
pub fn new() -> Self {
Self::default()
}
pub const fn next(&self) -> Self {
Self(self.0 + 1)
}
}

View file

@ -1,75 +1,54 @@
use std::path::PathBuf;
// //
use actix::prelude::*; use derive_more::Display;
use std::path::PathBuf;
use tracing::{error, info}; use tracing::info;
use git_next_config as config; use git_next_config as config;
use git_next_git as git; use git_next_git as git;
use super::{LoadedConfig, RepoActor};
/// Loads the [RepoConfig] from the `.git-next.toml` file in the repository /// Loads the [RepoConfig] from the `.git-next.toml` file in the repository
#[tracing::instrument(skip_all, fields(branch = %repo_details.branch))] #[tracing::instrument(skip_all, fields(branch = %repo_details.branch))]
pub async fn load_file( pub async fn config_from_repository(
repo_details: git::RepoDetails, repo_details: git::RepoDetails,
addr: Addr<RepoActor>,
open_repository: &dyn git::repository::OpenRepositoryLike, open_repository: &dyn git::repository::OpenRepositoryLike,
) { ) -> Result<config::RepoConfig> {
info!("Loading .git-next.toml from repo"); info!("Loading .git-next.toml from repo");
let repo_config = match load(&repo_details, open_repository).await { let contents =
Ok(repo_config) => repo_config, open_repository.read_file(&repo_details.branch, &PathBuf::from(".git-next.toml"))?;
Err(err) => {
error!(?err, "Failed to load config");
return;
}
};
info!("Loaded .git-next.toml from repo");
addr.do_send(LoadedConfig(repo_config));
}
async fn load(
details: &git::RepoDetails,
open_repository: &dyn git::repository::OpenRepositoryLike,
) -> Result<config::RepoConfig, Error> {
let contents = open_repository.read_file(&details.branch, &PathBuf::from(".git-next.toml"))?;
let config = config::RepoConfig::parse(&contents)?; let config = config::RepoConfig::parse(&contents)?;
let config = validate(config, open_repository).await?; let branches = open_repository.remote_branches()?;
required_branch(&config.branches().main(), &branches)?;
required_branch(&config.branches().next(), &branches)?;
required_branch(&config.branches().dev(), &branches)?;
Ok(config) Ok(config)
} }
#[derive(Debug, derive_more::From, derive_more::Display)] fn required_branch(
branch_name: &config::BranchName,
branches: &[config::BranchName],
) -> Result<()> {
branches
.iter()
.find(|branch| *branch == branch_name)
.ok_or_else(|| Error::BranchNotFound(branch_name.clone()))?;
Ok(())
}
pub type Result<T> = core::result::Result<T, Error>;
#[derive(Debug, thiserror::Error, Display)]
pub enum Error { pub enum Error {
File(git::file::Error), #[display("file")]
Config(config::server::Error), File(#[from] git::file::Error),
Toml(toml::de::Error),
Branch(git::push::Error), #[display("config")]
Config(#[from] config::server::Error),
#[display("toml")]
Toml(#[from] toml::de::Error),
#[display("push")]
Push(#[from] git::push::Error),
#[display("branch not found: {}", 0)]
BranchNotFound(config::BranchName), BranchNotFound(config::BranchName),
} }
pub async fn validate(
config: config::RepoConfig,
open_repository: &dyn git::repository::OpenRepositoryLike,
) -> Result<config::RepoConfig, Error> {
let branches = open_repository.remote_branches()?;
if !branches
.iter()
.any(|branch| branch == &config.branches().main())
{
return Err(Error::BranchNotFound(config.branches().main()));
}
if !branches
.iter()
.any(|branch| branch == &config.branches().next())
{
return Err(Error::BranchNotFound(config.branches().next()));
}
if !branches
.iter()
.any(|branch| branch == &config.branches().dev())
{
return Err(Error::BranchNotFound(config.branches().dev()));
}
Ok(config)
}

View file

@ -0,0 +1,89 @@
//
use actix::prelude::*;
use config::newtype;
use derive_more::{Constructor, Display};
use git_next_config as config;
use git_next_git as git;
#[macro_export]
macro_rules! message {
($name:ident wraps $value:ty) => {
git_next_config::newtype!($name is a $value);
impl Message for $name {
type Result = ();
}
};
($name:ident) => {
git_next_config::newtype!($name);
impl Message for $name {
type Result = ();
}
};
($name:ident wraps $value:ty => $result:ty) => {
git_next_config::newtype!($name is a $value);
impl Message for $name {
type Result = $result;
}
};
($name:ident => $result:ty) => {
git_next_config::newtype!($name);
impl Message for $name {
type Result = $result;
}
};
}
message!(LoadConfigFromRepo);
message!(CloneRepo);
message!(LoadedConfig wraps config::RepoConfig);
message!(ValidateRepo wraps MessageToken);
#[derive(Clone, Debug, Constructor, Message)]
#[rtype(result = "()")]
pub struct StartMonitoring {
main: git::Commit,
next: git::Commit,
dev: git::Commit,
dev_commit_history: Vec<git::Commit>,
}
impl StartMonitoring {
pub const fn main(&self) -> &git::Commit {
&self.main
}
pub const fn next(&self) -> &git::Commit {
&self.next
}
pub const fn dev(&self) -> &git::Commit {
&self.dev
}
pub fn dev_commit_history(&self) -> &[git::Commit] {
&self.dev_commit_history
}
}
message!(WebhookRegistered wraps (config::WebhookId, config::WebhookAuth));
impl WebhookRegistered {
pub const fn webhook_id(&self) -> &config::WebhookId {
&self.0 .0
}
pub const fn webhook_auth(&self) -> &config::WebhookAuth {
&self.0 .1
}
}
impl From<config::RegisteredWebhook> for WebhookRegistered {
fn from(value: config::RegisteredWebhook) -> Self {
let webhook_id = value.id().clone();
let webhook_auth = value.auth().clone();
Self::from((webhook_id, webhook_auth))
}
}
message!(AdvanceMainTo wraps git::Commit);
newtype!(MessageToken is a u32, Copy, Default, Display);
impl MessageToken {
pub const fn next(&self) -> Self {
Self(self.0 + 1)
}
}

View file

@ -1,33 +0,0 @@
//
use actix::prelude::*;
use git_next_forge as forge;
use git_next_git as git;
use tracing::{info, warn};
use crate::{MessageToken, ValidateRepo};
use super::AdvanceMainTo;
pub async fn check_next(
next: git::Commit,
addr: Addr<super::RepoActor>,
forge: forge::Forge,
message_token: MessageToken,
) {
// get the status - pass, fail, pending (all others map to fail, e.g. error)
let status = forge.commit_status(&next).await;
info!(?status, "Checking next branch");
match status {
git::forge::commit::Status::Pass => {
addr.do_send(AdvanceMainTo(next));
}
git::forge::commit::Status::Pending => {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo { message_token });
}
git::forge::commit::Status::Fail => {
warn!("Checks have failed");
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,8 @@ use git_next_config::{
self as config, ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig, self as config, ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig,
}; };
use git_next_git::{repository::RepositoryFactory, Generation, RepoDetails}; use git_next_git::{repository::RepositoryFactory, Generation, RepoDetails};
use git_next_repo_actor::{CloneRepo, RepoActor}; use git_next_repo_actor::messages::CloneRepo;
use git_next_repo_actor::RepoActor;
use kxio::{fs::FileSystem, network::Network}; use kxio::{fs::FileSystem, network::Network};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
@ -39,6 +40,7 @@ pub struct Server {
fs: FileSystem, fs: FileSystem,
net: Network, net: Network,
repository_factory: Box<dyn RepositoryFactory>, repository_factory: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
} }
impl Actor for Server { impl Actor for Server {
type Context = Context<Self>; type Context = Context<Self>;
@ -114,7 +116,12 @@ impl Handler<ServerConfig> for Server {
} }
} }
impl Server { impl Server {
pub fn new(fs: FileSystem, net: Network, repo: Box<dyn RepositoryFactory>) -> Self { pub fn new(
fs: FileSystem,
net: Network,
repo: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
) -> Self {
let generation = Generation::default(); let generation = Generation::default();
Self { Self {
generation, generation,
@ -122,6 +129,7 @@ impl Server {
fs, fs,
net, net,
repository_factory: repo, repository_factory: repo,
sleep_duration,
} }
} }
fn create_forge_data_directories( fn create_forge_data_directories(
@ -182,6 +190,7 @@ impl Server {
let net = self.net.clone(); let net = self.net.clone();
let repository_factory = self.repository_factory.duplicate(); let repository_factory = self.repository_factory.duplicate();
let generation = self.generation; let generation = self.generation;
let sleep_duration = self.sleep_duration;
move |(repo_alias, server_repo_config)| { move |(repo_alias, server_repo_config)| {
let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config); let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config);
let _guard = span.enter(); let _guard = span.enter();
@ -214,6 +223,7 @@ impl Server {
generation, generation,
net.clone(), net.clone(),
repository_factory.duplicate(), repository_factory.duplicate(),
sleep_duration,
); );
(forge_name.clone(), repo_alias, actor) (forge_name.clone(), repo_alias, actor)
} }

View file

@ -37,11 +37,16 @@ pub fn init(fs: FileSystem) {
} }
} }
pub async fn start(fs: FileSystem, net: Network, repo: Box<dyn RepositoryFactory>) { pub async fn start(
fs: FileSystem,
net: Network,
repo: Box<dyn RepositoryFactory>,
sleep_duration: std::time::Duration,
) {
init_logging(); init_logging();
info!("Starting Server..."); info!("Starting Server...");
let server = Server::new(fs.clone(), net.clone(), repo).start(); let server = Server::new(fs.clone(), net.clone(), repo, sleep_duration).start();
server.do_send(FileUpdated); server.do_send(FileUpdated);
info!("Starting File Watcher..."); info!("Starting File Watcher...");