WIP: repo-actor: ???
This commit is contained in:
parent
a9b35bdac5
commit
bc338d7703
17 changed files with 1619 additions and 420 deletions
|
@ -42,12 +42,19 @@ ulid = { workspace = true }
|
|||
|
||||
# boilerplate
|
||||
derive_more = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
# Actors
|
||||
actix = { workspace = true }
|
||||
actix-rt = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
# Testing
|
||||
assert2 = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
[lints.clippy]
|
||||
nursery = { level = "warn", priority = -1 }
|
||||
# pedantic = "warn"
|
||||
|
|
|
@ -1,78 +1,67 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use crate as actor;
|
||||
use git_next_config as config;
|
||||
use git_next_git as git;
|
||||
|
||||
use derive_more::Display;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::{MessageToken, ValidateRepo};
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
// advance next to the next commit towards the head of the dev branch
|
||||
#[tracing::instrument(fields(next), skip_all)]
|
||||
pub async fn advance_next(
|
||||
next: git::Commit,
|
||||
dev_commit_history: Vec<git::Commit>,
|
||||
next: &git::Commit,
|
||||
dev_commit_history: &[git::Commit],
|
||||
repo_details: git::RepoDetails,
|
||||
repo_config: config::RepoConfig,
|
||||
repository: git::OpenRepository,
|
||||
addr: Addr<super::RepoActor>,
|
||||
message_token: MessageToken,
|
||||
) {
|
||||
let next_commit = find_next_commit_on_dev(next, dev_commit_history);
|
||||
let Some(commit) = next_commit else {
|
||||
warn!("No commits to advance next to");
|
||||
return;
|
||||
};
|
||||
if let Some(problem) = validate_commit_message(commit.message()) {
|
||||
warn!("Can't advance next to commit '{}': {}", commit, problem);
|
||||
return;
|
||||
}
|
||||
message_token: actor::messages::MessageToken,
|
||||
) -> Result<actor::messages::MessageToken> {
|
||||
let commit =
|
||||
find_next_commit_on_dev(next, dev_commit_history).ok_or_else(|| Error::NextAtDev)?;
|
||||
validate_commit_message(commit.message())?;
|
||||
info!("Advancing next to commit '{}'", commit);
|
||||
if let Err(err) = git::push::reset(
|
||||
git::push::reset(
|
||||
&repository,
|
||||
&repo_details,
|
||||
&repo_config.branches().next(),
|
||||
&commit.into(),
|
||||
&git::push::Force::No,
|
||||
) {
|
||||
warn!(?err, "Failed")
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
addr.do_send(ValidateRepo { message_token })
|
||||
)?;
|
||||
Ok(message_token)
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
fn validate_commit_message(message: &git::commit::Message) -> Option<String> {
|
||||
fn validate_commit_message(message: &git::commit::Message) -> Result<()> {
|
||||
let message = &message.to_string();
|
||||
if message.to_ascii_lowercase().starts_with("wip") {
|
||||
return Some("Is Work-In-Progress".to_string());
|
||||
return Err(Error::IsWorkInProgress);
|
||||
}
|
||||
match git_conventional::Commit::parse(message) {
|
||||
Ok(commit) => {
|
||||
info!(?commit, "Pass");
|
||||
None
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(?err, "Fail");
|
||||
Some(err.kind().to_string())
|
||||
Err(Error::InvalidCommitMessage {
|
||||
reason: err.kind().to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find_next_commit_on_dev(
|
||||
next: git::Commit,
|
||||
dev_commit_history: Vec<git::Commit>,
|
||||
next: &git::Commit,
|
||||
dev_commit_history: &[git::Commit],
|
||||
) -> Option<git::Commit> {
|
||||
let mut next_commit: Option<git::Commit> = None;
|
||||
for commit in dev_commit_history.into_iter() {
|
||||
let mut next_commit: Option<&git::Commit> = None;
|
||||
for commit in dev_commit_history.iter() {
|
||||
if commit == next {
|
||||
break;
|
||||
};
|
||||
next_commit.replace(commit);
|
||||
}
|
||||
next_commit
|
||||
next_commit.cloned()
|
||||
}
|
||||
|
||||
// advance main branch to the commit 'next'
|
||||
|
@ -82,15 +71,30 @@ pub async fn advance_main(
|
|||
repo_details: &git::RepoDetails,
|
||||
repo_config: &config::RepoConfig,
|
||||
repository: &git::OpenRepository,
|
||||
) {
|
||||
) -> Result<()> {
|
||||
info!("Advancing main to next");
|
||||
if let Err(err) = git::push::reset(
|
||||
git::push::reset(
|
||||
repository,
|
||||
repo_details,
|
||||
&repo_config.branches().main(),
|
||||
&next.into(),
|
||||
&git::push::Force::No,
|
||||
) {
|
||||
warn!(?err, "Failed")
|
||||
};
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub type Result<T> = core::result::Result<T, Error>;
|
||||
#[derive(Debug, thiserror::Error, Display)]
|
||||
pub enum Error {
|
||||
#[display("push: {}", 0)]
|
||||
Push(#[from] git::push::Error),
|
||||
|
||||
#[display("no commits to advance next to")]
|
||||
NextAtDev,
|
||||
|
||||
#[display("commit is a Work-in-progress")]
|
||||
IsWorkInProgress,
|
||||
|
||||
#[display("commit message is not in conventional commit format: {reason}")]
|
||||
InvalidCommitMessage { reason: String },
|
||||
}
|
||||
|
|
52
crates/repo-actor/src/handlers/advance_to_main.rs
Normal file
52
crates/repo-actor/src/handlers/advance_to_main.rs
Normal file
|
@ -0,0 +1,52 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
use tracing::Instrument as _;
|
||||
|
||||
use crate as actor;
|
||||
|
||||
impl Handler<actor::messages::AdvanceMainTo> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = ?msg))]
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: actor::messages::AdvanceMainTo,
|
||||
ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
let Some(repo_config) = self.repo_details.repo_config.clone() else {
|
||||
tracing::warn!("No config loaded");
|
||||
return;
|
||||
};
|
||||
let Some(repository) = self.open_repository.clone() else {
|
||||
tracing::warn!("No repository opened");
|
||||
return;
|
||||
};
|
||||
let repo_details = self.repo_details.clone();
|
||||
let addr = ctx.address();
|
||||
let message_token = self.message_token;
|
||||
async move {
|
||||
match actor::branch::advance_main(
|
||||
msg.unwrap(),
|
||||
&repo_details,
|
||||
&repo_config,
|
||||
&repository,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
tracing::warn!("advance main: {err}");
|
||||
}
|
||||
Ok(_) => match repo_config.source() {
|
||||
git_next_config::RepoConfigSource::Repo => {
|
||||
addr.do_send(actor::messages::LoadConfigFromRepo)
|
||||
}
|
||||
git_next_config::RepoConfigSource::Server => {
|
||||
addr.do_send(actor::messages::ValidateRepo::new(message_token))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
37
crates/repo-actor/src/handlers/clone.rs
Normal file
37
crates/repo-actor/src/handlers/clone.rs
Normal file
|
@ -0,0 +1,37 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use crate as actor;
|
||||
use git_next_git as git;
|
||||
|
||||
impl Handler<actor::messages::CloneRepo> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details /*, gitdir = %self.repo_details.gitdir */))]
|
||||
fn handle(
|
||||
&mut self,
|
||||
_msg: actor::messages::CloneRepo,
|
||||
ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
println!("handler clone repo start");
|
||||
let gitdir = self.repo_details.gitdir.clone();
|
||||
match git::repository::open(&self.repository, &self.repo_details, gitdir) {
|
||||
Ok(repository) => {
|
||||
println!("- open okay");
|
||||
self.open_repository.replace(repository);
|
||||
if self.repo_details.repo_config.is_none() {
|
||||
println!("need to load config from repo");
|
||||
ctx.address().do_send(actor::messages::LoadConfigFromRepo);
|
||||
} else {
|
||||
println!("need to validate repo");
|
||||
ctx.address()
|
||||
.do_send(actor::messages::ValidateRepo::new(self.message_token));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
println!("err: {err:?}");
|
||||
tracing::warn!("Could not open repo: {err}")
|
||||
}
|
||||
}
|
||||
println!("handler clone repo finish");
|
||||
}
|
||||
}
|
31
crates/repo-actor/src/handlers/load_config_from_repo.rs
Normal file
31
crates/repo-actor/src/handlers/load_config_from_repo.rs
Normal file
|
@ -0,0 +1,31 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
use tracing::Instrument as _;
|
||||
|
||||
use crate as actor;
|
||||
|
||||
impl Handler<actor::messages::LoadConfigFromRepo> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))]
|
||||
fn handle(
|
||||
&mut self,
|
||||
_msg: actor::messages::LoadConfigFromRepo,
|
||||
ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
let details = self.repo_details.clone();
|
||||
let addr = ctx.address();
|
||||
let Some(open_repository) = self.open_repository.clone() else {
|
||||
tracing::warn!("missing open repository - can't load configuration");
|
||||
return;
|
||||
};
|
||||
async move {
|
||||
match actor::load::config_from_repository(details, open_repository).await {
|
||||
Ok(repo_config) => addr.do_send(actor::messages::LoadedConfig::new(repo_config)),
|
||||
Err(err) => tracing::warn!(?err, "Failed to load config"),
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
20
crates/repo-actor/src/handlers/loaded_config.rs
Normal file
20
crates/repo-actor/src/handlers/loaded_config.rs
Normal file
|
@ -0,0 +1,20 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use crate as actor;
|
||||
|
||||
impl Handler<actor::messages::LoadedConfig> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = ?msg))]
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: actor::messages::LoadedConfig,
|
||||
ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
let repo_config = msg.unwrap();
|
||||
self.repo_details.repo_config.replace(repo_config);
|
||||
|
||||
ctx.address()
|
||||
.do_send(actor::messages::ValidateRepo::new(self.message_token));
|
||||
}
|
||||
}
|
11
crates/repo-actor/src/handlers/mod.rs
Normal file
11
crates/repo-actor/src/handlers/mod.rs
Normal file
|
@ -0,0 +1,11 @@
|
|||
pub mod advance_to_main;
|
||||
pub mod clone;
|
||||
pub mod load_config_from_repo;
|
||||
pub mod loaded_config;
|
||||
pub mod start_monitoring;
|
||||
pub mod validate_repo;
|
||||
pub mod webhook_message;
|
||||
pub mod webhook_registered;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
83
crates/repo-actor/src/handlers/start_monitoring.rs
Normal file
83
crates/repo-actor/src/handlers/start_monitoring.rs
Normal file
|
@ -0,0 +1,83 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
use tracing::Instrument as _;
|
||||
|
||||
use crate as actor;
|
||||
use git_next_git as git;
|
||||
|
||||
impl Handler<actor::messages::StartMonitoring> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all,
|
||||
fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main(), next = %msg.next(), dev = %msg.dev()))
|
||||
]
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: actor::messages::StartMonitoring,
|
||||
ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
let Some(repo_config) = self.repo_details.repo_config.clone() else {
|
||||
tracing::warn!("No config loaded");
|
||||
return;
|
||||
};
|
||||
|
||||
let next_ahead_of_main = msg.main() != msg.next();
|
||||
let dev_ahead_of_next = msg.next() != msg.dev();
|
||||
tracing::info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
|
||||
|
||||
let addr = ctx.address();
|
||||
let forge = self.forge.clone();
|
||||
|
||||
if next_ahead_of_main {
|
||||
let message_token = self.message_token;
|
||||
let sleep_duration = self.sleep_duration;
|
||||
async move {
|
||||
// get the status - pass, fail, pending (all others map to fail, e.g. error)
|
||||
let status = forge.commit_status(msg.next()).await;
|
||||
tracing::info!(?status, "Checking next branch");
|
||||
match status {
|
||||
git::forge::commit::Status::Pass => {
|
||||
addr.do_send(actor::messages::AdvanceMainTo::new(msg.next().clone()));
|
||||
}
|
||||
git::forge::commit::Status::Pending => {
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
addr.do_send(actor::messages::ValidateRepo::new(message_token));
|
||||
}
|
||||
git::forge::commit::Status::Fail => {
|
||||
tracing::warn!("Checks have failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
} else if dev_ahead_of_next {
|
||||
if let Some(repository) = self.open_repository.clone() {
|
||||
let repo_details = self.repo_details.clone();
|
||||
let message_token = self.message_token;
|
||||
let sleep_duration = self.sleep_duration;
|
||||
async move {
|
||||
match actor::branch::advance_next(
|
||||
msg.next(),
|
||||
msg.dev_commit_history(),
|
||||
repo_details,
|
||||
repo_config,
|
||||
repository,
|
||||
message_token,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(message_token) => {
|
||||
// pause to allow any CI checks to be started
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
addr.do_send(actor::messages::ValidateRepo::new(message_token))
|
||||
}
|
||||
Err(err) => tracing::warn!("advance next: {err}"),
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
18
crates/repo-actor/src/handlers/test.rs
Normal file
18
crates/repo-actor/src/handlers/test.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
use crate as repo_actor;
|
||||
use actix::{Handler, Message};
|
||||
use git_next_git as git;
|
||||
use repo_actor::message;
|
||||
|
||||
message!(GetRepositoryLog => Vec<String>);
|
||||
|
||||
impl Handler<GetRepositoryLog> for repo_actor::RepoActor {
|
||||
type Result = Vec<String>;
|
||||
|
||||
fn handle(&mut self, _msg: GetRepositoryLog, _ctx: &mut Self::Context) -> Self::Result {
|
||||
match &mut self.open_repository {
|
||||
Some(git::OpenRepository::Test(tor)) => tor.take_log(),
|
||||
Some(git::OpenRepository::Mock(mor)) => mor.take_log(),
|
||||
_ => unimplemented!("Only Test and Mock repositories support this message"),
|
||||
}
|
||||
}
|
||||
}
|
98
crates/repo-actor/src/handlers/validate_repo.rs
Normal file
98
crates/repo-actor/src/handlers/validate_repo.rs
Normal file
|
@ -0,0 +1,98 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
use derive_more::Deref as _;
|
||||
use tracing::Instrument as _;
|
||||
|
||||
use crate as actor;
|
||||
use git_next_git as git;
|
||||
|
||||
impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.deref()))]
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: actor::messages::ValidateRepo,
|
||||
ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
println!("handler validate repo - start");
|
||||
match msg.unwrap() {
|
||||
message_token if self.message_token < message_token => {
|
||||
tracing::info!(%message_token, "New message token");
|
||||
self.message_token = message_token;
|
||||
}
|
||||
message_token if self.message_token > message_token => {
|
||||
tracing::info!("Dropping message from previous generation");
|
||||
return; // message is expired
|
||||
}
|
||||
_ => {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
if self.webhook_id.is_none() {
|
||||
let forge_alias = self.repo_details.forge.forge_alias();
|
||||
let repo_alias = &self.repo_details.repo_alias;
|
||||
let webhook_url = self.webhook.url(forge_alias, repo_alias);
|
||||
let forge = self.forge.clone();
|
||||
let addr = ctx.address();
|
||||
async move {
|
||||
if let Err(err) =
|
||||
forge
|
||||
.register_webhook(&webhook_url)
|
||||
.await
|
||||
.and_then(|registered_webhook| {
|
||||
addr.try_send(actor::messages::WebhookRegistered::from(
|
||||
registered_webhook,
|
||||
))
|
||||
.map_err(|e| {
|
||||
git::forge::webhook::Error::FailedToNotifySelf(e.to_string())
|
||||
})
|
||||
})
|
||||
{
|
||||
tracing::warn!("registering webhook: {err}");
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
if let (Some(repository), Some(repo_config)) = (
|
||||
self.open_repository.clone(),
|
||||
self.repo_details.repo_config.clone(),
|
||||
) {
|
||||
let repo_details = self.repo_details.clone();
|
||||
let addr = ctx.address();
|
||||
let message_token = self.message_token;
|
||||
let sleep_duration = self.sleep_duration;
|
||||
async move {
|
||||
match git::validation::positions::validate_positions(
|
||||
&repository,
|
||||
&repo_details,
|
||||
repo_config,
|
||||
) {
|
||||
Ok(git::validation::positions::Positions {
|
||||
main,
|
||||
next,
|
||||
dev,
|
||||
dev_commit_history,
|
||||
}) => {
|
||||
addr.do_send(actor::messages::StartMonitoring::new(
|
||||
main,
|
||||
next,
|
||||
dev,
|
||||
dev_commit_history,
|
||||
));
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!("{:?}", err);
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
addr.do_send(actor::messages::ValidateRepo::new(message_token));
|
||||
}
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
println!("handler validate repo - finish");
|
||||
}
|
||||
}
|
|
@ -1,13 +1,13 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use crate::{RepoActor, ValidateRepo};
|
||||
use crate as actor;
|
||||
use git_next_config as config;
|
||||
use git_next_git as git;
|
||||
|
||||
use tracing::{info, warn};
|
||||
|
||||
impl Handler<config::WebhookMessage> for RepoActor {
|
||||
impl Handler<config::WebhookMessage> for actor::RepoActor {
|
||||
type Result = ();
|
||||
|
||||
#[allow(clippy::cognitive_complexity)] // TODO: (#49) reduce complexity
|
||||
|
@ -88,6 +88,7 @@ impl Handler<config::WebhookMessage> for RepoActor {
|
|||
token = %message_token,
|
||||
"New commit"
|
||||
);
|
||||
ctx.address().do_send(ValidateRepo { message_token });
|
||||
ctx.address()
|
||||
.do_send(actor::messages::ValidateRepo::new(message_token));
|
||||
}
|
||||
}
|
17
crates/repo-actor/src/handlers/webhook_registered.rs
Normal file
17
crates/repo-actor/src/handlers/webhook_registered.rs
Normal file
|
@ -0,0 +1,17 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use crate as actor;
|
||||
|
||||
impl Handler<actor::messages::WebhookRegistered> for actor::RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.webhook_id()))]
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: actor::messages::WebhookRegistered,
|
||||
_ctx: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
self.webhook_id.replace(msg.webhook_id().clone());
|
||||
self.webhook_auth.replace(msg.webhook_auth().clone());
|
||||
}
|
||||
}
|
|
@ -1,30 +1,58 @@
|
|||
mod branch;
|
||||
pub mod handlers;
|
||||
mod load;
|
||||
pub mod status;
|
||||
pub mod webhook;
|
||||
pub mod messages;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use actix::prelude::*;
|
||||
use config::RegisteredWebhook;
|
||||
use git::validation::positions::{validate_positions, Positions};
|
||||
|
||||
use crate as repo_actor;
|
||||
use git_next_config as config;
|
||||
use git_next_forge as forge;
|
||||
use git_next_git as git;
|
||||
|
||||
use kxio::network::Network;
|
||||
use tracing::{debug, info, warn, Instrument};
|
||||
use tracing::{info, warn, Instrument};
|
||||
|
||||
#[derive(Debug, derive_more::Display)]
|
||||
#[display("{}:{}:{}", generation, repo_details.forge.forge_alias(), repo_details.repo_alias)]
|
||||
/// An actor that represents a Git Repository.
|
||||
///
|
||||
/// When this actor is started it is sent the [CloneRepo] message.
|
||||
///
|
||||
/// ```mermaid
|
||||
/// stateDiagram-v2
|
||||
/// [*] --> CloneRepo :START
|
||||
///
|
||||
/// CloneRepo --> LoadConfigFromRepo
|
||||
/// CloneRepo --> ValidateRepo
|
||||
///
|
||||
/// LoadConfigFromRepo --> LoadedConfig
|
||||
///
|
||||
/// ValidateRepo --> WebhookRegistered
|
||||
/// ValidateRepo --> StartMonitoring
|
||||
/// ValidateRepo --> ValidateRepo :SLEEP 10s
|
||||
///
|
||||
/// LoadedConfig --> ValidateRepo
|
||||
///
|
||||
/// WebhookRegistered --> [*]
|
||||
///
|
||||
/// StartMonitoring --> AdvanceMainTo
|
||||
/// StartMonitoring --> ValidateRepo :SLEEP 10s
|
||||
///
|
||||
/// AdvanceMainTo --> LoadConfigFromRepo
|
||||
/// AdvanceMainTo --> ValidateRepo
|
||||
///
|
||||
/// [*] --> WebhookMessage :WEBHOOK
|
||||
///
|
||||
/// WebhookMessage --> ValidateRepo
|
||||
/// ```
|
||||
///
|
||||
pub struct RepoActor {
|
||||
sleep_duration: std::time::Duration,
|
||||
generation: git::Generation,
|
||||
message_token: MessageToken,
|
||||
message_token: messages::MessageToken,
|
||||
repo_details: git::RepoDetails,
|
||||
webhook: config::server::Webhook,
|
||||
webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured
|
||||
|
@ -39,28 +67,30 @@ pub struct RepoActor {
|
|||
}
|
||||
impl RepoActor {
|
||||
pub fn new(
|
||||
details: git::RepoDetails,
|
||||
repo_details: git::RepoDetails,
|
||||
webhook: config::server::Webhook,
|
||||
generation: git::Generation,
|
||||
net: Network,
|
||||
repo: git::Repository,
|
||||
repository: git::Repository,
|
||||
sleep_duration: std::time::Duration,
|
||||
) -> Self {
|
||||
let forge = forge::Forge::new(details.clone(), net.clone());
|
||||
debug!(?forge, "new");
|
||||
let message_token = messages::MessageToken::default();
|
||||
let forge = forge::Forge::new(repo_details.clone(), net.clone());
|
||||
Self {
|
||||
generation,
|
||||
message_token: MessageToken::new(),
|
||||
repo_details: details,
|
||||
message_token,
|
||||
repo_details,
|
||||
webhook,
|
||||
webhook_id: None,
|
||||
webhook_auth: None,
|
||||
last_main_commit: None,
|
||||
last_next_commit: None,
|
||||
last_dev_commit: None,
|
||||
repository: repo,
|
||||
repository,
|
||||
open_repository: None,
|
||||
net,
|
||||
forge,
|
||||
net,
|
||||
sleep_duration,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -68,9 +98,11 @@ impl Actor for RepoActor {
|
|||
type Context = Context<Self>;
|
||||
#[tracing::instrument(name = "RepoActor::stopping", skip_all, fields(repo = %self.repo_details))]
|
||||
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
|
||||
eprintln!("stopping");
|
||||
info!("Checking webhook");
|
||||
match self.webhook_id.take() {
|
||||
Some(webhook_id) => {
|
||||
eprintln!("stopping - unregistering webhook");
|
||||
info!(%webhook_id, "Unregistring webhook");
|
||||
let forge = self.forge.clone();
|
||||
async move {
|
||||
|
@ -87,252 +119,3 @@ impl Actor for RepoActor {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct CloneRepo;
|
||||
impl Handler<CloneRepo> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::CloneRepo", skip_all, fields(repo = %self.repo_details /*, gitdir = %self.repo_details.gitdir */))]
|
||||
fn handle(&mut self, _msg: CloneRepo, ctx: &mut Self::Context) -> Self::Result {
|
||||
let gitdir = self.repo_details.gitdir.clone();
|
||||
match git::repository::open(&self.repository, &self.repo_details, gitdir) {
|
||||
Ok(repository) => {
|
||||
self.open_repository.replace(repository);
|
||||
if self.repo_details.repo_config.is_none() {
|
||||
ctx.address().do_send(LoadConfigFromRepo);
|
||||
} else {
|
||||
ctx.address().do_send(ValidateRepo {
|
||||
message_token: self.message_token,
|
||||
});
|
||||
}
|
||||
}
|
||||
Err(err) => warn!("Could not open repo: {err}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct LoadConfigFromRepo;
|
||||
impl Handler<LoadConfigFromRepo> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::LoadConfigFromRepo", skip_all, fields(repo = %self.repo_details))]
|
||||
fn handle(&mut self, _msg: LoadConfigFromRepo, ctx: &mut Self::Context) -> Self::Result {
|
||||
let details = self.repo_details.clone();
|
||||
let addr = ctx.address();
|
||||
let Some(open_repository) = self.open_repository.clone() else {
|
||||
warn!("missing open repository - can't load configuration");
|
||||
return;
|
||||
};
|
||||
repo_actor::load::load_file(details, addr, open_repository)
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
struct LoadedConfig(git_next_config::RepoConfig);
|
||||
impl Handler<LoadedConfig> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::LoadedConfig", skip_all, fields(repo = %self.repo_details, branches = %msg.0))]
|
||||
fn handle(&mut self, msg: LoadedConfig, ctx: &mut Self::Context) -> Self::Result {
|
||||
let repo_config = msg.0;
|
||||
self.repo_details.repo_config.replace(repo_config);
|
||||
|
||||
ctx.address().do_send(ValidateRepo {
|
||||
message_token: self.message_token,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(derive_more::Constructor, Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct ValidateRepo {
|
||||
message_token: MessageToken,
|
||||
}
|
||||
impl Handler<ValidateRepo> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::ValidateRepo", skip_all, fields(repo = %self.repo_details, token = %msg.message_token))]
|
||||
fn handle(&mut self, msg: ValidateRepo, ctx: &mut Self::Context) -> Self::Result {
|
||||
match msg.message_token {
|
||||
message_token if self.message_token < message_token => {
|
||||
info!(%message_token, "New message token");
|
||||
self.message_token = msg.message_token;
|
||||
}
|
||||
message_token if self.message_token > message_token => {
|
||||
info!("Dropping message from previous generation");
|
||||
return; // message is expired
|
||||
}
|
||||
_ => {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
if self.webhook_id.is_none() {
|
||||
let forge_alias = self.repo_details.forge.forge_alias();
|
||||
let repo_alias = &self.repo_details.repo_alias;
|
||||
let webhook_url = self.webhook.url(forge_alias, repo_alias);
|
||||
let forge = self.forge.clone();
|
||||
let addr = ctx.address();
|
||||
async move {
|
||||
if let Err(err) =
|
||||
forge
|
||||
.register_webhook(&webhook_url)
|
||||
.await
|
||||
.and_then(|registered_webhook| {
|
||||
addr.try_send(WebhookRegistered::from(registered_webhook))
|
||||
.map_err(|e| {
|
||||
git::forge::webhook::Error::FailedToNotifySelf(e.to_string())
|
||||
})
|
||||
})
|
||||
{
|
||||
warn!("registering webhook: {err}");
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
if let (Some(repository), Some(repo_config)) = (
|
||||
self.open_repository.clone(),
|
||||
self.repo_details.repo_config.clone(),
|
||||
) {
|
||||
let repo_details = self.repo_details.clone();
|
||||
let addr = ctx.address();
|
||||
let message_token = self.message_token;
|
||||
async move {
|
||||
match validate_positions(&repository, &repo_details, repo_config) {
|
||||
Ok(Positions {
|
||||
main,
|
||||
next,
|
||||
dev,
|
||||
dev_commit_history,
|
||||
}) => {
|
||||
addr.do_send(StartMonitoring::new(main, next, dev, dev_commit_history));
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("{:?}", err);
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
addr.do_send(ValidateRepo::new(message_token));
|
||||
}
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, derive_more::Constructor, Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct StartMonitoring {
|
||||
main: git::Commit,
|
||||
next: git::Commit,
|
||||
dev: git::Commit,
|
||||
dev_commit_history: Vec<git::Commit>,
|
||||
}
|
||||
impl Handler<StartMonitoring> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::StartMonitoring", skip_all,
|
||||
fields(token = %self.message_token, repo = %self.repo_details, main = %msg.main, next= %msg.next, dev = %msg.dev))
|
||||
]
|
||||
fn handle(&mut self, msg: StartMonitoring, ctx: &mut Self::Context) -> Self::Result {
|
||||
let Some(repo_config) = self.repo_details.repo_config.clone() else {
|
||||
warn!("No config loaded");
|
||||
return;
|
||||
};
|
||||
|
||||
let next_ahead_of_main = msg.main != msg.next;
|
||||
let dev_ahead_of_next = msg.next != msg.dev;
|
||||
info!(next_ahead_of_main, dev_ahead_of_next, "StartMonitoring");
|
||||
|
||||
let addr = ctx.address();
|
||||
let forge = self.forge.clone();
|
||||
|
||||
if next_ahead_of_main {
|
||||
status::check_next(msg.next, addr, forge, self.message_token)
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
} else if dev_ahead_of_next {
|
||||
if let Some(repository) = self.open_repository.clone() {
|
||||
branch::advance_next(
|
||||
msg.next,
|
||||
msg.dev_commit_history,
|
||||
self.repo_details.clone(),
|
||||
repo_config,
|
||||
repository,
|
||||
addr,
|
||||
self.message_token,
|
||||
)
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct WebhookRegistered(config::WebhookId, config::WebhookAuth);
|
||||
impl From<RegisteredWebhook> for WebhookRegistered {
|
||||
fn from(value: RegisteredWebhook) -> Self {
|
||||
Self(value.id().clone(), value.auth().clone())
|
||||
}
|
||||
}
|
||||
impl Handler<WebhookRegistered> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::WebhookRegistered", skip_all, fields(repo = %self.repo_details, webhook_id = %msg.0))]
|
||||
fn handle(&mut self, msg: WebhookRegistered, _ctx: &mut Self::Context) -> Self::Result {
|
||||
self.webhook_id.replace(msg.0);
|
||||
self.webhook_auth.replace(msg.1);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct AdvanceMainTo(git::Commit);
|
||||
impl Handler<AdvanceMainTo> for RepoActor {
|
||||
type Result = ();
|
||||
#[tracing::instrument(name = "RepoActor::AdvanceMainTo", skip_all, fields(repo = %self.repo_details, commit = %msg.0))]
|
||||
fn handle(&mut self, msg: AdvanceMainTo, ctx: &mut Self::Context) -> Self::Result {
|
||||
let Some(repo_config) = self.repo_details.repo_config.clone() else {
|
||||
warn!("No config loaded");
|
||||
return;
|
||||
};
|
||||
let Some(repository) = self.open_repository.clone() else {
|
||||
warn!("No repository opened");
|
||||
return;
|
||||
};
|
||||
let repo_details = self.repo_details.clone();
|
||||
let addr = ctx.address();
|
||||
let message_token = self.message_token;
|
||||
async move {
|
||||
branch::advance_main(msg.0, &repo_details, &repo_config, &repository).await;
|
||||
match repo_config.source() {
|
||||
git_next_config::RepoConfigSource::Repo => addr.do_send(LoadConfigFromRepo),
|
||||
git_next_config::RepoConfigSource::Server => {
|
||||
addr.do_send(ValidateRepo { message_token })
|
||||
}
|
||||
}
|
||||
}
|
||||
.in_current_span()
|
||||
.into_actor(self)
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord, derive_more::Display)]
|
||||
pub struct MessageToken(u32);
|
||||
impl MessageToken {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
pub const fn next(&self) -> Self {
|
||||
Self(self.0 + 1)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,75 +1,54 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use tracing::{error, info};
|
||||
use derive_more::Display;
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
|
||||
use git_next_config as config;
|
||||
use git_next_git as git;
|
||||
|
||||
use super::{LoadedConfig, RepoActor};
|
||||
|
||||
/// Loads the [RepoConfig] from the `.git-next.toml` file in the repository
|
||||
#[tracing::instrument(skip_all, fields(branch = %repo_details.branch))]
|
||||
pub async fn load_file(
|
||||
pub async fn config_from_repository(
|
||||
repo_details: git::RepoDetails,
|
||||
addr: Addr<RepoActor>,
|
||||
open_repository: git::OpenRepository,
|
||||
) {
|
||||
) -> Result<config::RepoConfig> {
|
||||
info!("Loading .git-next.toml from repo");
|
||||
let repo_config = match load(&repo_details, &open_repository).await {
|
||||
Ok(repo_config) => repo_config,
|
||||
Err(err) => {
|
||||
error!(?err, "Failed to load config");
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!("Loaded .git-next.toml from repo");
|
||||
addr.do_send(LoadedConfig(repo_config));
|
||||
}
|
||||
|
||||
async fn load(
|
||||
details: &git::RepoDetails,
|
||||
open_repository: &git::OpenRepository,
|
||||
) -> Result<config::RepoConfig, Error> {
|
||||
let contents = open_repository.read_file(&details.branch, &PathBuf::from(".git-next.toml"))?;
|
||||
let config = config::RepoConfig::load(&contents)?;
|
||||
let config = validate(config, open_repository).await?;
|
||||
let contents =
|
||||
open_repository.read_file(&repo_details.branch, &PathBuf::from(".git-next.toml"))?;
|
||||
let config = config::RepoConfig::parse(&contents)?;
|
||||
let branches = open_repository.remote_branches()?;
|
||||
required_branch(&config.branches().main(), &branches)?;
|
||||
required_branch(&config.branches().next(), &branches)?;
|
||||
required_branch(&config.branches().dev(), &branches)?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
#[derive(Debug, derive_more::From, derive_more::Display)]
|
||||
fn required_branch(
|
||||
branch_name: &config::BranchName,
|
||||
branches: &[config::BranchName],
|
||||
) -> Result<()> {
|
||||
branches
|
||||
.iter()
|
||||
.find(|branch| *branch == branch_name)
|
||||
.ok_or_else(|| Error::BranchNotFound(branch_name.clone()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub type Result<T> = core::result::Result<T, Error>;
|
||||
#[derive(Debug, thiserror::Error, Display)]
|
||||
pub enum Error {
|
||||
File(git::file::Error),
|
||||
Config(config::server::Error),
|
||||
Toml(toml::de::Error),
|
||||
Branch(git::push::Error),
|
||||
#[display("file")]
|
||||
File(#[from] git::file::Error),
|
||||
|
||||
#[display("config")]
|
||||
Config(#[from] config::server::Error),
|
||||
|
||||
#[display("toml")]
|
||||
Toml(#[from] toml::de::Error),
|
||||
|
||||
#[display("push")]
|
||||
Push(#[from] git::push::Error),
|
||||
|
||||
#[display("branch not found: {}", 0)]
|
||||
BranchNotFound(config::BranchName),
|
||||
}
|
||||
|
||||
pub async fn validate(
|
||||
config: config::RepoConfig,
|
||||
open_repository: &git::OpenRepository,
|
||||
) -> Result<config::RepoConfig, Error> {
|
||||
let branches = open_repository.remote_branches()?;
|
||||
if !branches
|
||||
.iter()
|
||||
.any(|branch| branch == &config.branches().main())
|
||||
{
|
||||
return Err(Error::BranchNotFound(config.branches().main()));
|
||||
}
|
||||
if !branches
|
||||
.iter()
|
||||
.any(|branch| branch == &config.branches().next())
|
||||
{
|
||||
return Err(Error::BranchNotFound(config.branches().next()));
|
||||
}
|
||||
if !branches
|
||||
.iter()
|
||||
.any(|branch| branch == &config.branches().dev())
|
||||
{
|
||||
return Err(Error::BranchNotFound(config.branches().dev()));
|
||||
}
|
||||
Ok(config)
|
||||
}
|
||||
|
|
89
crates/repo-actor/src/messages.rs
Normal file
89
crates/repo-actor/src/messages.rs
Normal file
|
@ -0,0 +1,89 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
use config::newtype;
|
||||
use derive_more::{Constructor, Display};
|
||||
|
||||
use git_next_config as config;
|
||||
use git_next_git as git;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! message {
|
||||
($name:ident wraps $value:ty) => {
|
||||
git_next_config::newtype!($name is a $value);
|
||||
impl Message for $name {
|
||||
type Result = ();
|
||||
}
|
||||
};
|
||||
($name:ident) => {
|
||||
git_next_config::newtype!($name);
|
||||
impl Message for $name {
|
||||
type Result = ();
|
||||
}
|
||||
};
|
||||
($name:ident wraps $value:ty => $result:ty) => {
|
||||
git_next_config::newtype!($name is a $value);
|
||||
impl Message for $name {
|
||||
type Result = $result;
|
||||
}
|
||||
};
|
||||
($name:ident => $result:ty) => {
|
||||
git_next_config::newtype!($name);
|
||||
impl Message for $name {
|
||||
type Result = $result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
message!(LoadConfigFromRepo);
|
||||
message!(CloneRepo);
|
||||
message!(LoadedConfig wraps config::RepoConfig);
|
||||
message!(ValidateRepo wraps MessageToken);
|
||||
|
||||
#[derive(Debug, Constructor, Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct StartMonitoring {
|
||||
main: git::Commit,
|
||||
next: git::Commit,
|
||||
dev: git::Commit,
|
||||
dev_commit_history: Vec<git::Commit>,
|
||||
}
|
||||
impl StartMonitoring {
|
||||
pub const fn main(&self) -> &git::Commit {
|
||||
&self.main
|
||||
}
|
||||
pub const fn next(&self) -> &git::Commit {
|
||||
&self.next
|
||||
}
|
||||
pub const fn dev(&self) -> &git::Commit {
|
||||
&self.dev
|
||||
}
|
||||
pub fn dev_commit_history(&self) -> &[git::Commit] {
|
||||
&self.dev_commit_history
|
||||
}
|
||||
}
|
||||
|
||||
message!(WebhookRegistered wraps (config::WebhookId, config::WebhookAuth));
|
||||
impl WebhookRegistered {
|
||||
pub const fn webhook_id(&self) -> &config::WebhookId {
|
||||
&self.0 .0
|
||||
}
|
||||
pub const fn webhook_auth(&self) -> &config::WebhookAuth {
|
||||
&self.0 .1
|
||||
}
|
||||
}
|
||||
impl From<config::RegisteredWebhook> for WebhookRegistered {
|
||||
fn from(value: config::RegisteredWebhook) -> Self {
|
||||
let webhook_id = value.id().clone();
|
||||
let webhook_auth = value.auth().clone();
|
||||
Self::from((webhook_id, webhook_auth))
|
||||
}
|
||||
}
|
||||
|
||||
message!(AdvanceMainTo wraps git::Commit);
|
||||
|
||||
newtype!(MessageToken is a u32, Copy, Default, Display);
|
||||
impl MessageToken {
|
||||
pub const fn next(&self) -> Self {
|
||||
Self(self.0 + 1)
|
||||
}
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
//
|
||||
use actix::prelude::*;
|
||||
|
||||
use git_next_forge as forge;
|
||||
use git_next_git as git;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::{MessageToken, ValidateRepo};
|
||||
|
||||
use super::AdvanceMainTo;
|
||||
|
||||
pub async fn check_next(
|
||||
next: git::Commit,
|
||||
addr: Addr<super::RepoActor>,
|
||||
forge: forge::Forge,
|
||||
message_token: MessageToken,
|
||||
) {
|
||||
// get the status - pass, fail, pending (all others map to fail, e.g. error)
|
||||
let status = forge.commit_status(&next).await;
|
||||
info!(?status, "Checking next branch");
|
||||
match status {
|
||||
git::forge::commit::Status::Pass => {
|
||||
addr.do_send(AdvanceMainTo(next));
|
||||
}
|
||||
git::forge::commit::Status::Pending => {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
|
||||
addr.do_send(ValidateRepo { message_token });
|
||||
}
|
||||
git::forge::commit::Status::Fail => {
|
||||
warn!("Checks have failed");
|
||||
}
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue