Compare commits

...

7 commits

Author SHA1 Message Date
9e12f5eb5d feat: post webhook notifications to user
All checks were successful
Rust / build (push) Successful in 2m56s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/cron/push-next Pipeline was successful
ci/woodpecker/cron/tag-created Pipeline was successful
ci/woodpecker/cron/cron-docker-builder Pipeline was successful
Closes kemitix/git-next#91
2024-07-23 20:40:01 +01:00
288c20c24b feat: dispatch NotifyUser messages to server for user (2/2)
All checks were successful
Rust / build (push) Successful in 2m49s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
2024-07-23 20:39:02 +01:00
4978400ece refactor: use Option<&T> over &Option<T>
All checks were successful
Rust / build (push) Successful in 2m48s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
2024-07-23 20:38:58 +01:00
bcf57bc728 feat: dispatch NotifyUser messages to server for user (1/2)
All checks were successful
Rust / build (push) Successful in 2m45s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
2024-07-23 20:38:54 +01:00
e9877ca9fa feat: support sending messages to the user
All checks were successful
Rust / build (push) Successful in 2m51s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
2024-07-23 20:38:51 +01:00
c86d890c2c feat: enable configuration of a webhook for receiving notifications
All checks were successful
Rust / build (push) Successful in 2m47s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
2024-07-23 20:38:29 +01:00
1690e1bff6 docs: document Notifications to user
All checks were successful
Rust / build (push) Successful in 4m12s
ci/woodpecker/push/cron-docker-builder Pipeline was successful
ci/woodpecker/push/push-next Pipeline was successful
ci/woodpecker/push/tag-created Pipeline was successful
2024-07-23 20:37:08 +01:00
46 changed files with 729 additions and 170 deletions

View file

@ -78,6 +78,8 @@ git-conventional = "0.12"
bytes = "1.6"
ulid = "1.1"
warp = "0.3"
time = "0.3"
standardwebhooks = "1.0"
# boilerplate
derive_more = { version = "1.0.0-beta.6", features = [

View file

@ -63,7 +63,7 @@ cargo install --path crates/cli
- [x] cli
- [x] server
- [ ] notifications - notify user when intervention required (e.g. to rebase)
- [x] notifications - notify user when intervention required (e.g. to rebase)
- [ ] tui overview
- [ ] webui overview
@ -204,6 +204,110 @@ Currently `git-next` can only use a `gitdir` if the forge and repo is the
same one specified as the `origin` remote. Otherwise the behaviour is
untested and undefined.
## Notifications
`git-next` can send a number of notification to the user when intervention is required.
Currently, only WebHooks are supported.
Webhooks are sent using the Standard Webhooks format. That means all POST messages have
the following headers:
- `Webhook-Id`
- `Webhook-Signature`
- `Webhook-Timestamp`
### Events
#### Dev Not Based on Main
This message `type` indicates that the `dev` branch is not based on `main`.
**Action Required**: Rebase the `dev` branch onto the `main` branch.
Sample payload:
```json
{
"data": {
"branches": {
"dev": "dev",
"main": "main"
},
"forge_alias": "jo",
"repo_alias": "kxio"
},
"timestamp": "1721760933",
"type": "branch.dev.not-on-main"
}
```
#### CI Check Failed
This message `type` indicates that the commit on the tip of the `next` branch has failed the
configured CI checks.
**Action Required**: Either update the commit to correct the issue CI raised, or, if the issue
is transient (e.g. a network issue), re-run/re-start the job in your CI.
Sample payload:
```json
{
"data": {
"commit": {
"sha": "98abef1af6825f9770d725a681e5cfc09d7fd4f2",
"message": "feat: add foo to bar template"
},
"forge_alias": "jo",
"repo_alias": "kxio"
},
"timestamp": "1721760933",
"type": "cicheck.failed"
}
```
#### Repo Config Load Failed
This message `type` indicates that `git-next` wasn't able to load the configuration for the
repo from the `git-next.toml` file in the repository.
**Action Required**: Review the `reason` provided.
Sample payload:
```json
{
"data": {
"reason": "File not found: .git-next.toml",
"forge_alias": "jo",
"repo_alias": "kxio"
},
"timestamp": "1721760933",
"type": "config.load.failed"
}
```
#### Webhook Registration Failed
This message `type` indicates that `git-next` wasn't able to register it's webhook with the
forge repository, so will not receive updates when the branches in the repo are updated.
**Action Required**: Review the `reason` provided.
Sample payload:
```json
{
"data": {
"reason": "repo config not loaded",
"forge_alias": "jo",
"repo_alias": "kxio"
},
"timestamp": "1721760933",
"type": "webhook.registration.failed"
}
```
## Behaviour
The branch names are configurable, but we will talk about `main`, `next` and `dev`.

View file

@ -1 +1,3 @@
crate::newtype!(BranchName: String, derive_more::Display, Default: "The name of a Git branch");
use serde::Serialize;
crate::newtype!(BranchName: String, derive_more::Display, Default, Serialize: "The name of a Git branch");

View file

@ -1,4 +1,6 @@
crate::newtype!(ForgeAlias: String, Hash, PartialOrd, Ord, derive_more::Display, Default: "The name of a Forge to connect to");
use serde::Serialize;
crate::newtype!(ForgeAlias: String, Hash, PartialOrd, Ord, derive_more::Display, Default, Serialize: "The name of a Forge to connect to");
impl From<&ForgeAlias> for std::path::PathBuf {
fn from(value: &ForgeAlias) -> Self {
Self::from(&value.0)

View file

@ -1,7 +1,8 @@
use derive_more::Display;
use serde::Serialize;
use crate::newtype;
newtype!(RepoAlias: String, Display, Default, PartialOrd, Ord: r#"The alias of a repo.
newtype!(RepoAlias: String, Display, Default, PartialOrd, Ord, Serialize: r#"The alias of a repo.
This is the alias for the repo within `git-next-server.toml`."#);

View file

@ -8,6 +8,7 @@ use std::{
};
use kxio::fs::FileSystem;
use secrecy::Secret;
use tracing::info;
use crate::{newtype, ForgeAlias, ForgeConfig, RepoAlias};
@ -41,7 +42,8 @@ type Result<T> = core::result::Result<T, Error>;
)]
pub struct ServerConfig {
http: Http,
webhook: Webhook,
webhook: InboundWebhook,
notification: Notification,
storage: ServerStorage,
pub forge: BTreeMap<String, ForgeConfig>,
}
@ -64,7 +66,11 @@ impl ServerConfig {
&self.storage
}
pub const fn webhook(&self) -> &Webhook {
pub const fn notification(&self) -> &Notification {
&self.notification
}
pub const fn inbound_webhook(&self) -> &InboundWebhook {
&self.webhook
}
@ -113,10 +119,10 @@ impl Http {
serde::Deserialize,
derive_more::Constructor,
)]
pub struct Webhook {
pub struct InboundWebhook {
url: String,
}
impl Webhook {
impl InboundWebhook {
pub fn url(&self, forge_alias: &ForgeAlias, repo_alias: &RepoAlias) -> WebhookUrl {
let base_url = &self.url;
WebhookUrl(format!("{base_url}/{forge_alias}/{repo_alias}"))
@ -149,3 +155,99 @@ impl ServerStorage {
self.path.as_path()
}
}
/// Identifier for the type of Notification
#[derive(
Clone,
Default,
Debug,
derive_more::From,
PartialEq,
Eq,
PartialOrd,
Ord,
serde::Deserialize,
Copy,
)]
pub enum NotificationType {
#[default]
None,
Webhook,
}
/// Defines the Webhook Forges should send updates to
/// Must be an address that is accessible from the remote forge
#[derive(
Clone,
Debug,
derive_more::From,
PartialEq,
Eq,
PartialOrd,
Ord,
derive_more::AsRef,
serde::Deserialize,
)]
pub struct Notification {
r#type: NotificationType,
webhook: Option<OutboundWebhook>,
}
impl Notification {
pub const fn none() -> Self {
Self {
r#type: NotificationType::None,
webhook: None,
}
}
pub const fn new_webhook(webhook: OutboundWebhook) -> Self {
Self {
r#type: NotificationType::Webhook,
webhook: Some(webhook),
}
}
pub const fn r#type(&self) -> NotificationType {
self.r#type
}
pub const fn webhook(&self) -> Option<&OutboundWebhook> {
self.webhook.as_ref()
}
pub fn webhook_url(&self) -> Option<String> {
self.webhook.clone().map(|x| x.url)
}
pub fn webhook_secret(&self) -> Option<Secret<String>> {
self.webhook.clone().map(|x| x.secret).map(Secret::new)
}
}
impl Default for Notification {
fn default() -> Self {
Self::none()
}
}
#[derive(
Clone,
Debug,
derive_more::From,
PartialEq,
Eq,
PartialOrd,
Ord,
serde::Deserialize,
derive_more::Constructor,
)]
pub struct OutboundWebhook {
url: String,
secret: String,
}
impl OutboundWebhook {
pub fn url(&self) -> &str {
self.url.as_ref()
}
pub fn secret(&self) -> Secret<String> {
Secret::new(self.secret.clone())
}
}

