diff --git a/src/import/attachment.rs b/src/import/attachment.rs index 145fc05..3d49851 100644 --- a/src/import/attachment.rs +++ b/src/import/attachment.rs @@ -1,12 +1,15 @@ +use color_eyre::eyre::Context; // -use kameo::{mailbox::unbounded::UnboundedMailbox, Actor}; - +use crate::import::rate_limit::{RateLimitActor, RequestToken}; use crate::{ + ask, e, f, nextcloud::model::{NextcloudBoardId, NextcloudCardId, NextcloudStackId}, on_actor_start, p, trello::model::{attachment::TrelloAttachment, card::TrelloCardId}, FullCtx, }; +use kameo::actor::ActorRef; +use kameo::{mailbox::unbounded::UnboundedMailbox, Actor}; pub(crate) struct ImportAttachmentActor { ctx: FullCtx, @@ -15,6 +18,7 @@ pub(crate) struct ImportAttachmentActor { nextcloud_card_id: NextcloudCardId, trello_card_id: TrelloCardId, trello_attachment: TrelloAttachment, + rate_limit_actor: ActorRef, } impl ImportAttachmentActor { pub fn new( @@ -24,6 +28,7 @@ impl ImportAttachmentActor { nextcloud_card_id: NextcloudCardId, trello_card_id: TrelloCardId, trello_attachment: TrelloAttachment, + rate_limit_actor: ActorRef, ) -> Self { Self { ctx, @@ -32,6 +37,7 @@ impl ImportAttachmentActor { nextcloud_card_id, trello_card_id, trello_attachment, + rate_limit_actor, } } } @@ -49,6 +55,7 @@ impl Actor for ImportAttachmentActor { let dir = this.ctx.temp_fs.dir(&dir_path); dir.create()?; let file_path = dir_path.join(&*this.trello_attachment.name); + ask!(this.rate_limit_actor, RequestToken)?; let attachment_path = this .ctx .trello_client() @@ -58,7 +65,8 @@ impl Actor for ImportAttachmentActor { Some(&file_path), &this.ctx.temp_fs, ) - .await?; + .await + .with_context(|| f!("saving attachment {}", file_path.display()))?; let attachment_file = this.ctx.temp_fs.file(&attachment_path); // - - - upload the attachment to nextcloud card let attachment = this @@ -71,7 +79,16 @@ impl Actor for ImportAttachmentActor { &attachment_file, ) .await - .result?; + .result + .inspect_err(|e| { + e!( + this.ctx.prt, + ">> Error adding attachment {} to card: {}", + file_path.display(), + e.to_string() + ); + }) + .with_context(|| f!("adding attachment to card {}", file_path.display()))?; p!( this.ctx.prt, ">> Attachment added: {}:{}", @@ -80,8 +97,11 @@ impl Actor for ImportAttachmentActor { // attachment.extended_data.mimetype ); // delete local copy of attachment - attachment_file.remove()?; - dir.remove()?; + attachment_file + .remove() + .with_context(|| f!("deleting temp file {attachment_file}"))?; + dir.remove() + .with_context(|| f!("deleting temp dir {dir}"))?; Ok(actor_ref.stop_gracefully().await?) }); diff --git a/src/import/card.rs b/src/import/card.rs index 7caac23..196964e 100644 --- a/src/import/card.rs +++ b/src/import/card.rs @@ -8,7 +8,10 @@ use kameo::{ Actor, }; +use crate::import::rate_limit::{RateLimitActor, RequestToken}; +use crate::rate_limit::RateLimit; use crate::{ + ask, import::{attachment::ImportAttachmentActor, label::ImportLabelActor, labels::LabelsActor}, nextcloud::{ client::DeckClient, @@ -28,6 +31,7 @@ pub(crate) struct ImportCardActor { nextcloud_board_id: NextcloudBoardId, nextcloud_stack_id: NextcloudStackId, labels_actor_ref: ActorRef, + rate_limit_actor: ActorRef, labels_children: HashMap)>, attachments_children: HashMap)>, @@ -39,6 +43,7 @@ impl ImportCardActor { nextcloud_board_id: NextcloudBoardId, nextcloud_stack_id: NextcloudStackId, labels_actor_ref: ActorRef, + rate_limit_actor: ActorRef, ) -> Self { Self { ctx, @@ -46,6 +51,7 @@ impl ImportCardActor { nextcloud_board_id, nextcloud_stack_id, labels_actor_ref, + rate_limit_actor, labels_children: Default::default(), attachments_children: Default::default(), @@ -77,10 +83,13 @@ impl Actor for ImportCardActor { .await .result?; + let mut limit = RateLimit::new("labels & attachments", 10, 10.0, this.ctx.now()); + // - - for each label on the trello card let mut labels = vec![]; std::mem::swap(&mut this.trello_card.labels, &mut labels); for trello_label in labels.into_iter() { + limit.block_until_token_available(this.ctx.now()).await; let trello_label_name = trello_label.name.clone(); let child = spawn_in_thread!( actor_ref, @@ -97,12 +106,14 @@ impl Actor for ImportCardActor { .insert(child.id(), (trello_label_name, child.clone())); } + ask!(this.rate_limit_actor, RequestToken)?; let attachments = trello_client .card(&this.trello_card.id) .await .result? .attachments; for trello_attachment in attachments.into_iter() { + limit.block_until_token_available(this.ctx.now()).await; if trello_attachment.is_upload { let trello_attachment_name = trello_attachment.name.clone(); let child = spawn_in_thread!( @@ -114,6 +125,7 @@ impl Actor for ImportCardActor { nextcloud_card.id, this.trello_card.id.clone(), trello_attachment, + this.rate_limit_actor.clone(), ) ); this.attachments_children diff --git a/src/import/mod.rs b/src/import/mod.rs index 28ac39e..2f708d0 100644 --- a/src/import/mod.rs +++ b/src/import/mod.rs @@ -4,8 +4,12 @@ use kameo::actor::spawn_in_thread; use crate::{ f, - import::{labels::LabelsActor, stacks::ImportStacksActor, supervisor::Supervisor}, + import::{ + labels::LabelsActor, rate_limit::RateLimitActor, stacks::ImportStacksActor, + supervisor::Supervisor, + }, nextcloud::model::NextcloudBoardTitle, + rate_limit::RateLimit, spawn_in_thread, trello::model::board::{TrelloBoardId, TrelloBoardName, TrelloBoards}, FullCtx, @@ -15,6 +19,7 @@ mod attachment; mod card; mod label; mod labels; +mod rate_limit; mod stack; mod stacks; mod supervisor; @@ -72,13 +77,16 @@ pub(crate) async fn run(ctx: &FullCtx) -> color_eyre::Result<()> { // Start importing + println!("> Loading Nextcloud board: {nextcloud_board_name}"); // get the id of the selected board let nextcloud_board_id = nextcloud_boards .iter() .find(|b| b.title == nextcloud_board_name) .map(|b| b.id) .expect("find selected board"); + println!("> Loaded Nextcloud board: {nextcloud_board_name}"); + println!("> Spawning actors"); let supervisor = spawn_in_thread(Supervisor); let labels_actor_ref = spawn_in_thread!( @@ -86,6 +94,12 @@ pub(crate) async fn run(ctx: &FullCtx) -> color_eyre::Result<()> { LabelsActor::new(ctx.clone(), nextcloud_board_id) ); + let now = ctx.now(); + let trello_rate_limiter = spawn_in_thread!( + supervisor, + RateLimitActor::new(RateLimit::new("trello", 100, 10.0, now)) + ); + let _main = spawn_in_thread!( supervisor, ImportStacksActor::new( @@ -94,8 +108,10 @@ pub(crate) async fn run(ctx: &FullCtx) -> color_eyre::Result<()> { selected_trello_stack_names.into_iter().cloned().collect(), nextcloud_board_id, labels_actor_ref.clone(), + trello_rate_limiter.clone(), ) ); + println!("> Spawned main actors"); supervisor.wait_for_stop().await; diff --git a/src/import/rate_limit.rs b/src/import/rate_limit.rs new file mode 100644 index 0000000..b0cf3ea --- /dev/null +++ b/src/import/rate_limit.rs @@ -0,0 +1,34 @@ +use crate::{message, rate_limit::RateLimit}; +use kameo::{ + mailbox::unbounded::UnboundedMailbox, + message::{Context, Message}, + Actor, +}; +use tokio::time::Instant; + +message!(RequestToken, "Request a rate limited token"); + +pub(crate) struct RateLimitActor { + limiter: RateLimit, +} +impl RateLimitActor { + pub fn new(limiter: RateLimit) -> Self { + Self { limiter } + } +} +impl Actor for RateLimitActor { + type Mailbox = UnboundedMailbox; +} +impl Message for RateLimitActor { + type Reply = (); + + async fn handle( + &mut self, + _msg: RequestToken, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.limiter + .block_until_token_available(Instant::now()) + .await + } +} diff --git a/src/import/stack.rs b/src/import/stack.rs index 553790c..73c6a8e 100644 --- a/src/import/stack.rs +++ b/src/import/stack.rs @@ -8,7 +8,10 @@ use kameo::{ Actor, }; +use crate::import::rate_limit::{RateLimitActor, RequestToken}; +use crate::rate_limit::RateLimit; use crate::{ + ask, import::{card::ImportCardActor, labels::LabelsActor}, nextcloud::model::{NextcloudBoardId, NextcloudStackId}, on_actor_link_died, on_actor_start, spawn_in_thread, @@ -28,6 +31,7 @@ pub(super) struct ImportStackActor { nextcloud_board_id: NextcloudBoardId, nextcloud_stack_id: NextcloudStackId, labels_actor_ref: ActorRef, + rate_limit_actor: ActorRef, children: HashMap)>, } impl ImportStackActor { @@ -37,6 +41,7 @@ impl ImportStackActor { nextcloud_board_id: NextcloudBoardId, nextcloud_stack_id: NextcloudStackId, labels_actor_ref: ActorRef, + rate_limit_actor: ActorRef, ) -> Self { Self { ctx, @@ -44,6 +49,7 @@ impl ImportStackActor { nextcloud_board_id, nextcloud_stack_id, labels_actor_ref, + rate_limit_actor, children: Default::default(), } } @@ -56,6 +62,7 @@ impl Actor for ImportStackActor { crate::p!(this.ctx.prt, "Importing stack: {}", this.trello_stack.name); // - get the list of trello cards in the stack + ask!(this.rate_limit_actor, RequestToken)?; let mut trello_cards = trello_client .list_cards(&TrelloListId::new(this.trello_stack.id.as_ref())) .await @@ -63,8 +70,12 @@ impl Actor for ImportStackActor { // sort cards by their position trello_cards.sort_by_key(|card| card.pos); + let mut limit = RateLimit::new("cards", 10, 10.0, this.ctx.now()); + // - for each card in the trello stack for trello_card in trello_cards.into_iter() { + limit.block_until_token_available(this.ctx.now()).await; + ask!(this.rate_limit_actor, RequestToken)?; let trello_card_name = trello_card.name.clone(); let child: ActorRef = spawn_in_thread!( actor_ref.clone(), @@ -74,6 +85,7 @@ impl Actor for ImportStackActor { this.nextcloud_board_id, this.nextcloud_stack_id, this.labels_actor_ref.clone(), + this.rate_limit_actor.clone(), ) ); this.children diff --git a/src/import/stacks.rs b/src/import/stacks.rs index 3b949c6..256db3b 100644 --- a/src/import/stacks.rs +++ b/src/import/stacks.rs @@ -8,6 +8,8 @@ use kameo::{ Actor, }; +use crate::import::rate_limit::RateLimitActor; +use crate::rate_limit::RateLimit; use crate::{ import::{labels::LabelsActor, stack::ImportStackActor}, nextcloud::{ @@ -72,6 +74,7 @@ pub(super) struct ImportStacksActor { selected_trello_stack_names: Vec, nextcloud_board_id: NextcloudBoardId, labels_actor_ref: ActorRef, + rate_limit_actor: ActorRef, children: HashMap)>, } @@ -82,6 +85,7 @@ impl ImportStacksActor { selected_trello_stack_names: Vec, nextcloud_board_id: NextcloudBoardId, labels_actor_ref: ActorRef, + rate_limit_actor: ActorRef, ) -> Self { Self { ctx, @@ -89,6 +93,7 @@ impl ImportStacksActor { selected_trello_stack_names, nextcloud_board_id, labels_actor_ref, + rate_limit_actor, children: Default::default(), } } @@ -126,8 +131,11 @@ impl Actor for ImportStacksActor { .await .result?; + let mut limit = RateLimit::new("stacks", 1, 0.1, this.ctx.now()); + // for each selected trello stack for selected_trello_stack in selected_trello_stacks { + limit.block_until_token_available(this.ctx.now()).await; let nextcloud_stack_id = //this. nextcloud_stacks .iter() @@ -142,6 +150,7 @@ impl Actor for ImportStacksActor { this.nextcloud_board_id, nextcloud_stack_id, this.labels_actor_ref.clone(), + this.rate_limit_actor.clone() ) ); this.children diff --git a/src/lib.rs b/src/lib.rs index c06d6b7..d68d503 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,10 +5,10 @@ use crate::{nextcloud::client::DeckClient, trello::client::TrelloClient}; use clap::Parser; use color_eyre::eyre::eyre; use config::AppConfig; +use execute::Execute; use kxio::fs::TempFileSystem; use kxio::{fs::FileSystem, kxeprintln as e, kxprintln as p, net::Net, print::Printer}; - -use execute::Execute; +use tokio::time::Instant; mod api_result; mod check; @@ -19,6 +19,7 @@ mod import; mod init; pub mod macros; mod nextcloud; +mod rate_limit; mod template; mod trello; @@ -86,6 +87,13 @@ pub(crate) struct FullCtx { pub prt: Printer, pub cfg: AppConfig, } + +impl FullCtx { + pub(crate) fn now(&self) -> Instant { + Instant::now() + } +} + impl FullCtx { pub(crate) const fn deck_client(&self) -> DeckClient { DeckClient::new(self) diff --git a/src/macros/message.rs b/src/macros/message.rs new file mode 100644 index 0000000..5a0fdc2 --- /dev/null +++ b/src/macros/message.rs @@ -0,0 +1,15 @@ +#[macro_export] +macro_rules! message { + ($name:ident, $value:ty, $docs:literal) => { + $crate::newtype!($name, $value, $docs); + }; + ($name:ident, $docs:literal) => { + $crate::newtype!($name, $docs); + }; + ($name:ident, $value:ty => $result:ty, $docs:literal) => { + $crate::newtype!($name, $value, $docs); + }; + ($name:ident => $result:ty, $docs:literal) => { + $crate::newtype!($name, $docs); + }; +} diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 9abc796..639db4f 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -1,6 +1,7 @@ // mod actor; mod backoff; +mod message; mod newtype; mod print; mod send; diff --git a/src/rate_limit.rs b/src/rate_limit.rs new file mode 100644 index 0000000..e2a64b2 --- /dev/null +++ b/src/rate_limit.rs @@ -0,0 +1,96 @@ +use std::time::Duration; +use tokio::time::Instant; +use tracing::instrument; + +#[derive(Debug)] +pub(crate) struct RateLimit { + /// The name of the limit + pub(crate) name: String, + + /// Last token was consumed at + pub(crate) last_consumed_at: Instant, + + /// The number of tokens available after the last was consumed + pub(crate) last_token_count: u32, + + /// Maximum tokens we can have at any one time + pub(crate) max: u32, + + /// Tokens replenish rate + pub(crate) tokens_per_second: f32, +} + +#[derive(Debug, PartialEq, Eq)] +pub(crate) enum Token { + Available, + NotAvailable { sleep_nanos: u64 }, +} +impl RateLimit { + // 300 tokens / 10 seconds => max 300, tokens_per_second 30 + pub(crate) fn new( + name: impl Into, + max: u32, + tokens_per_second: f32, + now: Instant, + ) -> Self { + Self { + name: name.into(), + last_consumed_at: now, + last_token_count: max, + max, + tokens_per_second, + } + } + + pub(crate) async fn block_until_token_available(&mut self, now: Instant) { + let mut now = now; + loop { + match self.acquire_token(now) { + Token::Available => { + return; + } + Token::NotAvailable { sleep_nanos } => { + tracing::debug!("sleeping for {}ms", sleep_nanos / 1_000_000); + tokio::time::sleep(Duration::from_nanos(sleep_nanos)).await; + now += Duration::from_nanos(sleep_nanos); + } + } + } + } + + #[instrument] + pub(crate) fn acquire_token(&mut self, now: Instant) -> Token { + debug_assert!(now > self.last_consumed_at, "time must pass"); + let elapsed_nanos = now.duration_since(self.last_consumed_at).as_nanos(); + tracing::trace!(?elapsed_nanos, "ELAPSED"); + + // Calculate new tokens generated since last consumption + let nanos_per_token = (Duration::from_secs(1).as_nanos() as f32) / self.tokens_per_second; + let new_tokens = (elapsed_nanos as f32 / nanos_per_token) as u32; + let current_tokens = (self.last_token_count + new_tokens).min(self.max); + tracing::trace!(%elapsed_nanos, %nanos_per_token, %new_tokens, %current_tokens, "tokens?"); + + if current_tokens > 0 { + self.last_token_count = current_tokens - 1; + self.last_consumed_at = now; + tracing::trace!("token available"); + return Token::Available; + } + + // Calculate time until next token is available + let time_to_next = if current_tokens == 0 { + let tokens_needed = 1; + let seconds = (tokens_needed as f64) / (self.tokens_per_second as f64); + tracing::trace!(?seconds, "time until next token"); + Duration::from_secs_f64(seconds) + } else { + Duration::from_secs(0) + }; + + tracing::trace!(?time_to_next, "token not available"); + println!("[{}] sleep {}ms", self.name, time_to_next.as_millis()); + Token::NotAvailable { + sleep_nanos: time_to_next.as_nanos() as u64, + } + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 1bc9586..bc43773 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -13,6 +13,7 @@ mod check; mod config; pub(crate) mod given; mod init; +mod rate_limit; mod template; #[test] diff --git a/src/tests/rate_limit.rs b/src/tests/rate_limit.rs new file mode 100644 index 0000000..3fa5f96 --- /dev/null +++ b/src/tests/rate_limit.rs @@ -0,0 +1,141 @@ +// +use std::ops::Add; + +use tokio::time::{Duration, Instant}; + +use crate::rate_limit::{RateLimit, Token}; + +#[tokio::test] +async fn test_initial_token_available() { + //given + let mut rate_limit = RateLimit::new("Test", 10, 10.0, Instant::now()); // 10 tokens per second, max 10 tokens + + //when + let token = rate_limit.acquire_token(Instant::now()); + + //then + assert_eq!(token, Token::Available); +} + +#[tokio::test] +async fn test_max_tokens_respected() { + //given + let start = Instant::now(); + let mut rate_limit = RateLimit::new("Test", 5, 10.0, start); // 10 tokens per second, max 5 tokens + + //when + let token = rate_limit.acquire_token(start.add(Duration::from_secs(1))); + + //then + assert_eq!(token, Token::Available); + assert_eq!(rate_limit.last_token_count, 4); // Should be max-1 after consuming one token +} + +#[tokio::test] +async fn test_token_exhaustion() { + //given + let mut rate_limit = RateLimit::new("Test", 1, 1.0, Instant::now()); // 1 token per second, max 1 token + + // Consume the only available token + assert_eq!(rate_limit.acquire_token(Instant::now()), Token::Available); + + //when + let token = rate_limit.acquire_token(Instant::now()); + + //then + // Next request should not be available + match token { + Token::NotAvailable { sleep_nanos } => { + assert!(sleep_nanos > 0); + } + Token::Available => panic!("Should not have token available"), + } +} + +#[test] +fn add_duration_to_instant() { + //given + let start = Instant::now(); + //when + let later = start.add(Duration::from_secs(1)); + //then + assert!(later > start); + assert!(later.duration_since(start) == Duration::from_secs(1)); +} + +#[test_log::test(tokio::test)] +async fn test_token_regeneration() { + //given + let mut rate_limit = RateLimit::new("Test", 1, 1.0, Instant::now()); // 1 token per second, max 1 token + + tracing::info!("Consume initial token"); + assert_eq!(rate_limit.acquire_token(Instant::now()), Token::Available); + + //when + let later = Instant::now().add(Duration::from_millis(1100)); + tracing::info!(later=?later.elapsed().as_nanos(), "Wait for more than 1 second"); + let token = rate_limit.acquire_token(later); // 1.1 seconds + + //then + // Should have generated a new token + assert_eq!(token, Token::Available); +} + +#[test_log::test(tokio::test)] +async fn test_block_until_token_available() { + //given + let mut rate_limit = RateLimit::new("Test", 1, 2.0, Instant::now()); // 2 tokens per second, max 1 token + tracing::info!("consume initial token"); + rate_limit.block_until_token_available(Instant::now()).await; // Consume initial tokens + + //then + tracing::info!("blocking until next token available"); + let start = Instant::now(); + tokio::select! { + _ = rate_limit.block_until_token_available(Instant::now()) => { // This should block for approximately 0.5 seconds + tracing::info!("received token"); + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + panic!("Timeout waiting for token"); + } + } + let elapsed = start.elapsed(); + + //then + assert!(elapsed >= Duration::from_millis(450), "{elapsed:?}") +} + +#[tokio::test] +async fn test_multiple_concurrent_requests() { + let rate_limit = std::sync::Arc::new(tokio::sync::Mutex::new( + RateLimit::new("Test", 2, 2.0, Instant::now()), // 2 tokens per second, max 1 token + )); + + let mut handles = vec![]; + + // Launch 3 concurrent requests + + tokio::select! { + _ = { + // should take ~0.5 second for third token to become available + async{ + for _ in 0..3 { + let rate_limit_clone = rate_limit.clone(); + handles.push(tokio::spawn(async move { + let mut rate_limit = rate_limit_clone.lock().await; + rate_limit.block_until_token_available(Instant::now()).await; + })); + } + // Wait for all requests to complete + for handle in handles { + handle.await.unwrap(); + } + } + } => { + tracing::info!("concurrent requests okay"); + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + panic!("Timeout waiting for token"); + } + } +} diff --git a/src/trello/client.rs b/src/trello/client.rs index 7483dfe..5aebd8a 100644 --- a/src/trello/client.rs +++ b/src/trello/client.rs @@ -97,6 +97,18 @@ impl<'ctx> TrelloClient<'ctx> { .bytes() .await .context("downloading attachment")?; + let Some(expected_file_size) = attachment.bytes else { + crate::e!(self.ctx.prt, "==> no file size in attachment"); + return Err(color_eyre::eyre::eyre!("no file size in attachment")); + }; + if resp.len() != expected_file_size { + crate::e!( + self.ctx.prt, + "==> expected file size: {} != actual file size: {}", + expected_file_size, + resp.len() + ); + } let file = fs.file(&file_name); file.write(resp) .map_err(|e| { diff --git a/src/trello/model/attachment.rs b/src/trello/model/attachment.rs index 16f8f79..d2aa813 100644 --- a/src/trello/model/attachment.rs +++ b/src/trello/model/attachment.rs @@ -23,6 +23,7 @@ newtype!( pub(crate) struct TrelloAttachment { pub(crate) id: TrelloAttachmentId, // "5abbe4b7ddc1b351ef961414", pub(crate) name: TrelloAttachmentName, //"Deprecation Extension Notice", + pub(crate) bytes: Option, pub(crate) url: TrelloAttachmentUrl, //"https://admin.typeform.com/form/RzExEM/share#/link", #[serde(rename = "fileName")] pub(crate) file_name: TrelloAttachmentFilename,