feat: add Rate Limits on trello API and own parallisation
Some checks failed
Test / build (map[name:nightly]) (push) Successful in 4m30s
Test / build (map[name:stable]) (push) Successful in 9m30s
Release Please / Release-plz (push) Failing after 42s

- Limits the rate of API calls to Trello
- Limits the number of stacks, cards, labels and attachments that are running in parallel
This commit is contained in:
Paul Campbell 2024-12-23 14:55:51 +00:00
parent 2dac4374f3
commit 756abd61c7
14 changed files with 387 additions and 9 deletions

View file

@ -1,12 +1,15 @@
use color_eyre::eyre::Context;
//
use kameo::{mailbox::unbounded::UnboundedMailbox, Actor};
use crate::import::rate_limit::{RateLimitActor, RequestToken};
use crate::{
ask, e, f,
nextcloud::model::{NextcloudBoardId, NextcloudCardId, NextcloudStackId},
on_actor_start, p,
trello::model::{attachment::TrelloAttachment, card::TrelloCardId},
FullCtx,
};
use kameo::actor::ActorRef;
use kameo::{mailbox::unbounded::UnboundedMailbox, Actor};
pub(crate) struct ImportAttachmentActor {
ctx: FullCtx,
@ -15,6 +18,7 @@ pub(crate) struct ImportAttachmentActor {
nextcloud_card_id: NextcloudCardId,
trello_card_id: TrelloCardId,
trello_attachment: TrelloAttachment,
rate_limit_actor: ActorRef<RateLimitActor>,
}
impl ImportAttachmentActor {
pub fn new(
@ -24,6 +28,7 @@ impl ImportAttachmentActor {
nextcloud_card_id: NextcloudCardId,
trello_card_id: TrelloCardId,
trello_attachment: TrelloAttachment,
rate_limit_actor: ActorRef<RateLimitActor>,
) -> Self {
Self {
ctx,
@ -32,6 +37,7 @@ impl ImportAttachmentActor {
nextcloud_card_id,
trello_card_id,
trello_attachment,
rate_limit_actor,
}
}
}
@ -49,6 +55,7 @@ impl Actor for ImportAttachmentActor {
let dir = this.ctx.temp_fs.dir(&dir_path);
dir.create()?;
let file_path = dir_path.join(&*this.trello_attachment.name);
ask!(this.rate_limit_actor, RequestToken)?;
let attachment_path = this
.ctx
.trello_client()
@ -58,7 +65,8 @@ impl Actor for ImportAttachmentActor {
Some(&file_path),
&this.ctx.temp_fs,
)
.await?;
.await
.with_context(|| f!("saving attachment {}", file_path.display()))?;
let attachment_file = this.ctx.temp_fs.file(&attachment_path);
// - - - upload the attachment to nextcloud card
let attachment = this
@ -71,7 +79,16 @@ impl Actor for ImportAttachmentActor {
&attachment_file,
)
.await
.result?;
.result
.inspect_err(|e| {
e!(
this.ctx.prt,
">> Error adding attachment {} to card: {}",
file_path.display(),
e.to_string()
);
})
.with_context(|| f!("adding attachment to card {}", file_path.display()))?;
p!(
this.ctx.prt,
">> Attachment added: {}:{}",
@ -80,8 +97,11 @@ impl Actor for ImportAttachmentActor {
// attachment.extended_data.mimetype
);
// delete local copy of attachment
attachment_file.remove()?;
dir.remove()?;
attachment_file
.remove()
.with_context(|| f!("deleting temp file {attachment_file}"))?;
dir.remove()
.with_context(|| f!("deleting temp dir {dir}"))?;
Ok(actor_ref.stop_gracefully().await?)
});

View file

@ -8,7 +8,10 @@ use kameo::{
Actor,
};
use crate::import::rate_limit::{RateLimitActor, RequestToken};
use crate::rate_limit::RateLimit;
use crate::{
ask,
import::{attachment::ImportAttachmentActor, label::ImportLabelActor, labels::LabelsActor},
nextcloud::{
client::DeckClient,
@ -28,6 +31,7 @@ pub(crate) struct ImportCardActor {
nextcloud_board_id: NextcloudBoardId,
nextcloud_stack_id: NextcloudStackId,
labels_actor_ref: ActorRef<LabelsActor>,
rate_limit_actor: ActorRef<RateLimitActor>,
labels_children: HashMap<ActorID, (TrelloLabelName, ActorRef<ImportLabelActor>)>,
attachments_children: HashMap<ActorID, (TrelloAttachmentName, ActorRef<ImportAttachmentActor>)>,
@ -39,6 +43,7 @@ impl ImportCardActor {
nextcloud_board_id: NextcloudBoardId,
nextcloud_stack_id: NextcloudStackId,
labels_actor_ref: ActorRef<LabelsActor>,
rate_limit_actor: ActorRef<RateLimitActor>,
) -> Self {
Self {
ctx,
@ -46,6 +51,7 @@ impl ImportCardActor {
nextcloud_board_id,
nextcloud_stack_id,
labels_actor_ref,
rate_limit_actor,
labels_children: Default::default(),
attachments_children: Default::default(),
@ -77,10 +83,13 @@ impl Actor for ImportCardActor {
.await
.result?;
let mut limit = RateLimit::new("labels & attachments", 10, 10.0, this.ctx.now());
// - - for each label on the trello card
let mut labels = vec![];
std::mem::swap(&mut this.trello_card.labels, &mut labels);
for trello_label in labels.into_iter() {
limit.block_until_token_available(this.ctx.now()).await;
let trello_label_name = trello_label.name.clone();
let child = spawn_in_thread!(
actor_ref,
@ -97,12 +106,14 @@ impl Actor for ImportCardActor {
.insert(child.id(), (trello_label_name, child.clone()));
}
ask!(this.rate_limit_actor, RequestToken)?;
let attachments = trello_client
.card(&this.trello_card.id)
.await
.result?
.attachments;
for trello_attachment in attachments.into_iter() {
limit.block_until_token_available(this.ctx.now()).await;
if trello_attachment.is_upload {
let trello_attachment_name = trello_attachment.name.clone();
let child = spawn_in_thread!(
@ -114,6 +125,7 @@ impl Actor for ImportCardActor {
nextcloud_card.id,
this.trello_card.id.clone(),
trello_attachment,
this.rate_limit_actor.clone(),
)
);
this.attachments_children

View file

@ -4,8 +4,12 @@ use kameo::actor::spawn_in_thread;
use crate::{
f,
import::{labels::LabelsActor, stacks::ImportStacksActor, supervisor::Supervisor},
import::{
labels::LabelsActor, rate_limit::RateLimitActor, stacks::ImportStacksActor,
supervisor::Supervisor,
},
nextcloud::model::NextcloudBoardTitle,
rate_limit::RateLimit,
spawn_in_thread,
trello::model::board::{TrelloBoardId, TrelloBoardName, TrelloBoards},
FullCtx,
@ -15,6 +19,7 @@ mod attachment;
mod card;
mod label;
mod labels;
mod rate_limit;
mod stack;
mod stacks;
mod supervisor;
@ -72,13 +77,16 @@ pub(crate) async fn run(ctx: &FullCtx) -> color_eyre::Result<()> {
// Start importing
println!("> Loading Nextcloud board: {nextcloud_board_name}");
// get the id of the selected board
let nextcloud_board_id = nextcloud_boards
.iter()
.find(|b| b.title == nextcloud_board_name)
.map(|b| b.id)
.expect("find selected board");
println!("> Loaded Nextcloud board: {nextcloud_board_name}");
println!("> Spawning actors");
let supervisor = spawn_in_thread(Supervisor);
let labels_actor_ref = spawn_in_thread!(
@ -86,6 +94,12 @@ pub(crate) async fn run(ctx: &FullCtx) -> color_eyre::Result<()> {
LabelsActor::new(ctx.clone(), nextcloud_board_id)
);
let now = ctx.now();
let trello_rate_limiter = spawn_in_thread!(
supervisor,
RateLimitActor::new(RateLimit::new("trello", 100, 10.0, now))
);
let _main = spawn_in_thread!(
supervisor,
ImportStacksActor::new(
@ -94,8 +108,10 @@ pub(crate) async fn run(ctx: &FullCtx) -> color_eyre::Result<()> {
selected_trello_stack_names.into_iter().cloned().collect(),
nextcloud_board_id,
labels_actor_ref.clone(),
trello_rate_limiter.clone(),
)
);
println!("> Spawned main actors");
supervisor.wait_for_stop().await;

34
src/import/rate_limit.rs Normal file
View file

@ -0,0 +1,34 @@
use crate::{message, rate_limit::RateLimit};
use kameo::{
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
Actor,
};
use tokio::time::Instant;
message!(RequestToken, "Request a rate limited token");
pub(crate) struct RateLimitActor {
limiter: RateLimit,
}
impl RateLimitActor {
pub fn new(limiter: RateLimit) -> Self {
Self { limiter }
}
}
impl Actor for RateLimitActor {
type Mailbox = UnboundedMailbox<Self>;
}
impl Message<RequestToken> for RateLimitActor {
type Reply = ();
async fn handle(
&mut self,
_msg: RequestToken,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.limiter
.block_until_token_available(Instant::now())
.await
}
}

View file

@ -8,7 +8,10 @@ use kameo::{
Actor,
};
use crate::import::rate_limit::{RateLimitActor, RequestToken};
use crate::rate_limit::RateLimit;
use crate::{
ask,
import::{card::ImportCardActor, labels::LabelsActor},
nextcloud::model::{NextcloudBoardId, NextcloudStackId},
on_actor_link_died, on_actor_start, spawn_in_thread,
@ -28,6 +31,7 @@ pub(super) struct ImportStackActor {
nextcloud_board_id: NextcloudBoardId,
nextcloud_stack_id: NextcloudStackId,
labels_actor_ref: ActorRef<LabelsActor>,
rate_limit_actor: ActorRef<RateLimitActor>,
children: HashMap<ActorID, (TrelloCardName, ActorRef<ImportCardActor>)>,
}
impl ImportStackActor {
@ -37,6 +41,7 @@ impl ImportStackActor {
nextcloud_board_id: NextcloudBoardId,
nextcloud_stack_id: NextcloudStackId,
labels_actor_ref: ActorRef<LabelsActor>,
rate_limit_actor: ActorRef<RateLimitActor>,
) -> Self {
Self {
ctx,
@ -44,6 +49,7 @@ impl ImportStackActor {
nextcloud_board_id,
nextcloud_stack_id,
labels_actor_ref,
rate_limit_actor,
children: Default::default(),
}
}
@ -56,6 +62,7 @@ impl Actor for ImportStackActor {
crate::p!(this.ctx.prt, "Importing stack: {}", this.trello_stack.name);
// - get the list of trello cards in the stack
ask!(this.rate_limit_actor, RequestToken)?;
let mut trello_cards = trello_client
.list_cards(&TrelloListId::new(this.trello_stack.id.as_ref()))
.await
@ -63,8 +70,12 @@ impl Actor for ImportStackActor {
// sort cards by their position
trello_cards.sort_by_key(|card| card.pos);
let mut limit = RateLimit::new("cards", 10, 10.0, this.ctx.now());
// - for each card in the trello stack
for trello_card in trello_cards.into_iter() {
limit.block_until_token_available(this.ctx.now()).await;
ask!(this.rate_limit_actor, RequestToken)?;
let trello_card_name = trello_card.name.clone();
let child: ActorRef<ImportCardActor> = spawn_in_thread!(
actor_ref.clone(),
@ -74,6 +85,7 @@ impl Actor for ImportStackActor {
this.nextcloud_board_id,
this.nextcloud_stack_id,
this.labels_actor_ref.clone(),
this.rate_limit_actor.clone(),
)
);
this.children

View file

@ -8,6 +8,8 @@ use kameo::{
Actor,
};
use crate::import::rate_limit::RateLimitActor;
use crate::rate_limit::RateLimit;
use crate::{
import::{labels::LabelsActor, stack::ImportStackActor},
nextcloud::{
@ -72,6 +74,7 @@ pub(super) struct ImportStacksActor {
selected_trello_stack_names: Vec<String>,
nextcloud_board_id: NextcloudBoardId,
labels_actor_ref: ActorRef<LabelsActor>,
rate_limit_actor: ActorRef<RateLimitActor>,
children: HashMap<ActorID, (TrelloListName, ActorRef<ImportStackActor>)>,
}
@ -82,6 +85,7 @@ impl ImportStacksActor {
selected_trello_stack_names: Vec<String>,
nextcloud_board_id: NextcloudBoardId,
labels_actor_ref: ActorRef<LabelsActor>,
rate_limit_actor: ActorRef<RateLimitActor>,
) -> Self {
Self {
ctx,
@ -89,6 +93,7 @@ impl ImportStacksActor {
selected_trello_stack_names,
nextcloud_board_id,
labels_actor_ref,
rate_limit_actor,
children: Default::default(),
}
}
@ -126,8 +131,11 @@ impl Actor for ImportStacksActor {
.await
.result?;
let mut limit = RateLimit::new("stacks", 1, 0.1, this.ctx.now());
// for each selected trello stack
for selected_trello_stack in selected_trello_stacks {
limit.block_until_token_available(this.ctx.now()).await;
let nextcloud_stack_id = //this.
nextcloud_stacks
.iter()
@ -142,6 +150,7 @@ impl Actor for ImportStacksActor {
this.nextcloud_board_id,
nextcloud_stack_id,
this.labels_actor_ref.clone(),
this.rate_limit_actor.clone()
)
);
this.children

View file

@ -5,10 +5,10 @@ use crate::{nextcloud::client::DeckClient, trello::client::TrelloClient};
use clap::Parser;
use color_eyre::eyre::eyre;
use config::AppConfig;
use execute::Execute;
use kxio::fs::TempFileSystem;
use kxio::{fs::FileSystem, kxeprintln as e, kxprintln as p, net::Net, print::Printer};
use execute::Execute;
use tokio::time::Instant;
mod api_result;
mod check;
@ -19,6 +19,7 @@ mod import;
mod init;
pub mod macros;
mod nextcloud;
mod rate_limit;
mod template;
mod trello;
@ -86,6 +87,13 @@ pub(crate) struct FullCtx {
pub prt: Printer,
pub cfg: AppConfig,
}
impl FullCtx {
pub(crate) fn now(&self) -> Instant {
Instant::now()
}
}
impl FullCtx {
pub(crate) const fn deck_client(&self) -> DeckClient {
DeckClient::new(self)

15
src/macros/message.rs Normal file
View file

@ -0,0 +1,15 @@
#[macro_export]
macro_rules! message {
($name:ident, $value:ty, $docs:literal) => {
$crate::newtype!($name, $value, $docs);
};
($name:ident, $docs:literal) => {
$crate::newtype!($name, $docs);
};
($name:ident, $value:ty => $result:ty, $docs:literal) => {
$crate::newtype!($name, $value, $docs);
};
($name:ident => $result:ty, $docs:literal) => {
$crate::newtype!($name, $docs);
};
}

View file

@ -1,6 +1,7 @@
//
mod actor;
mod backoff;
mod message;
mod newtype;
mod print;
mod send;

96
src/rate_limit.rs Normal file
View file

@ -0,0 +1,96 @@
use std::time::Duration;
use tokio::time::Instant;
use tracing::instrument;
#[derive(Debug)]
pub(crate) struct RateLimit {
/// The name of the limit
pub(crate) name: String,
/// Last token was consumed at
pub(crate) last_consumed_at: Instant,
/// The number of tokens available after the last was consumed
pub(crate) last_token_count: u32,
/// Maximum tokens we can have at any one time
pub(crate) max: u32,
/// Tokens replenish rate
pub(crate) tokens_per_second: f32,
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum Token {
Available,
NotAvailable { sleep_nanos: u64 },
}
impl RateLimit {
// 300 tokens / 10 seconds => max 300, tokens_per_second 30
pub(crate) fn new(
name: impl Into<String>,
max: u32,
tokens_per_second: f32,
now: Instant,
) -> Self {
Self {
name: name.into(),
last_consumed_at: now,
last_token_count: max,
max,
tokens_per_second,
}
}
pub(crate) async fn block_until_token_available(&mut self, now: Instant) {
let mut now = now;
loop {
match self.acquire_token(now) {
Token::Available => {
return;
}
Token::NotAvailable { sleep_nanos } => {
tracing::debug!("sleeping for {}ms", sleep_nanos / 1_000_000);
tokio::time::sleep(Duration::from_nanos(sleep_nanos)).await;
now += Duration::from_nanos(sleep_nanos);
}
}
}
}
#[instrument]
pub(crate) fn acquire_token(&mut self, now: Instant) -> Token {
debug_assert!(now > self.last_consumed_at, "time must pass");
let elapsed_nanos = now.duration_since(self.last_consumed_at).as_nanos();
tracing::trace!(?elapsed_nanos, "ELAPSED");
// Calculate new tokens generated since last consumption
let nanos_per_token = (Duration::from_secs(1).as_nanos() as f32) / self.tokens_per_second;
let new_tokens = (elapsed_nanos as f32 / nanos_per_token) as u32;
let current_tokens = (self.last_token_count + new_tokens).min(self.max);
tracing::trace!(%elapsed_nanos, %nanos_per_token, %new_tokens, %current_tokens, "tokens?");
if current_tokens > 0 {
self.last_token_count = current_tokens - 1;
self.last_consumed_at = now;
tracing::trace!("token available");
return Token::Available;
}
// Calculate time until next token is available
let time_to_next = if current_tokens == 0 {
let tokens_needed = 1;
let seconds = (tokens_needed as f64) / (self.tokens_per_second as f64);
tracing::trace!(?seconds, "time until next token");
Duration::from_secs_f64(seconds)
} else {
Duration::from_secs(0)
};
tracing::trace!(?time_to_next, "token not available");
println!("[{}] sleep {}ms", self.name, time_to_next.as_millis());
Token::NotAvailable {
sleep_nanos: time_to_next.as_nanos() as u64,
}
}
}

View file

@ -13,6 +13,7 @@ mod check;
mod config;
pub(crate) mod given;
mod init;
mod rate_limit;
mod template;
#[test]

141
src/tests/rate_limit.rs Normal file
View file

@ -0,0 +1,141 @@
//
use std::ops::Add;
use tokio::time::{Duration, Instant};
use crate::rate_limit::{RateLimit, Token};
#[tokio::test]
async fn test_initial_token_available() {
//given
let mut rate_limit = RateLimit::new("Test", 10, 10.0, Instant::now()); // 10 tokens per second, max 10 tokens
//when
let token = rate_limit.acquire_token(Instant::now());
//then
assert_eq!(token, Token::Available);
}
#[tokio::test]
async fn test_max_tokens_respected() {
//given
let start = Instant::now();
let mut rate_limit = RateLimit::new("Test", 5, 10.0, start); // 10 tokens per second, max 5 tokens
//when
let token = rate_limit.acquire_token(start.add(Duration::from_secs(1)));
//then
assert_eq!(token, Token::Available);
assert_eq!(rate_limit.last_token_count, 4); // Should be max-1 after consuming one token
}
#[tokio::test]
async fn test_token_exhaustion() {
//given
let mut rate_limit = RateLimit::new("Test", 1, 1.0, Instant::now()); // 1 token per second, max 1 token
// Consume the only available token
assert_eq!(rate_limit.acquire_token(Instant::now()), Token::Available);
//when
let token = rate_limit.acquire_token(Instant::now());
//then
// Next request should not be available
match token {
Token::NotAvailable { sleep_nanos } => {
assert!(sleep_nanos > 0);
}
Token::Available => panic!("Should not have token available"),
}
}
#[test]
fn add_duration_to_instant() {
//given
let start = Instant::now();
//when
let later = start.add(Duration::from_secs(1));
//then
assert!(later > start);
assert!(later.duration_since(start) == Duration::from_secs(1));
}
#[test_log::test(tokio::test)]
async fn test_token_regeneration() {
//given
let mut rate_limit = RateLimit::new("Test", 1, 1.0, Instant::now()); // 1 token per second, max 1 token
tracing::info!("Consume initial token");
assert_eq!(rate_limit.acquire_token(Instant::now()), Token::Available);
//when
let later = Instant::now().add(Duration::from_millis(1100));
tracing::info!(later=?later.elapsed().as_nanos(), "Wait for more than 1 second");
let token = rate_limit.acquire_token(later); // 1.1 seconds
//then
// Should have generated a new token
assert_eq!(token, Token::Available);
}
#[test_log::test(tokio::test)]
async fn test_block_until_token_available() {
//given
let mut rate_limit = RateLimit::new("Test", 1, 2.0, Instant::now()); // 2 tokens per second, max 1 token
tracing::info!("consume initial token");
rate_limit.block_until_token_available(Instant::now()).await; // Consume initial tokens
//then
tracing::info!("blocking until next token available");
let start = Instant::now();
tokio::select! {
_ = rate_limit.block_until_token_available(Instant::now()) => { // This should block for approximately 0.5 seconds
tracing::info!("received token");
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
panic!("Timeout waiting for token");
}
}
let elapsed = start.elapsed();
//then
assert!(elapsed >= Duration::from_millis(450), "{elapsed:?}")
}
#[tokio::test]
async fn test_multiple_concurrent_requests() {
let rate_limit = std::sync::Arc::new(tokio::sync::Mutex::new(
RateLimit::new("Test", 2, 2.0, Instant::now()), // 2 tokens per second, max 1 token
));
let mut handles = vec![];
// Launch 3 concurrent requests
tokio::select! {
_ = {
// should take ~0.5 second for third token to become available
async{
for _ in 0..3 {
let rate_limit_clone = rate_limit.clone();
handles.push(tokio::spawn(async move {
let mut rate_limit = rate_limit_clone.lock().await;
rate_limit.block_until_token_available(Instant::now()).await;
}));
}
// Wait for all requests to complete
for handle in handles {
handle.await.unwrap();
}
}
} => {
tracing::info!("concurrent requests okay");
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
panic!("Timeout waiting for token");
}
}
}

View file

@ -97,6 +97,18 @@ impl<'ctx> TrelloClient<'ctx> {
.bytes()
.await
.context("downloading attachment")?;
let Some(expected_file_size) = attachment.bytes else {
crate::e!(self.ctx.prt, "==> no file size in attachment");
return Err(color_eyre::eyre::eyre!("no file size in attachment"));
};
if resp.len() != expected_file_size {
crate::e!(
self.ctx.prt,
"==> expected file size: {} != actual file size: {}",
expected_file_size,
resp.len()
);
}
let file = fs.file(&file_name);
file.write(resp)
.map_err(|e| {

View file

@ -23,6 +23,7 @@ newtype!(
pub(crate) struct TrelloAttachment {
pub(crate) id: TrelloAttachmentId, // "5abbe4b7ddc1b351ef961414",
pub(crate) name: TrelloAttachmentName, //"Deprecation Extension Notice",
pub(crate) bytes: Option<usize>,
pub(crate) url: TrelloAttachmentUrl, //"https://admin.typeform.com/form/RzExEM/share#/link",
#[serde(rename = "fileName")]
pub(crate) file_name: TrelloAttachmentFilename,