WIP: repo-actor: ???

This commit is contained in:
Paul Campbell 2024-06-19 07:03:08 +01:00
parent 6afc4f2f68
commit a2dc3a821a
20 changed files with 1638 additions and 424 deletions

View file

@ -42,7 +42,8 @@ async fn main() {
git_next_server::init(fs); git_next_server::init(fs);
} }
Server::Start => { Server::Start => {
git_next_server::start(fs, net, repo).await; let sleep_duration = std::time::Duration::from_secs(10);
git_next_server::start(fs, net, repo, sleep_duration).await;
} }
}, },
} }

View file

@ -42,12 +42,19 @@ ulid = { workspace = true }
# boilerplate # boilerplate
derive_more = { workspace = true } derive_more = { workspace = true }
thiserror = { workspace = true }
# Actors # Actors
actix = { workspace = true } actix = { workspace = true }
actix-rt = { workspace = true } actix-rt = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
[dev-dependencies]
# Testing
assert2 = { workspace = true }
rand = { workspace = true }
pretty_assertions = { workspace = true }
[lints.clippy] [lints.clippy]
nursery = { level = "warn", priority = -1 } nursery = { level = "warn", priority = -1 }
# pedantic = "warn" # pedantic = "warn"

View file

@ -1,78 +1,67 @@
// //
use actix::prelude::*; use crate as actor;
use git_next_config as config; use git_next_config as config;
use git_next_git as git; use git_next_git as git;
use derive_more::Display;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{MessageToken, ValidateRepo};
use std::time::Duration;
// advance next to the next commit towards the head of the dev branch // advance next to the next commit towards the head of the dev branch
#[tracing::instrument(fields(next), skip_all)] #[tracing::instrument(fields(next), skip_all)]
pub async fn advance_next( pub async fn advance_next(
next: git::Commit, next: &git::Commit,
dev_commit_history: Vec<git::Commit>, dev_commit_history: &[git::Commit],
repo_details: git::RepoDetails, repo_details: git::RepoDetails,
repo_config: config::RepoConfig, repo_config: config::RepoConfig,
repository: git::OpenRepository, repository: git::OpenRepository,
addr: Addr<super::RepoActor>, message_token: actor::messages::MessageToken,
message_token: MessageToken, ) -> Result<actor::messages::MessageToken> {
) { let commit =
let next_commit = find_next_commit_on_dev(next, dev_commit_history); find_next_commit_on_dev(next, dev_commit_history).ok_or_else(|| Error::NextAtDev)?;
let Some(commit) = next_commit else { validate_commit_message(commit.message())?;
warn!("No commits to advance next to");
return;
};
if let Some(problem) = validate_commit_message(commit.message()) {
warn!("Can't advance next to commit '{}': {}", commit, problem);
return;
}
info!("Advancing next to commit '{}'", commit); info!("Advancing next to commit '{}'", commit);
if let Err(err) = git::push::reset( git::push::reset(
&repository, &repository,
&repo_details, &repo_details,
&repo_config.branches().next(), &repo_config.branches().next(),
&commit.into(), &commit.into(),
&git::push::Force::No, &git::push::Force::No,
) { )?;
warn!(?err, "Failed") Ok(message_token)
}
tokio::time::sleep(Duration::from_secs(10)).await;
addr.do_send(ValidateRepo { message_token })
} }
#[tracing::instrument] #[tracing::instrument]
fn validate_commit_message(message: &git::commit::Message) -> Option<String> { fn validate_commit_message(message: &git::commit::Message) -> Result<()> {
let message = &message.to_string(); let message = &message.to_string();
if message.to_ascii_lowercase().starts_with("wip") { if message.to_ascii_lowercase().starts_with("wip") {
return Some("Is Work-In-Progress".to_string()); return Err(Error::IsWorkInProgress);
} }
match git_conventional::Commit::parse(message) { match git_conventional::Commit::parse(message) {
Ok(commit) => { Ok(commit) => {
info!(?commit, "Pass"); info!(?commit, "Pass");
None Ok(())
} }
Err(err) => { Err(err) => {
warn!(?err, "Fail"); warn!(?err, "Fail");
Some(err.kind().to_string()) Err(Error::InvalidCommitMessage {
reason: err.kind().to_string(),
})
} }
} }
} }
pub fn find_next_commit_on_dev( pub fn find_next_commit_on_dev(
next: git::Commit, next: &git::Commit,
dev_commit_history: Vec<git::Commit>, dev_commit_history: &[git::Commit],
) -> Option<git::Commit> { ) -> Option<git::Commit> {
let mut next_commit: Option<git::Commit> = None; let mut next_commit: Option<&git::Commit> = None;
for commit in dev_commit_history.into_iter() { for commit in dev_commit_history.iter() {
if commit == next { if commit == next {
break; break;
}; };
next_commit.replace(commit); next_commit.replace(commit);
} }
next_commit next_commit.cloned()
} }
// advance main branch to the commit 'next' // advance main branch to the commit 'next'
@ -82,15 +71,30 @@ pub async fn advance_main(
repo_details: &git::RepoDetails, repo_details: &git::RepoDetails,
repo_config: &config::RepoConfig, repo_config: &config::RepoConfig,
repository: &git::OpenRepository, repository: &git::OpenRepository,
) { ) -> Result<()> {
info!("Advancing main to next"); info!("Advancing main to next");
if let Err(err) = git::push::reset( git::push::reset(
repository, repository,
repo_details, repo_details,
&repo_config.branches().main(), &repo_config.branches().main(),
&next.into(), &next.into(),
&git::push::Force::No, &git::push::Force::No,
) { )?;
warn!(?err, "Failed") Ok(())
}; }
pub type Result<T> = core::result::Result<T, Error>;
#[derive(Debug, thiserror::Error, Display)]
pub enum Error {
#[display("push: {}", 0)]
Push(#[from] git::push::Error),
#[display("no commits to advance next to")]
NextAtDev,
#[display("commit is a Work-in-progress")]
IsWorkInProgress,
#[display("commit message is not in conventional commit format: {reason}")]
InvalidCommitMessage { reason: String },
} }