View file

@ -7,9 +7,9 @@ use std::collections::BTreeMap;
use std::path::PathBuf;
use crate::server::Http;
use crate::server::InboundWebhook;
use crate::server::ServerConfig;
use crate::server::ServerStorage;
use crate::server::Webhook;
use crate::webhook::push::Branch;
mod url;
@ -468,8 +468,14 @@ mod server {
let http = &server_config.http()?;
let http_addr = http.ip();
let http_port = server_config.http()?.port();
let webhook_url = server_config.webhook().base_url();
let webhook_url = server_config.inbound_webhook().base_url();
let storage_path = server_config.storage().path();
let notification = &server_config.notification();
let notification_webhook_url = notification.webhook_url().unwrap_or_default();
let notification_webhook_secret = notification
.webhook_secret()
.map(|secret| secret.expose_secret().clone())
.unwrap_or_default();
let forge_alias = server_config
.forges()
.next()
@ -519,6 +525,10 @@ url = "{webhook_url}"
[storage]
path = {storage_path:?}
[notification]
type = "Webhook"
webhook = {{ url = "{notification_webhook_url}", secret = "{notification_webhook_secret}" }}
[forge.{forge_alias}]
forge_type = "{forge_type}"
hostname = "{forge_hostname}"
@ -687,6 +697,8 @@ mod push {
}
mod given {
use crate::server::{Notification, OutboundWebhook};
use super::*;
use rand::Rng as _;
use std::{
@ -709,7 +721,8 @@ mod given {
pub fn a_server_config() -> ServerConfig {
ServerConfig::new(
an_http(),
a_webhook(),
an_inbound_webhook(),
a_notification_config(),
a_server_storage(),
some_forge_configs(),
)
@ -730,12 +743,18 @@ mod given {
pub fn a_port() -> u16 {
rand::thread_rng().gen()
}
pub fn a_webhook() -> Webhook {
Webhook::new(a_name())
pub fn an_inbound_webhook() -> InboundWebhook {
InboundWebhook::new(a_name())
}
pub fn an_outbound_webhook() -> OutboundWebhook {
OutboundWebhook::new(a_name(), a_name())
}
pub fn a_server_storage() -> ServerStorage {
ServerStorage::new(a_name().into())
}
pub fn a_notification_config() -> Notification {
Notification::new_webhook(an_outbound_webhook())
}
pub fn some_forge_configs() -> BTreeMap<String, ForgeConfig> {
[(a_name(), a_forge_config())].into()
}

View file

@ -703,7 +703,7 @@ mod forgejo {
forge_alias: &config::ForgeAlias,
repo_alias: &config::RepoAlias,
) -> git_next_config::server::WebhookUrl {
config::server::Webhook::new(a_name()).url(forge_alias, repo_alias)
config::server::InboundWebhook::new(a_name()).url(forge_alias, repo_alias)
}
pub fn any_webhook_url() -> git_next_config::server::WebhookUrl {

View file

@ -510,7 +510,7 @@ mod github {
use std::path::PathBuf;
use git_next_config::{server::Webhook, WebhookId};
use git_next_config::{server::InboundWebhook, WebhookId};
use kxio::network::{MockNetwork, StatusCode};
use rand::RngCore;
use serde_json::json;
@ -648,7 +648,7 @@ mod github {
forge_alias: &ForgeAlias,
repo_alias: &RepoAlias,
) -> git_next_config::server::WebhookUrl {
Webhook::new(a_name()).url(forge_alias, repo_alias)
InboundWebhook::new(a_name()).url(forge_alias, repo_alias)
}
pub fn any_webhook_url() -> git_next_config::server::WebhookUrl {
given::a_webhook_url(&given::a_forge_alias(), &given::a_repo_alias())

View file

@ -25,11 +25,13 @@ async-trait = { workspace = true }
# fs/network
kxio = { workspace = true }
# # TOML parsing
# serde = { workspace = true }
# TOML parsing
serde_json = { workspace = true }
# toml = { workspace = true }
# webhooks - user notification
serde = { workspace = true }
# Secrets and Password
secrecy = { workspace = true }

View file

@ -2,6 +2,7 @@ use config::newtype;
use derive_more::Display;
//
use git_next_config as config;
use serde::Serialize;
#[derive(
Clone,
@ -13,6 +14,7 @@ use git_next_config as config;
Ord,
derive_more::Constructor,
derive_more::Display,
Serialize,
)]
#[display("{}", sha)]
pub struct Commit {
@ -37,8 +39,8 @@ impl From<config::webhook::Push> for Commit {
}
}
newtype!(Sha: String, Display, Hash,PartialOrd, Ord: "The unique SHA for a git commit.");
newtype!(Message: String, Hash, PartialOrd, Ord: "The commit message for a git commit.");
newtype!(Sha: String, Display, Hash,PartialOrd, Ord, Serialize: "The unique SHA for a git commit.");
newtype!(Message: String, Display, Hash, PartialOrd, Ord, Serialize: "The commit message for a git commit.");
#[derive(Clone, Debug)]
pub struct Histories {

View file

@ -10,6 +10,7 @@ mod git_remote;
pub mod push;
mod repo_details;
pub mod repository;
mod user_notification;
pub mod validation;
#[cfg(test)]
@ -24,3 +25,4 @@ pub use git_remote::GitRemote;
pub use repo_details::RepoDetails;
pub use repository::OpenRepository;
pub use repository::Repository;
pub use user_notification::UserNotification;

View file

@ -0,0 +1,27 @@
use crate::Commit;
use git_next_config::{BranchName, ForgeAlias, RepoAlias};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum UserNotification {
CICheckFailed {
forge_alias: ForgeAlias,
repo_alias: RepoAlias,
commit: Commit,
},
RepoConfigLoadFailure {
forge_alias: ForgeAlias,
repo_alias: RepoAlias,
reason: String,
},
WebhookRegistration {
forge_alias: ForgeAlias,
repo_alias: RepoAlias,
reason: String,
},
DevNotBasedOnMain {
forge_alias: ForgeAlias,
repo_alias: RepoAlias,
dev_branch: BranchName,
main_branch: BranchName,
},
}

View file

@ -1,5 +1,5 @@
//
use crate as git;
use crate::{self as git, UserNotification};
use git_next_config as config;
pub type Result<T> = core::result::Result<T, Error>;
@ -40,10 +40,14 @@ pub fn validate_positions(
// Validations:
// Dev must be on main branch, else the USER must rebase it
if is_not_based_on(&commit_histories.dev, &main) {
return Err(Error::UserIntervention(format!(
"Branch '{}' not based on '{}'",
dev_branch, main_branch
)));
return Err(Error::UserIntervention(
UserNotification::DevNotBasedOnMain {
forge_alias: repo_details.forge.forge_alias().clone(),
repo_alias: repo_details.repo_alias.clone(),
dev_branch,
main_branch,
},
));
}
// verify that next is on main or at most one commit on top of main, else reset it back to main
if is_not_based_on(
@ -120,8 +124,8 @@ pub enum Error {
#[error("{0} - not retrying")]
NonRetryable(String),
#[error("{0} - user intervention required")]
UserIntervention(String),
#[error("user intervention required")]
UserIntervention(UserNotification),
}
impl From<git::fetch::Error> for Error {
fn from(value: git::fetch::Error) -> Self {

View file

@ -43,6 +43,7 @@ git-conventional = { workspace = true }
# Webhooks
bytes = { workspace = true }
ulid = { workspace = true }
time = { workspace = true }
# boilerplate
derive_more = { workspace = true }

View file

@ -30,21 +30,18 @@ impl Handler<actor::messages::AdvanceMain> for actor::RepoActor {
Err(err) => {
tracing::warn!("advance main: {err}");
}
Ok(_) => {
let log = self.log.clone();
match repo_config.source() {
Ok(_) => match repo_config.source() {
git_next_config::RepoConfigSource::Repo => {
actor::do_send(addr, actor::messages::LoadConfigFromRepo, &log);
actor::do_send(addr, actor::messages::LoadConfigFromRepo, self.log.as_ref());
}
git_next_config::RepoConfigSource::Server => {
actor::do_send(
addr,
actor::messages::ValidateRepo::new(message_token),
&log,
self.log.as_ref(),
);
}
}
}
},
}
}
}

View file

@ -35,7 +35,7 @@ impl Handler<actor::messages::AdvanceNext> for actor::RepoActor {
actor::do_send(
addr,
actor::messages::ValidateRepo::new(message_token),
&self.log,
self.log.as_ref(),
);
}
Err(err) => tracing::warn!("advance next: {err}"),

View file

@ -11,7 +11,7 @@ impl Handler<actor::messages::CheckCIStatus> for actor::RepoActor {
msg: actor::messages::CheckCIStatus,
ctx: &mut Self::Context,
) -> Self::Result {
actor::logger(&self.log, "start: CheckCIStatus");
actor::logger(self.log.as_ref(), "start: CheckCIStatus");
let addr = ctx.address();
let forge = self.forge.duplicate();
let next = msg.unwrap();
@ -24,7 +24,7 @@ impl Handler<actor::messages::CheckCIStatus> for actor::RepoActor {
actor::do_send(
addr,
actor::messages::ReceiveCIStatus::new((next, status)),
&log,
log.as_ref(),
);
}
.in_current_span()

View file

@ -12,29 +12,29 @@ impl Handler<actor::messages::CloneRepo> for actor::RepoActor {
_msg: actor::messages::CloneRepo,
ctx: &mut Self::Context,
) -> Self::Result {
actor::logger(&self.log, "Handler: CloneRepo: start");
actor::logger(self.log.as_ref(), "Handler: CloneRepo: start");
tracing::debug!("Handler: CloneRepo: start");
match git::repository::open(&*self.repository_factory, &self.repo_details) {
Ok(repository) => {
actor::logger(&self.log, "open okay");
actor::logger(self.log.as_ref(), "open okay");
tracing::debug!("open okay");
self.open_repository.replace(repository);
if self.repo_details.repo_config.is_none() {
actor::do_send(
ctx.address(),
actor::messages::LoadConfigFromRepo,
&self.log,
self.log.as_ref(),
);
} else {
actor::do_send(
ctx.address(),
actor::messages::RegisterWebhook::new(),
&self.log,
self.log.as_ref(),
);
}
}
Err(err) => {
actor::logger(&self.log, "open failed");
actor::logger(self.log.as_ref(), "open failed");
tracing::debug!("err: {err:?}");
tracing::warn!("Could not open repo: {err}")
}

View file

@ -1,8 +1,9 @@
//
use actix::prelude::*;
use git_next_git::UserNotification;
use tracing::Instrument as _;
use crate as actor;
use crate::{self as actor};
impl Handler<actor::messages::LoadConfigFromRepo> for actor::RepoActor {
type Result = ();
@ -18,16 +19,27 @@ impl Handler<actor::messages::LoadConfigFromRepo> for actor::RepoActor {
};
let open_repository = open_repository.duplicate();
let repo_details = self.repo_details.clone();
let forge_alias = repo_details.forge.forge_alias().clone();
let repo_alias = repo_details.repo_alias.clone();
let addr = ctx.address();
let notify_user_recipient = self.notify_user_recipient.clone();
let log = self.log.clone();
async move {
match actor::load::config_from_repository(repo_details, &*open_repository).await {
Ok(repo_config) => {
actor::logger(&log, "send: LoadedConfig");
addr.do_send(actor::messages::ReceiveRepoConfig::new(repo_config))
}
Err(err) => tracing::warn!(?err, "Failed to load config"),
// TODO: (#95) should notify user
Ok(repo_config) => actor::do_send(
addr,
actor::messages::ReceiveRepoConfig::new(repo_config),
log.as_ref(),
),
Err(err) => actor::notify_user(
notify_user_recipient.as_ref(),
UserNotification::RepoConfigLoadFailure {
forge_alias,
repo_alias,
reason: err.to_string(),
},
log.as_ref(),
),
}
}
.in_current_span()

View file

@ -1,6 +1,7 @@
//
use crate as actor;
use crate::{self as actor};
use actix::prelude::*;
use git::UserNotification;
use git_next_git as git;
impl Handler<actor::messages::ReceiveCIStatus> for actor::RepoActor {
@ -12,33 +13,47 @@ impl Handler<actor::messages::ReceiveCIStatus> for actor::RepoActor {
ctx: &mut Self::Context,
) -> Self::Result {
let log = self.log.clone();
actor::logger(&log, "start: ReceiveCIStatus");
actor::logger(log.as_ref(), "start: ReceiveCIStatus");
let addr = ctx.address();
let (next, status) = msg.unwrap();
let forge_alias = self.repo_details.forge.forge_alias().clone();
let repo_alias = self.repo_details.repo_alias.clone();
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
tracing::debug!(?status, "");
match status {
git::forge::commit::Status::Pass => {
actor::do_send(addr, actor::messages::AdvanceMain::new(next), &self.log);
actor::do_send(
addr,
actor::messages::AdvanceMain::new(next),
self.log.as_ref(),
);
}
git::forge::commit::Status::Pending => {
std::thread::sleep(sleep_duration);
actor::do_send(
addr,
actor::messages::ValidateRepo::new(message_token),
&self.log,
self.log.as_ref(),
);
}
git::forge::commit::Status::Fail => {
tracing::warn!("Checks have failed");
// TODO: (#95) test: repo with next ahead of main and failing CI should notify user
actor::notify_user(
self.notify_user_recipient.as_ref(),
UserNotification::CICheckFailed {
forge_alias,
repo_alias,
commit: next,
},
log.as_ref(),
);
actor::delay_send(
addr,
sleep_duration,
actor::messages::ValidateRepo::new(message_token),
&self.log,
self.log.as_ref(),
);
}
}

View file

@ -17,7 +17,7 @@ impl Handler<actor::messages::ReceiveRepoConfig> for actor::RepoActor {
actor::do_send(
ctx.address(),
actor::messages::RegisterWebhook::new(),
&self.log,
self.log.as_ref(),
);
}
}

View file

@ -2,19 +2,21 @@
use actix::prelude::*;
use tracing::Instrument as _;
use crate as actor;
use crate::{self as actor};
use actor::{messages::RegisterWebhook, RepoActor};
use git_next_git::UserNotification;
impl Handler<RegisterWebhook> for RepoActor {
type Result = ();
fn handle(&mut self, _msg: RegisterWebhook, ctx: &mut Self::Context) -> Self::Result {
if self.webhook_id.is_none() {
let forge_alias = self.repo_details.forge.forge_alias();
let repo_alias = &self.repo_details.repo_alias;
let webhook_url = self.webhook.url(forge_alias, repo_alias);
let forge_alias = self.repo_details.forge.forge_alias().clone();
let repo_alias = self.repo_details.repo_alias.clone();
let webhook_url = self.webhook.url(&forge_alias, &repo_alias);
let forge = self.forge.duplicate();
let addr = ctx.address();
let notify_user_recipient = self.notify_user_recipient.clone();
let log = self.log.clone();
tracing::debug!("registering webhook");
async move {
@ -24,16 +26,25 @@ impl Handler<RegisterWebhook> for RepoActor {
actor::do_send(
addr,
actor::messages::WebhookRegistered::from(registered_webhook),
&log,
log.as_ref(),
);
}
Err(err) => {
actor::notify_user(
notify_user_recipient.as_ref(),
UserNotification::WebhookRegistration {
forge_alias,
repo_alias,
reason: err.to_string(),
},
log.as_ref(),
);
}
Err(err) => tracing::warn!(?err, "registering webhook"),
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
tracing::debug!("registering webhook done");
}
}
}

View file

@ -14,14 +14,14 @@ impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
msg: actor::messages::ValidateRepo,
ctx: &mut Self::Context,
) -> Self::Result {
actor::logger(&self.log, "start: ValidateRepo");
actor::logger(self.log.as_ref(), "start: ValidateRepo");
// Message Token - make sure we are only triggered for the latest/current token
match self.token_status(msg.unwrap()) {
TokenStatus::Current => {} // do nothing
TokenStatus::Expired => {
actor::logger(
&self.log,
self.log.as_ref(),
format!("discarded: old message token: {}", self.message_token),
);
return; // message is expired
@ -29,24 +29,27 @@ impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
TokenStatus::New(message_token) => {
self.message_token = message_token;
actor::logger(
&self.log,
self.log.as_ref(),
format!("new message token: {}", self.message_token),
);
}
}
actor::logger(&self.log, format!("accepted token: {}", self.message_token));
actor::logger(
self.log.as_ref(),
format!("accepted token: {}", self.message_token),
);
// Repository positions
let Some(ref open_repository) = self.open_repository else {
actor::logger(&self.log, "no open repository");
actor::logger(self.log.as_ref(), "no open repository");
return;
};
actor::logger(&self.log, "have open repository");
actor::logger(self.log.as_ref(), "have open repository");
let Some(repo_config) = self.repo_details.repo_config.clone() else {
actor::logger(&self.log, "no repo config");
actor::logger(self.log.as_ref(), "no repo config");
return;
};
actor::logger(&self.log, "have repo config");
actor::logger(self.log.as_ref(), "have repo config");
match git::validation::positions::validate_positions(
&**open_repository,
@ -64,46 +67,50 @@ impl Handler<actor::messages::ValidateRepo> for actor::RepoActor {
actor::do_send(
ctx.address(),
actor::messages::CheckCIStatus::new(next),
&self.log,
self.log.as_ref(),
);
} else if next != dev {
actor::do_send(
ctx.address(),
actor::messages::AdvanceNext::new((next, dev_commit_history)),
&self.log,
self.log.as_ref(),
)
} else {
// do nothing
}
}
Err(git::validation::positions::Error::Retryable(message)) => {
actor::logger(&self.log, message);
actor::logger(self.log.as_ref(), message);
let addr = ctx.address();
let message_token = self.message_token;
let sleep_duration = self.sleep_duration;
let log = self.log.clone();
async move {
tracing::debug!("sleeping before retrying...");
actor::logger(&log, "before sleep");
actor::logger(log.as_ref(), "before sleep");
tokio::time::sleep(sleep_duration).await;
actor::logger(&log, "after sleep");
actor::logger(log.as_ref(), "after sleep");
actor::do_send(
addr,
actor::messages::ValidateRepo::new(message_token),
&log,
log.as_ref(),
);
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
Err(git::validation::positions::Error::NonRetryable(message))
| Err(git::validation::positions::Error::UserIntervention(message)) => {
actor::logger(&self.log, message);
Err(git::validation::positions::Error::UserIntervention(user_notification)) => {
actor::notify_user(
self.notify_user_recipient.as_ref(),
user_notification,
self.log.as_ref(),
)
}
Err(git::validation::positions::Error::NonRetryable(message)) => {
actor::logger(self.log.as_ref(), message);
}
}
tracing::debug!("Handler: ValidateRepo: finish");
}
}

View file

@ -17,23 +17,30 @@ impl Handler<WebhookNotification> for actor::RepoActor {
#[tracing::instrument(name = "RepoActor::WebhookMessage", skip_all, fields(token = %self.message_token, repo = %self.repo_details))]
fn handle(&mut self, msg: WebhookNotification, ctx: &mut Self::Context) -> Self::Result {
let Some(config) = &self.repo_details.repo_config else {
actor::logger(&self.log, "server has no repo config");
actor::logger(self.log.as_ref(), "server has no repo config");
warn!("No repo config");
return;
};
if validate_notification(&msg, &self.webhook_auth, &*self.forge, &self.log).is_err() {
if validate_notification(
&msg,
self.webhook_auth.as_ref(),
&*self.forge,
self.log.as_ref(),
)
.is_err()
{
return;
}
let body = msg.body();
match self.forge.parse_webhook_body(body) {
Err(err) => {
actor::logger(&self.log, "message parse error - not a push");
actor::logger(self.log.as_ref(), "message parse error - not a push");
warn!(?err, "Not a 'push'");
return;
}
Ok(push) => match push.branch(config.branches()) {
None => {
actor::logger(&self.log, "unknown branch");
actor::logger(self.log.as_ref(), "unknown branch");
warn!(
?push,
"Unrecognised branch, we should be filtering to only the ones we want"
@ -45,7 +52,7 @@ impl Handler<WebhookNotification> for actor::RepoActor {
push,
config.branches().main(),
&mut self.last_main_commit,
&self.log,
self.log.as_ref(),
)
.is_err()
{
@ -57,7 +64,7 @@ impl Handler<WebhookNotification> for actor::RepoActor {
push,
config.branches().next(),
&mut self.last_next_commit,
&self.log,
self.log.as_ref(),
)
.is_err()
{
@ -69,7 +76,7 @@ impl Handler<WebhookNotification> for actor::RepoActor {
push,
config.branches().dev(),
&mut self.last_dev_commit,
&self.log,
self.log.as_ref(),
)
.is_err()
{
@ -86,16 +93,16 @@ impl Handler<WebhookNotification> for actor::RepoActor {
actor::do_send(
ctx.address(),
actor::messages::ValidateRepo::new(message_token),
&self.log,
self.log.as_ref(),
);
}
}
fn validate_notification(
msg: &WebhookNotification,
webhook_auth: &Option<WebhookAuth>,
webhook_auth: Option<&WebhookAuth>,
forge: &dyn ForgeLike,
log: &Option<RepoActorLog>,
log: Option<&RepoActorLog>,
) -> Result<(), ()> {
let Some(expected_authorization) = webhook_auth else {
actor::logger(log, "server has no auth token");
@ -122,7 +129,7 @@ fn handle_push(
push: Push,
branch: BranchName,
last_commit: &mut Option<Commit>,
log: &Option<RepoActorLog>,
log: Option<&RepoActorLog>,
) -> Result<(), ()> {
actor::logger(log, "message is for dev branch");
let commit = git::Commit::from(push);

View file

@ -16,7 +16,7 @@ impl Handler<actor::messages::WebhookRegistered> for actor::RepoActor {
actor::do_send(
ctx.address(),
actor::messages::ValidateRepo::new(self.message_token),
&self.log,
self.log.as_ref(),
);
}
}

View file

@ -2,6 +2,7 @@ mod branch;
pub mod handlers;
mod load;
pub mod messages;
mod notifications;
#[cfg(test)]
mod tests;
@ -12,7 +13,8 @@ use actix::prelude::*;
use derive_more::Deref;
use git_next_config as config;
use git_next_git as git;
use git_next_git::{self as git, UserNotification};
use messages::NotifyUser;
use kxio::network::Network;
use tracing::{info, warn, Instrument};
@ -37,7 +39,7 @@ pub struct RepoActor {
generation: git::Generation,
message_token: messages::MessageToken,
repo_details: git::RepoDetails,
webhook: config::server::Webhook,
webhook: config::server::InboundWebhook,
webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured
webhook_auth: Option<config::WebhookAuth>, // INFO: if [None] then no webhook is configured
last_main_commit: Option<git::Commit>,
@ -48,16 +50,19 @@ pub struct RepoActor {
net: Network,
forge: Box<dyn git::ForgeLike>,
log: Option<RepoActorLog>,
notify_user_recipient: Option<Recipient<NotifyUser>>,
}
impl RepoActor {
#[allow(clippy::too_many_arguments)]
pub fn new(
repo_details: git::RepoDetails,
forge: Box<dyn git::ForgeLike>,
webhook: config::server::Webhook,
webhook: config::server::InboundWebhook,
generation: git::Generation,
net: Network,
repository_factory: Box<dyn git::repository::RepositoryFactory>,
sleep_duration: std::time::Duration,
notify_user_recipient: Option<Recipient<NotifyUser>>,
) -> Self {
let message_token = messages::MessageToken::default();
Self {
@ -76,6 +81,7 @@ impl RepoActor {
net,
sleep_duration,
log: None,
notify_user_recipient,
}
}
}
@ -105,12 +111,8 @@ impl Actor for RepoActor {
}
}
pub fn delay_send<M>(
addr: Addr<RepoActor>,
delay: Duration,
msg: M,
log: &Option<crate::RepoActorLog>,
) where
pub fn delay_send<M>(addr: Addr<RepoActor>, delay: Duration, msg: M, log: Option<&RepoActorLog>)
where
M: actix::Message + Send + 'static + std::fmt::Debug,
RepoActor: actix::Handler<M>,
<M as actix::Message>::Result: Send,
@ -122,7 +124,7 @@ pub fn delay_send<M>(
do_send(addr, msg, log)
}
pub fn do_send<M>(_addr: Addr<RepoActor>, msg: M, log: &Option<crate::RepoActorLog>)
pub fn do_send<M>(_addr: Addr<RepoActor>, msg: M, log: Option<&RepoActorLog>)
where
M: actix::Message + Send + 'static + std::fmt::Debug,
RepoActor: actix::Handler<M>,
@ -135,10 +137,23 @@ where
_addr.do_send(msg)
}
pub fn logger(log: &Option<crate::RepoActorLog>, message: impl Into<String>) {
pub fn logger(log: Option<&RepoActorLog>, message: impl Into<String>) {
if let Some(log) = log {
let message: String = message.into();
tracing::debug!(message);
let _ = log.write().map(|mut l| l.push(message));
}
}
pub fn notify_user(
recipient: Option<&Recipient<NotifyUser>>,
user_notification: UserNotification,
log: Option<&RepoActorLog>,
) {
let msg = NotifyUser::from(user_notification);
let log_message = format!("send: {:?}", msg);
tracing::debug!(log_message);
logger(log, log_message);
if let Some(recipient) = &recipient {
recipient.do_send(msg);
}
}

View file

@ -2,6 +2,7 @@
use config::newtype;
use derive_more::Display;
use git::UserNotification;
use git_next_actor_macros::message;
use git_next_config as config;
use git_next_git as git;
@ -63,3 +64,4 @@ Contains a tuple of the commit that was checked (the tip of the `next` branch) a
message!(AdvanceNext: (git::Commit, Vec<git::Commit>): "Request to advance the `next` branch on to the next commit on the `dev branch."); // next commit and the dev commit history
message!(AdvanceMain: git::Commit: "Request to advance the `main` branch on to same commit as the `next` branch."); // next commit
message!(WebhookNotification: config::webhook::forge_notification::ForgeNotification: "Notification of a webhook message from the forge.");
message!(NotifyUser: UserNotification: "Request to send the message payload to the notification webhook");

View file

@ -0,0 +1,72 @@
use derive_more::Deref as _;
use git_next_git::UserNotification;
use serde_json::json;
use crate::messages::NotifyUser;
impl NotifyUser {
pub fn as_json(self, timestamp: time::OffsetDateTime) -> serde_json::Value {
let timestamp = timestamp.unix_timestamp().to_string();
match self.deref() {
UserNotification::CICheckFailed {
forge_alias,
repo_alias,
commit,
} => json!({
"type": "cicheck.failed",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"commit": {
"sha": commit.sha(),
"message": commit.message()
}
}
}),
UserNotification::RepoConfigLoadFailure {
forge_alias,
repo_alias,
reason,
} => json!({
"type": "config.load.failed",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"reason": reason
}
}),
UserNotification::WebhookRegistration {
forge_alias,
repo_alias,
reason,
} => json!({
"type": "webhook.registration.failed",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"reason": reason
}
}),
UserNotification::DevNotBasedOnMain {
forge_alias,
repo_alias,
dev_branch,
main_branch,
} => json!({
"type": "branch.dev.not-on-main",
"timestamp": timestamp,
"data": {
"forge_alias": forge_alias,
"repo_alias": repo_alias,
"branches": {
"dev": dev_branch,
"main": main_branch
}
}
}),
}
}
}

View file

@ -57,7 +57,7 @@ pub fn a_webhook_url(
forge_alias: &ForgeAlias,
repo_alias: &RepoAlias,
) -> git_next_config::server::WebhookUrl {
config::server::Webhook::new(a_name()).url(forge_alias, repo_alias)
config::server::InboundWebhook::new(a_name()).url(forge_alias, repo_alias)
}
pub fn a_name() -> String {
@ -179,8 +179,8 @@ pub fn a_message_token() -> MessageToken {
MessageToken::default()
}
pub fn a_webhook(url: &WebhookUrl) -> Webhook {
Webhook::new(url.clone().into())
pub fn a_webhook(url: &WebhookUrl) -> InboundWebhook {
InboundWebhook::new(url.clone().into())
}
pub fn a_forge() -> Box<MockForgeLike> {
@ -209,6 +209,7 @@ pub fn a_repo_actor(
net,
repository_factory,
std::time::Duration::from_nanos(1),
None,
)
.with_log(actors_log),
log,

View file

@ -45,16 +45,12 @@ async fn when_read_file_ok_should_send_config_loaded() -> TestResult {
//then
tracing::debug!(?log, "");
log.read().map_err(|e| e.to_string()).map(|l| {
assert!(l
.iter()
.any(|message| message.contains("send: LoadedConfig")))
})?;
log.require_message_containing("send: ReceiveRepoConfig")?;
log.no_message_contains("send: NotifyUsers")?;
Ok(())
}
#[actix::test]
#[ignore] //TODO: (#95) should notify user
async fn when_read_file_err_should_notify_user() -> TestResult {
//given
let fs = given::a_filesystem();
@ -80,13 +76,7 @@ async fn when_read_file_err_should_notify_user() -> TestResult {
//then
tracing::debug!(?log, "");
log.read().map_err(|e| e.to_string()).map(|l| {
assert!(l.iter().any(|message| message.contains("send: NotifyUser")));
assert!(
!l.iter()
.any(|message| message.contains("send: LoadedConfig")),
"not send LoadedConfig"
);
})?;
log.require_message_containing("send: NotifyUser")?;
log.no_message_contains("send: ReceiveRepoConfig")?;
Ok(())
}

View file

@ -88,7 +88,6 @@ async fn when_fail_should_recheck_after_delay() -> TestResult {
}
#[test_log::test(actix::test)]
#[ignore] //TODO: (#95) should notify user
async fn when_fail_should_notify_user() -> TestResult {
//given
let fs = given::a_filesystem();

View file

@ -36,7 +36,6 @@ async fn when_registered_ok_should_send_webhook_registered() -> TestResult {
}
#[actix::test]
#[ignore] //TODO: (#95) should notify user
async fn when_registered_error_should_send_notify_user() -> TestResult {
//given
let fs = given::a_filesystem();

View file

@ -279,7 +279,52 @@ async fn repo_with_dev_ahead_of_next_should_advance_next() -> TestResult {
Ok(())
}
// TODO: (#95) test: repo with dev not a child of main should notify user
#[test_log::test(actix::test)]
async fn repo_with_dev_not_ahead_of_main_should_notify_user() -> TestResult {
//given
let fs = given::a_filesystem();
let (mut open_repository, repo_details) = given::an_open_repository(&fs);
#[allow(clippy::unwrap_used)]
let repo_config = repo_details.repo_config.clone().unwrap();
// Validate repo branches
expect::fetch_ok(&mut open_repository);
let branches = repo_config.branches();
// commit_log main
let main_commit = expect::main_commit_log(&mut open_repository, branches.main());
// next - on main
let next_commit = main_commit.clone();
let next_branch_log = vec![next_commit.clone(), given::a_commit()];
// dev - not ahead of next
let dev_commit = given::a_named_commit("dev");
let dev_branch_log = vec![dev_commit];
// commit_log next
open_repository
.expect_commit_log()
.times(1)
.with(eq(branches.next()), eq([main_commit.clone()]))
.return_once(move |_, _| Ok(next_branch_log));
// commit_log dev
let dev_commit_log = dev_branch_log.clone();
open_repository
.expect_commit_log()
.times(1)
.with(eq(branches.dev()), eq([main_commit]))
.return_once(move |_, _| Ok(dev_commit_log));
//when
let (addr, log) = when::start_actor_with_open_repository(
Box::new(open_repository),
repo_details,
given::a_forge(),
);
addr.send(crate::messages::ValidateRepo::new(MessageToken::default()))
.await?;
System::current().stop();
//then
log.require_message_containing("send: NotifyUser")?;
Ok(())
}
#[test_log::test(actix::test)]
async fn should_accept_message_with_current_token() -> TestResult {
@ -382,7 +427,7 @@ async fn should_send_validate_repo_when_retryable_error() -> TestResult {
}
#[test_log::test(actix::test)]
async fn should_send_nothing_when_non_retryable_error() -> TestResult {
async fn should_send_notify_user_when_non_retryable_error() -> TestResult {
//given
let fs = given::a_filesystem();
let (mut open_repository, repo_details) = given::an_open_repository(&fs);
@ -424,6 +469,6 @@ async fn should_send_nothing_when_non_retryable_error() -> TestResult {
//then
log.require_message_containing("accepted token")?;
log.no_message_contains("send:")?;
log.require_message_containing("send: NotifyUser")?;
Ok(())
}

View file

@ -7,7 +7,7 @@ use actor::{
};
use assert2::let_assert;
use config::{
server::{Webhook, WebhookUrl},
server::{InboundWebhook, WebhookUrl},
webhook::forge_notification::Body,
BranchName, ForgeAlias, ForgeConfig, ForgeNotification, ForgeType, GitDir, RegisteredWebhook,
RepoAlias, RepoBranches, RepoConfig, ServerRepoConfig, WebhookAuth, WebhookId,
@ -82,7 +82,7 @@ pub struct RepoActorView {
pub generation: git::Generation,
pub message_token: MessageToken,
pub repo_details: git::RepoDetails,
pub webhook: config::server::Webhook,
pub webhook: config::server::InboundWebhook,
pub webhook_id: Option<config::WebhookId>, // INFO: if [None] then no webhook is configured
pub webhook_auth: Option<config::WebhookAuth>, // INFO: if [None] then no webhook is configured
pub last_main_commit: Option<git::Commit>,

View file

@ -28,6 +28,14 @@ derive-with = { workspace = true }
# Actors
actix = { workspace = true }
# Webhooks
serde = { workspace = true }
serde_json = { workspace = true }
ulid = { workspace = true }
time = { workspace = true }
secrecy = { workspace = true }
standardwebhooks = { workspace = true }
[dev-dependencies]
# Testing
# assert2 = { workspace = true }

View file

@ -3,9 +3,9 @@ use actix::prelude::*;
use git_next_config::server::ServerConfig;
use git_next_file_watcher_actor::FileUpdated;
use crate::{messages::ReceiveServerConfig, Server};
use crate::{messages::ReceiveServerConfig, ServerActor};
impl Handler<FileUpdated> for Server {
impl Handler<FileUpdated> for ServerActor {
type Result = ();
fn handle(&mut self, _msg: FileUpdated, ctx: &mut Self::Context) -> Self::Result {

View file

@ -1,4 +1,5 @@
mod file_updated;
mod notify_user;
mod receive_server_config;
mod receive_valid_server_config;
mod shutdown;

View file

@ -0,0 +1,81 @@
//
use actix::prelude::*;
use git_next_config::server::{Notification, NotificationType};
use git_next_repo_actor::messages::NotifyUser;
use secrecy::ExposeSecret;
use standardwebhooks::Webhook;
use tracing::Instrument;
use crate::ServerActor;
impl Handler<NotifyUser> for ServerActor {
type Result = ();
fn handle(&mut self, msg: NotifyUser, ctx: &mut Self::Context) -> Self::Result {
let Some(server_config) = &self.server_config else {
return;
};
let notification_config = server_config.notification().clone();
let net = self.net.clone();
async move {
match notification_config.r#type() {
NotificationType::None => { /* do nothing */ }
NotificationType::Webhook => send_webhook(msg, notification_config, net).await,
}
}
.in_current_span()
.into_actor(self)
.wait(ctx);
}
}
async fn send_webhook(
msg: NotifyUser,
notification_config: Notification,
net: kxio::network::Network,
) {
let Some(webhook_config) = notification_config.webhook() else {
tracing::warn!("Invalid notification configuration (config) - can't sent notification");
return;
};
let Ok(webhook) = Webhook::new(webhook_config.secret().expose_secret()) else {
tracing::warn!("Invalid notification configuration (signer) - can't sent notification");
return;
};
do_send_webhook(msg, webhook, webhook_config, net).await;
}
async fn do_send_webhook(
msg: NotifyUser,
webhook: Webhook,
webhook_config: &git_next_config::server::OutboundWebhook,
net: kxio::network::Network,
) {
let message_id = format!("msg_{}", ulid::Ulid::new());
let timestamp = time::OffsetDateTime::now_utc();
let payload = msg.as_json(timestamp);
let timestamp = timestamp.unix_timestamp();
let to_sign = format!("{message_id}.{timestamp}.{payload}");
tracing::info!(?to_sign, "");
#[allow(clippy::expect_used)]
let signature = webhook
.sign(&message_id, timestamp, payload.to_string().as_ref())
.expect("signature");
tracing::info!(?signature, "");
let url = webhook_config.url();
use kxio::network::{NetRequest, NetUrl, RequestBody, ResponseType};
let net_url = NetUrl::new(url.to_string());
let request = NetRequest::post(net_url)
.body(RequestBody::Json(payload))
.header("webhook-id", &message_id)
.header("webhook-timestamp", &timestamp.to_string())
.header("webhook-signature", &signature)
.response_type(ResponseType::None)
.build();
net.post_json::<()>(request).await.map_or_else(
|err| {
tracing::warn!(?err, "sending webhook");
},
|_| (),
);
}

View file

@ -2,10 +2,10 @@ use actix::prelude::*;
use crate::{
messages::{ReceiveServerConfig, ReceiveValidServerConfig, ValidServerConfig},
Server,
ServerActor,
};
impl Handler<ReceiveServerConfig> for Server {
impl Handler<ReceiveServerConfig> for ServerActor {
type Result = ();
#[allow(clippy::cognitive_complexity)]
@ -21,7 +21,7 @@ impl Handler<ReceiveServerConfig> for Server {
return;
};
if msg.webhook().base_url().ends_with('/') {
if msg.inbound_webhook().base_url().ends_with('/') {
tracing::error!("webhook.url must not end with a '/'");
return;
}

View file

@ -3,30 +3,37 @@ use git_next_webhook_actor::{AddWebhookRecipient, ShutdownWebhook, WebhookActor,
use crate::{
messages::{ReceiveValidServerConfig, ValidServerConfig},
Server,
ServerActor,
};
impl Handler<ReceiveValidServerConfig> for Server {
impl Handler<ReceiveValidServerConfig> for ServerActor {
type Result = ();
fn handle(&mut self, msg: ReceiveValidServerConfig, _ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: ReceiveValidServerConfig, ctx: &mut Self::Context) -> Self::Result {
let ValidServerConfig {
server_config,
socket_address,
server_storage,
} = msg.unwrap();
if let Some(webhook) = self.webhook.take() {
webhook.do_send(ShutdownWebhook);
// shutdown any existing webhook actor
if let Some(webhook_actor_addr) = self.webhook_actor_addr.take() {
webhook_actor_addr.do_send(ShutdownWebhook);
}
self.generation.inc();
// Webhook Server
tracing::info!("Starting Webhook Server...");
let webhook_router = WebhookRouter::default().start();
let webhook = server_config.webhook();
let inbound_webhook = server_config.inbound_webhook();
// Forge Actors
for (forge_alias, forge_config) in server_config.forges() {
let repo_actors = self
.create_forge_repos(forge_config, forge_alias.clone(), &server_storage, webhook)
.create_forge_repos(
forge_config,
forge_alias.clone(),
&server_storage,
inbound_webhook,
ctx.address().recipient(),
)
.into_iter()
.map(|a| self.start_actor(a))
.collect::<Vec<_>>();
@ -45,7 +52,9 @@ impl Handler<ReceiveValidServerConfig> for Server {
.insert((forge_alias.clone(), repo_alias), addr);
});
}
let webhook = WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook.replace(webhook);
let webhook_actor_addr =
WebhookActor::new(socket_address, webhook_router.recipient()).start();
self.webhook_actor_addr.replace(webhook_actor_addr);
self.server_config.replace(server_config);
}
}

View file

@ -2,9 +2,9 @@
use actix::prelude::*;
use git_next_webhook_actor::ShutdownWebhook;
use crate::{messages::Shutdown, Server};
use crate::{messages::Shutdown, ServerActor};
impl Handler<Shutdown> for Server {
impl Handler<Shutdown> for ServerActor {
type Result = ();
fn handle(&mut self, _msg: Shutdown, _ctx: &mut Self::Context) -> Self::Result {
@ -16,7 +16,7 @@ impl Handler<Shutdown> for Server {
tracing::debug!(%forge_alias, %repo_alias, "removed webhook");
});
tracing::debug!("server shutdown");
if let Some(webhook) = self.webhook.take() {
if let Some(webhook) = self.webhook_actor_addr.take() {
tracing::debug!("shuting down webhook");
webhook.do_send(ShutdownWebhook);
tracing::debug!("webhook shutdown");

View file

@ -7,9 +7,10 @@ pub mod messages;
use actix::prelude::*;
use git_next_config as config;
use git_next_config::server::{ServerConfig, ServerStorage, Webhook};
use git_next_config::server::{InboundWebhook, ServerConfig, ServerStorage};
use git_next_config::{ForgeAlias, ForgeConfig, GitDir, RepoAlias, ServerRepoConfig};
use git_next_git::{Generation, RepoDetails};
use git_next_repo_actor::messages::NotifyUser;
use git_next_repo_actor::{messages::CloneRepo, RepoActor};
use git_next_webhook_actor as webhook;
use kxio::{fs::FileSystem, network::Network};
@ -43,9 +44,10 @@ type Result<T> = core::result::Result<T, Error>;
#[derive(derive_with::With)]
#[with(message_log)]
pub struct Server {
pub struct ServerActor {
server_config: Option<ServerConfig>,
generation: Generation,
webhook: Option<Addr<WebhookActor>>,
webhook_actor_addr: Option<Addr<WebhookActor>>,
fs: FileSystem,
net: Network,
repository_factory: Box<dyn RepositoryFactory>,
@ -55,11 +57,11 @@ pub struct Server {
// testing
message_log: Option<Arc<RwLock<Vec<String>>>>,
}
impl Actor for Server {
impl Actor for ServerActor {
type Context = Context<Self>;
}
impl Server {
impl ServerActor {
pub fn new(
fs: FileSystem,
net: Network,
@ -68,8 +70,9 @@ impl Server {
) -> Self {
let generation = Generation::default();
Self {
server_config: None,
generation,
webhook: None,
webhook_actor_addr: None,
fs,
net,
repository_factory: repo,
@ -104,7 +107,8 @@ impl Server {
forge_config: &ForgeConfig,
forge_name: ForgeAlias,
server_storage: &ServerStorage,
webhook: &Webhook,
webhook: &InboundWebhook,
notify_user_recipient: Recipient<NotifyUser>,
) -> Vec<(ForgeAlias, RepoAlias, RepoActor)> {
let span =
tracing::info_span!("create_forge_repos", name = %forge_name, config = %forge_config);
@ -114,7 +118,11 @@ impl Server {
let mut repos = vec![];
let creator = self.create_actor(forge_name, forge_config.clone(), server_storage, webhook);
for (repo_alias, server_repo_config) in forge_config.repos() {
let forge_repo = creator((repo_alias, server_repo_config));
let forge_repo = creator((
repo_alias,
server_repo_config,
notify_user_recipient.clone(),
));
info!(
alias = %forge_repo.1,
"Created Repo"
@ -129,15 +137,18 @@ impl Server {
forge_name: ForgeAlias,
forge_config: ForgeConfig,
server_storage: &ServerStorage,
webhook: &Webhook,
) -> impl Fn((RepoAlias, &ServerRepoConfig)) -> (ForgeAlias, RepoAlias, RepoActor) {
webhook: &InboundWebhook,
) -> impl Fn(
(RepoAlias, &ServerRepoConfig, Recipient<NotifyUser>),
) -> (ForgeAlias, RepoAlias, RepoActor) {
let server_storage = server_storage.clone();
let webhook = webhook.clone();
let net = self.net.clone();
let repository_factory = self.repository_factory.duplicate();
let generation = self.generation;
let sleep_duration = self.sleep_duration;
move |(repo_alias, server_repo_config)| {
// let notify_user_recipient = server_addr.recipient();
move |(repo_alias, server_repo_config, notify_user_recipient)| {
let span = tracing::info_span!("create_actor", alias = %repo_alias, config = %server_repo_config);
let _guard = span.enter();
info!("Creating Repo");
@ -173,6 +184,7 @@ impl Server {
net.clone(),
repository_factory.duplicate(),
sleep_duration,
Some(notify_user_recipient),
);
(forge_name.clone(), repo_alias, actor)
}

View file

@ -1,7 +1,7 @@
//
use crate::{tests::given, ReceiveServerConfig, Server};
use crate::{tests::given, ReceiveServerConfig, ServerActor};
use actix::prelude::*;
use git_next_config::server::{Http, ServerConfig, ServerStorage, Webhook};
use git_next_config::server::{Http, InboundWebhook, Notification, ServerConfig, ServerStorage};
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
@ -17,11 +17,12 @@ async fn when_webhook_url_has_trailing_slash_should_not_send() {
let duration = std::time::Duration::from_millis(1);
// sut
let server = Server::new(fs.clone(), net.into(), repo, duration);
let server = ServerActor::new(fs.clone(), net.into(), repo, duration);
// collaborators
let http = Http::new("0.0.0.0".to_string(), 80);
let webhook = Webhook::new("http://localhost/".to_string()); // With trailing slash
let webhook = InboundWebhook::new("http://localhost/".to_string()); // With trailing slash
let notifications = Notification::none();
let server_storage = ServerStorage::new((fs.base()).to_path_buf());
let repos = BTreeMap::default();
@ -35,6 +36,7 @@ async fn when_webhook_url_has_trailing_slash_should_not_send() {
.do_send(ReceiveServerConfig::new(ServerConfig::new(
http,
webhook,
notifications,
server_storage,
repos,
)));

View file

@ -8,6 +8,10 @@ url = "https://localhost:8080" # don't include any query path or a trailing slas
[storage]
path = "./data"
[notifications]
type = "WebHook"
webhook = { url = "https://localhost:9090" }
[forge]
[forge.default]

View file

@ -1,7 +1,7 @@
//
use actix::prelude::*;
use git_next_file_watcher_actor::{FileUpdated, FileWatcher};
use git_next_server_actor::Server;
use git_next_server_actor::ServerActor;
use kxio::{fs::FileSystem, network::Network};
use std::path::PathBuf;
use tracing::{error, info, level_filters::LevelFilter};
@ -40,7 +40,7 @@ pub fn start(
info!("Starting Server...");
let execution = async move {
let server = Server::new(fs.clone(), net.clone(), repo, sleep_duration).start();
let server = ServerActor::new(fs.clone(), net.clone(), repo, sleep_duration).start();
server.do_send(FileUpdated);
info!("Starting File Watcher...");