View file

@ -0,0 +1,52 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
impl Handler<actor::messages::AdvanceMainTo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = ?msg))]
fn handle(
&mut self,
msg: actor::messages::AdvanceMainTo,
ctx: &mut Self::Context,
) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
tracing::warn!("No config loaded");
return;
};
let Some(repository) = self.open_repository.clone() else {
tracing::warn!("No repository opened");
return;
};
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
async move {
match actor::branch::advance_main(
msg.unwrap(),
&repo_details,
&repo_config,
&repository,
)
.await
{
Err(err) => {
tracing::warn!("advance main: {err}");
}
Ok(_) => match repo_config.source() {
git_next_config::RepoConfigSource::Repo => {
addr.do_send(actor::messages::LoadConfigFromRepo)
}
git_next_config::RepoConfigSource::Server => {
addr.do_send(actor::messages::ValidateRepo::new(message_token))
}
},
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}

View file

@ -0,0 +1,37 @@
//
use actix::prelude::*;
use crate as actor;
use git_next_git as git;
impl Handler<actor::messages::CloneRepo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details /*, gitdir = %self.repo_details.gitdir */))]
fn handle(
&mut self,
_msg: actor::messages::CloneRepo,
ctx: &mut Self::Context,
) -> Self::Result {
println!("handler clone repo start");
let gitdir = self.repo_details.gitdir.clone();
match git::repository::open(&self.repository, &self.repo_details, gitdir) {
Ok(repository) => {
println!("- open okay");
self.open_repository.replace(repository);
if self.repo_details.repo_config.is_none() {
println!("need to load config from repo");
ctx.address().do_send(actor::messages::LoadConfigFromRepo);
} else {
println!("need to validate repo");
ctx.address()
.do_send(actor::messages::ValidateRepo::new(self.message_token));
}
}
Err(err) => {
println!("err: {err:?}");
tracing::warn!("Could not open repo: {err}")
}
}
println!("handler clone repo finish");
}
}

View file

@ -0,0 +1,31 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
impl Handler<actor::messages::LoadConfigFromRepo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))]
fn handle(
&mut self,
_msg: actor::messages::LoadConfigFromRepo,
ctx: &mut Self::Context,
) -> Self::Result {
let details = self.repo_details.clone();
let addr = ctx.address();
let Some(open_repository) = self.open_repository.clone() else {
tracing::warn!("missing open repository - can't load configuration");
return;
};
async move {
match actor::load::config_from_repository(details, open_repository).await {
Ok(repo_config) => addr.do_send(actor::messages::LoadedConfig::new(repo_config)),
Err(err) => tracing::warn!(?err, "Failed to load config"),
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}

View file

@ -0,0 +1,20 @@
//
use actix::prelude::*;
use crate as actor;
impl Handler<actor::messages::LoadedConfig> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = ?msg))]
fn handle(
&mut self,
msg: actor::messages::LoadedConfig,
ctx: &mut Self::Context,
) -> Self::Result {
let repo_config = msg.unwrap();
self.repo_details.repo_config.replace(repo_config);
ctx.address()
.do_send(actor::messages::ValidateRepo::new(self.message_token));
}
}

View file

@ -0,0 +1,11 @@
pub mod advance_to_main;
pub mod clone;
pub mod load_config_from_repo;
pub mod loaded_config;
pub mod start_monitoring;
pub mod validate_repo;
pub mod webhook_message;
pub mod webhook_registered;
#[cfg(test)]
pub mod test;

View file

@ -0,0 +1,83 @@
//
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
use git_next_git as git;
impl Handler<actor::messages::StartMonitoring> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all,
fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main(), next = %msg.next(), dev = %msg.dev()))
]
fn handle(
&mut self,
msg: actor::messages::StartMonitoring,
ctx: &mut Self::Context,
) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
tracing::warn!("No config loaded");
return;
};
let next_ahead_of_main = msg.main() != msg.next();
let dev_ahead_of_next = msg.next() != msg.dev();
tracing::info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
let addr = ctx.address();
let forge = self.forge.clone();
if next_ahead_of_main {
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
async move {
// get the status - pass, fail, pending (all others map to fail, e.g. error)
let status = forge.commit_status(msg.next()).await;
tracing::info!(?status, "Checking next branch");
match status {
git::forge::commit::Status::Pass => {
addr.do_send(actor::messages::AdvanceMainTo::new(msg.next().clone()));
}
git::forge::commit::Status::Pending => {
tokio::time::sleep(sleep_duration).await;
addr.do_send(actor::messages::ValidateRepo::new(message_token));
}
git::forge::commit::Status::Fail => {
tracing::warn!("Checks have failed");
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
} else if dev_ahead_of_next {
if let Some(repository) = self.open_repository.clone() {
let repo_details = self.repo_details.clone();
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
async move {
match actor::branch::advance_next(
msg.next(),
msg.dev_commit_history(),
repo_details,
repo_config,
repository,
message_token,
)
.await
{
Ok(message_token) => {
// pause to allow any CI checks to be started
tokio::time::sleep(sleep_duration).await;
addr.do_send(actor::messages::ValidateRepo::new(message_token))
}
Err(err) => tracing::warn!("advance next: {err}"),
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
}
}

View file

@ -0,0 +1,18 @@
use crate as repo_actor;
use actix::{Handler, Message};
use git_next_git as git;
use repo_actor::message;
message!(GetRepositoryLog => Vec<String>);
impl Handler<GetRepositoryLog> for repo_actor::RepoActor {
type Result = Vec<String>;
fn handle(&mut self, _msg: GetRepositoryLog, _ctx: &mut Self::Context) -> Self::Result {
match &mut self.open_repository {
Some(git::OpenRepository::Test(tor)) => tor.take_log(),
Some(git::OpenRepository::Mock(mor)) => mor.take_log(),
_ => unimplemented!("Only Test and Mock repositories support this message"),
}
}
}

View file

@ -0,0 +1,98 @@
//
use actix::prelude::*;
use derive_more::Deref as _;
use tracing::Instrument as _;
use crate as actor;
use git_next_git as git;
impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.deref()))]
fn handle(
&mut self,
msg: actor::messages::ValidateRepo,
ctx: &mut Self::Context,
) -> Self::Result {
println!("handler validate repo - start");
match msg.unwrap() {
message_token if self.message_token < message_token => {
tracing::info!(%message_token, "New message token");
self.message_token = message_token;
}
message_token if self.message_token > message_token => {
tracing::info!("Dropping message from previous generation");
return; // message is expired
}
_ => {
// do nothing
}
}
if self.webhook_id.is_none() {
let forge_alias = self.repo_details.forge.forge_alias();
let repo_alias = &self.repo_details.repo_alias;
let webhook_url = self.webhook.url(forge_alias, repo_alias);
let forge = self.forge.clone();
let addr = ctx.address();
async move {
if let Err(err) =
forge
.register_webhook(&webhook_url)
.await
.and_then(|registered_webhook| {
addr.try_send(actor::messages::WebhookRegistered::from(
registered_webhook,
))
.map_err(|e| {
git::forge::webhook::Error::FailedToNotifySelf(e.to_string())
})
})
{
tracing::warn!("registering webhook: {err}");
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
if let (Some(repository), Some(repo_config)) = (
self.open_repository.clone(),
self.repo_details.repo_config.clone(),
) {
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
async move {
match git::validation::positions::validate_positions(
&repository,
&repo_details,
repo_config,
) {
Ok(git::validation::positions::Positions {
main,
next,
dev,
dev_commit_history,
}) => {
addr.do_send(actor::messages::StartMonitoring::new(
main,
next,
dev,
dev_commit_history,
));
}
Err(err) => {
tracing::warn!("{:?}", err);
tokio::time::sleep(sleep_duration).await;
addr.do_send(actor::messages::ValidateRepo::new(message_token));
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
println!("handler validate repo - finish");
}
}

View file

@ -1,13 +1,13 @@
// //
use actix::prelude::*; use actix::prelude::*;
use crate::{RepoActor, ValidateRepo}; use crate as actor;
use git_next_config as config; use git_next_config as config;
use git_next_git as git; use git_next_git as git;
use tracing::{info, warn}; use tracing::{info, warn};
impl Handler<config::WebhookMessage> for RepoActor { impl Handler<config::WebhookMessage> for actor::RepoActor {
type Result = (); type Result = ();
#[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity #[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity
@ -88,6 +88,7 @@ impl Handler<config::WebhookMessage> for RepoActor {
token = %message_token, token = %message_token,
"New commit" "New commit"
); );
ctx.address().do_send(ValidateRepo { message_token }); ctx.address()
.do_send(actor::messages::ValidateRepo::new(message_token));
} }
} }

View file

@ -0,0 +1,17 @@
//
use actix::prelude::*;
use crate as actor;
impl Handler<actor::messages::WebhookRegistered> for actor::RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.webhook_id()))]
fn handle(
&mut self,
msg: actor::messages::WebhookRegistered,
_ctx: &mut Self::Context,
) -> Self::Result {
self.webhook_id.replace(msg.webhook_id().clone());
self.webhook_auth.replace(msg.webhook_auth().clone());
}
}

View file

@ -1,30 +1,58 @@
mod branch; mod branch;
pub mod handlers;
mod load; mod load;
pub mod status; pub mod messages;
pub mod webhook;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use std::time::Duration;
use actix::prelude::*; use actix::prelude::*;
use config::RegisteredWebhook;
use git::validation::positions::{validate_positions, Positions};
use crate as repo_actor;
use git_next_config as config; use git_next_config as config;
use git_next_forge as forge; use git_next_forge as forge;
use git_next_git as git; use git_next_git as git;
use kxio::network::Network; use kxio::network::Network;
use tracing::{debug, info, warn, Instrument}; use tracing::{info, warn, Instrument};
#[derive(Debug, derive_more::Display)] #[derive(Debug, derive_more::Display)]
#[display("{}:{}:{}", generation, repo_details.forge.forge_alias(), repo_details.repo_alias)] #[display("{}:{}:{}", generation, repo_details.forge.forge_alias(), repo_details.repo_alias)]
/// An actor that represents a Git Repository.
///
/// When this actor is started it is sent the [CloneRepo] message.
///
/// ```mermaid
/// stateDiagram-v2
/// [*] --> CloneRepo :START
///
/// CloneRepo --> LoadConfigFromRepo
/// CloneRepo --> ValidateRepo
///
/// LoadConfigFromRepo --> LoadedConfig
///
/// ValidateRepo --> WebhookRegistered
/// ValidateRepo --> StartMonitoring
/// ValidateRepo --> ValidateRepo :SLEEP 10s
///
/// LoadedConfig --> ValidateRepo
///
/// WebhookRegistered --> [*]
///
/// StartMonitoring --> AdvanceMainTo
/// StartMonitoring --> ValidateRepo :SLEEP 10s
///
/// AdvanceMainTo --> LoadConfigFromRepo
/// AdvanceMainTo --> ValidateRepo
///
/// [*] --> WebhookMessage :WEBHOOK
///
/// WebhookMessage --> ValidateRepo
/// ```
///
pub struct RepoActor { pub struct RepoActor {
sleep_duration: std::time::Duration,
generation: git::Generation, generation: git::Generation,
message_token: MessageToken, message_token: messages::MessageToken,
repo_details: git::RepoDetails, repo_details: git::RepoDetails,
webhook: config::server::Webhook, webhook: config::server::Webhook,
webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured
@ -39,28 +67,30 @@ pub struct RepoActor {
} }
impl RepoActor { impl RepoActor {
pub fn new( pub fn new(
details: git::RepoDetails, repo_details: git::RepoDetails,
webhook: config::server::Webhook, webhook: config::server::Webhook,
generation: git::Generation, generation: git::Generation,
net: Network, net: Network,
repo: git::Repository, repository: git::Repository,
sleep_duration: std::time::Duration,
) -> Self { ) -> Self {
let forge = forge::Forge::new(details.clone(), net.clone()); let message_token = messages::MessageToken::default();
debug!(?forge, "new"); let forge = forge::Forge::new(repo_details.clone(), net.clone());
Self { Self {
generation, generation,
message_token: MessageToken::new(), message_token,
repo_details: details, repo_details,
webhook, webhook,
webhook_id: None, webhook_id: None,
webhook_auth: None, webhook_auth: None,
last_main_commit: None, last_main_commit: None,
last_next_commit: None, last_next_commit: None,
last_dev_commit: None, last_dev_commit: None,
repository: repo, repository,
open_repository: None, open_repository: None,
net,
forge, forge,
net,
sleep_duration,
} }
} }
} }
@ -68,9 +98,11 @@ impl Actor for RepoActor {
type Context = Context<Self>; type Context = Context<Self>;
#[tracing::instrument(name = "RepoActor::stopping", skip_all, fields(repo = %self.repo_details))] #[tracing::instrument(name = "RepoActor::stopping", skip_all, fields(repo = %self.repo_details))]
fn stopping(&mut self, ctx: &mut Self::Context) -> Running { fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
eprintln!("stopping");
info!("Checking webhook"); info!("Checking webhook");
match self.webhook_id.take() { match self.webhook_id.take() {
Some(webhook_id) => { Some(webhook_id) => {
eprintln!("stopping - unregistering webhook");
info!(%webhook_id, "Unregistring webhook"); info!(%webhook_id, "Unregistring webhook");
let forge = self.forge.clone(); let forge = self.forge.clone();
async move { async move {
@ -87,252 +119,3 @@ impl Actor for RepoActor {
} }
} }
} }
#[derive(Message)]
#[rtype(result = "()")]
pub struct CloneRepo;
impl Handler<CloneRepo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details /*, gitdir = %self.repo_details.gitdir */))]
fn handle(&mut self, _msg: CloneRepo, ctx: &mut Self::Context) -> Self::Result {
let gitdir = self.repo_details.gitdir.clone();
match git::repository::open(&self.repository, &self.repo_details, gitdir) {
Ok(repository) => {
self.open_repository.replace(repository);
if self.repo_details.repo_config.is_none() {
ctx.address().do_send(LoadConfigFromRepo);
} else {
ctx.address().do_send(ValidateRepo {
message_token: self.message_token,
});
}
}
Err(err) => warn!("Could not open repo: {err}"),
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct LoadConfigFromRepo;
impl Handler<LoadConfigFromRepo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))]
fn handle(&mut self, _msg: LoadConfigFromRepo, ctx: &mut Self::Context) -> Self::Result {
let details = self.repo_details.clone();
let addr = ctx.address();
let Some(open_repository) = self.open_repository.clone() else {
warn!("missing open repository - can't load configuration");
return;
};
repo_actor::load::load_file(details, addr, open_repository)
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct LoadedConfig(git_next_config::RepoConfig);
impl Handler<LoadedConfig> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = %msg.0))]
fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result {
let repo_config = msg.0;
self.repo_details.repo_config.replace(repo_config);
ctx.address().do_send(ValidateRepo {
message_token: self.message_token,
});
}
}
#[derive(derive_more::Constructor, Message)]
#[rtype(result = "()")]
pub struct ValidateRepo {
message_token: MessageToken,
}
impl Handler<ValidateRepo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.message_token))]
fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result {
match msg.message_token {
message_token if self.message_token < message_token => {
info!(%message_token, "New message token");
self.message_token = msg.message_token;
}
message_token if self.message_token > message_token => {
info!("Dropping message from previous generation");
return; // message is expired
}
_ => {
// do nothing
}
}
if self.webhook_id.is_none() {
let forge_alias = self.repo_details.forge.forge_alias();
let repo_alias = &self.repo_details.repo_alias;
let webhook_url = self.webhook.url(forge_alias, repo_alias);
let forge = self.forge.clone();
let addr = ctx.address();
async move {
if let Err(err) =
forge
.register_webhook(&webhook_url)
.await
.and_then(|registered_webhook| {
addr.try_send(WebhookRegistered::from(registered_webhook))
.map_err(|e| {
git::forge::webhook::Error::FailedToNotifySelf(e.to_string())
})
})
{
warn!("registering webhook: {err}");
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
if let (Some(repository), Some(repo_config)) = (
self.open_repository.clone(),
self.repo_details.repo_config.clone(),
) {
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
async move {
match validate_positions(&repository, &repo_details, repo_config) {
Ok(Positions {
main,
next,
dev,
dev_commit_history,
}) => {
addr.do_send(StartMonitoring::new(main, next, dev, dev_commit_history));
}
Err(err) => {
warn!("{:?}", err);
tokio::time::sleep(Duration::from_secs(10)).await;
addr.do_send(ValidateRepo::new(message_token));
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
}
#[derive(Debug, derive_more::Constructor, Message)]
#[rtype(result = "()")]
pub struct StartMonitoring {
main: git::Commit,
next: git::Commit,
dev: git::Commit,
dev_commit_history: Vec<git::Commit>,
}
impl Handler<StartMonitoring> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all,
fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main, next= %msg.next, dev = %msg.dev))
]
fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
warn!("No config loaded");
return;
};
let next_ahead_of_main = msg.main != msg.next;
let dev_ahead_of_next = msg.next != msg.dev;
info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
let addr = ctx.address();
let forge = self.forge.clone();
if next_ahead_of_main {
status::check_next(msg.next, addr, forge, self.message_token)
.in_current_span()
.into_actor(self)
.wait(ctx);
} else if dev_ahead_of_next {
if let Some(repository) = self.open_repository.clone() {
branch::advance_next(
msg.next,
msg.dev_commit_history,
self.repo_details.clone(),
repo_config,
repository,
addr,
self.message_token,
)
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct WebhookRegistered(config::WebhookId, config::WebhookAuth);
impl From<RegisteredWebhook> for WebhookRegistered {
fn from(value: RegisteredWebhook) -> Self {
Self(value.id().clone(), value.auth().clone())
}
}
impl Handler<WebhookRegistered> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.0))]
fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result {
self.webhook_id.replace(msg.0);
self.webhook_auth.replace(msg.1);
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct AdvanceMainTo(git::Commit);
impl Handler<AdvanceMainTo> for RepoActor {
type Result = ();
#[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = %msg.0))]
fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result {
let Some(repo_config) = self.repo_details.repo_config.clone() else {
warn!("No config loaded");
return;
};
let Some(repository) = self.open_repository.clone() else {
warn!("No repository opened");
return;
};
let repo_details = self.repo_details.clone();
let addr = ctx.address();
let message_token = self.message_token;
async move {
branch::advance_main(msg.0, &repo_details, &repo_config, &repository).await;
match repo_config.source() {
git_next_config::RepoConfigSource::Repo => addr.do_send(LoadConfigFromRepo),
git_next_config::RepoConfigSource::Server => {
addr.do_send(ValidateRepo { message_token })
}
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord, derive_more::Display)]
pub struct MessageToken(u32);
impl MessageToken {
pub fn new() -> Self {
Self::default()
}
pub const fn next(&self) -> Self {
Self(self.0 + 1)
}
}

View file

@ -1,75 +1,54 @@
use std::path::PathBuf;
// //
use actix::prelude::*; use derive_more::Display;
use std::path::PathBuf;
use tracing::{error, info}; use tracing::info;
use git_next_config as config; use git_next_config as config;
use git_next_git as git; use git_next_git as git;
use super::{LoadedConfig, RepoActor};
/// Loads the [RepoConfig] from the `.git-next.toml` file in the repository /// Loads the [RepoConfig] from the `.git-next.toml` file in the repository
#[tracing::instrument(skip_all, fields(branch = %repo_details.branch))] #[tracing::instrument(skip_all, fields(branch = %repo_details.branch))]
pub async fn load_file( pub async fn config_from_repository(
repo_details: git::RepoDetails, repo_details: git::RepoDetails,
addr: Addr<RepoActor>,
open_repository: git::OpenRepository, open_repository: git::OpenRepository,
) { ) -> Result<config::RepoConfig> {
info!("Loading .git-next.toml from repo"); info!("Loading .git-next.toml from repo");
let repo_config = match load(&repo_details, &open_repository).await { let contents =
Ok(repo_config) => repo_config, open_repository.read_file(&repo_details.branch, &PathBuf::from(".git-next.toml"))?;
Err(err) => {
error!(?err, "Failed to load config");
return;
}
};
info!("Loaded .git-next.toml from repo");
addr.do_send(LoadedConfig(repo_config));
}
async fn load(
details: &git::RepoDetails,
open_repository: &git::OpenRepository,
) -> Result<config::RepoConfig, Error> {
let contents = open_repository.read_file(&details.branch, &PathBuf::from(".git-next.toml"))?;
let config = config::RepoConfig::parse(&contents)?; let config = config::RepoConfig::parse(&contents)?;
let config = validate(config, open_repository).await?; let branches = open_repository.remote_branches()?;
required_branch(&config.branches().main(), &branches)?;
required_branch(&config.branches().next(), &branches)?;
required_branch(&config.branches().dev(), &branches)?;
Ok(config) Ok(config)
} }
#[derive(Debug, derive_more::From, derive_more::Display)] fn required_branch(
branch_name: &config::BranchName,
branches: &[config::BranchName],
) -> Result<()> {
branches
.iter()
.find(|branch| *branch == branch_name)
.ok_or_else(|| Error::BranchNotFound(branch_name.clone()))?;
Ok(())
}
pub type Result<T> = core::result::Result<T, Error>;
#[derive(Debug, thiserror::Error, Display)]
pub enum Error { pub enum Error {
File(git::file::Error), #[display("file")]
Config(config::server::Error), File(#[from] git::file::Error),
Toml(toml::de::Error),
Branch(git::push::Error), #[display("config")]
Config(#[from] config::server::Error),
#[display("toml")]
Toml(#[from] toml::de::Error),
#[display("push")]
Push(#[from] git::push::Error),
#[display("branch not found: {}", 0)]
BranchNotFound(config::BranchName), BranchNotFound(config::BranchName),
} }
pub async fn validate(
config: config::RepoConfig,
open_repository: &git::OpenRepository,
) -> Result<config::RepoConfig, Error> {
let branches = open_repository.remote_branches()?;
if !branches
.iter()
.any(|branch| branch == &config.branches().main())
{
return Err(Error::BranchNotFound(config.branches().main()));
}
if !branches
.iter()
.any(|branch| branch == &config.branches().next())
{
return Err(Error::BranchNotFound(config.branches().next()));
}
if !branches
.iter()
.any(|branch| branch == &config.branches().dev())
{
return Err(Error::BranchNotFound(config.branches().dev()));
}
Ok(config)
}

View file

@ -0,0 +1,89 @@
//
use actix::prelude::*;
use config::newtype;
use derive_more::{Constructor, Display};
use git_next_config as config;
use git_next_git as git;
#[macro_export]
macro_rules! message {
($name:ident wraps $value:ty) => {
git_next_config::newtype!($name is a $value);
impl Message for $name {
type Result = ();
}
};
($name:ident) => {
git_next_config::newtype!($name);
impl Message for $name {
type Result = ();
}
};
($name:ident wraps $value:ty => $result:ty) => {
git_next_config::newtype!($name is a $value);
impl Message for $name {
type Result = $result;
}
};
($name:ident => $result:ty) => {
git_next_config::newtype!($name);
impl Message for $name {
type Result = $result;
}
};
}
message!(LoadConfigFromRepo);
message!(CloneRepo);
message!(LoadedConfig wraps config::RepoConfig);
message!(ValidateRepo wraps MessageToken);
#[derive(Debug, Constructor, Message)]
#[rtype(result = "()")]
pub struct StartMonitoring {
main: git::Commit,
next: git::Commit,
dev: git::Commit,
dev_commit_history: Vec<git::Commit>,
}
impl StartMonitoring {
pub const fn main(&self) -> &git::Commit {
&self.main
}
pub const fn next(&self) -> &git::Commit {
&self.next
}
pub const fn dev(&self) -> &git::Commit {
&self.dev
}
pub fn dev_commit_history(&self) -> &[git::Commit] {
&self.dev_commit_history
}
}
message!(WebhookRegistered wraps (config::WebhookId, config::WebhookAuth));
impl WebhookRegistered {
pub const fn webhook_id(&self) -> &config::WebhookId {
&self.0 .0
}
pub const fn webhook_auth(&self) -> &config::WebhookAuth {
&self.0 .1
}
}
impl From<config::RegisteredWebhook> for WebhookRegistered {
fn from(value: config::RegisteredWebhook) -> Self {
let webhook_id = value.id().clone();
let webhook_auth = value.auth().clone();
Self::from((webhook_id, webhook_auth))
}
}
message!(AdvanceMainTo wraps git::Commit);
newtype!(MessageToken is a u32, Copy, Default, Display);
impl MessageToken {
pub const fn next(&self) -> Self {
Self(self.0 + 1)
}
}

View file

@ -1,33 +0,0 @@
//
use actix::prelude::*;
use git_next_forge as forge;
use git_next_git as git;
use tracing::{info, warn};
use crate::{MessageToken, ValidateRepo};
use super::AdvanceMainTo;
pub async fn check_next(
next: git::Commit,
addr: Addr<super::RepoActor>,
forge: forge::Forge,
message_token: MessageToken,
) {
// get the status - pass, fail, pending (all others map to fail, e.g. error)
let status = forge.commit_status(&next).await;
info!(?status, "Checking next branch");
match status {
git::forge::commit::Status::Pass => {
addr.do_send(AdvanceMainTo(next));
}
git::forge::commit::Status::Pending => {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
addr.do_send(ValidateRepo { message_token });
}
git::forge::commit::Status::Fail => {
warn!("Checks have failed");
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,7 @@ use git_next_config::{
self as config, ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig, self as config, ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig,
}; };
use git_next_git::{Generation, RepoDetails, Repository}; use git_next_git::{Generation, RepoDetails, Repository};
use git_next_repo_actor::{CloneRepo, RepoActor}; use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use kxio::{fs::FileSystem, network::Network}; use kxio::{fs::FileSystem, network::Network};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
@ -39,6 +39,7 @@ pub struct Server {
fs: FileSystem, fs: FileSystem,
net: Network, net: Network,
repo: Repository, repo: Repository,
sleep_duration: std::time::Duration,
} }
impl Actor for Server { impl Actor for Server {
type Context = Context<Self>; type Context = Context<Self>;
@ -114,7 +115,12 @@ impl Handler<ServerConfig> for Server {
} }
} }
impl Server { impl Server {
pub fn new(fs: FileSystem, net: Network, repo: Repository) -> Self { pub fn new(
fs: FileSystem,
net: Network,
repo: Repository,
sleep_duration: std::time::Duration,
) -> Self {
let generation = Generation::new(); let generation = Generation::new();
Self { Self {
generation, generation,
@ -122,6 +128,7 @@ impl Server {
fs, fs,
net, net,
repo, repo,
sleep_duration,
} }
} }
fn create_forge_data_directories( fn create_forge_data_directories(
@ -182,6 +189,7 @@ impl Server {
let net = self.net.clone(); let net = self.net.clone();
let repo = self.repo.clone(); let repo = self.repo.clone();
let generation = self.generation; let generation = self.generation;
let sleep_duration = self.sleep_duration.clone();
move |(repo_alias, server_repo_config)| { move |(repo_alias, server_repo_config)| {
let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config); let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config);
let _guard = span.enter(); let _guard = span.enter();
@ -214,6 +222,7 @@ impl Server {
generation, generation,
net.clone(), net.clone(),
repo.clone(), repo.clone(),
sleep_duration,
); );
(forge_name.clone(), repo_alias, actor) (forge_name.clone(), repo_alias, actor)
} }

View file

@ -37,11 +37,16 @@ pub fn init(fs: FileSystem) {
} }
} }
pub async fn start(fs: FileSystem, net: Network, repo: Repository) { pub async fn start(
fs: FileSystem,
net: Network,
repo: Repository,
sleep_duration: std::time::Duration,
) {
init_logging(); init_logging();
info!("Starting Server..."); info!("Starting Server...");
let server = Server::new(fs.clone(), net.clone(), repo).start(); let server = Server::new(fs.clone(), net.clone(), repo, sleep_duration).start();
server.do_send(FileUpdated); server.do_send(FileUpdated);
info!("Starting File Watcher..."); info!("Starting File Watcher...");