feat: add module experimental::kameo
Some checks failed
Test / build (map[name:stable]) (push) Failing after 56s
Test / build (map[name:nightly]) (push) Failing after 57s

This commit is contained in:
Paul Campbell 2025-01-08 21:05:51 +00:00
parent 6dc172d6a1
commit 9f13fb1dba
6 changed files with 338 additions and 0 deletions

View file

@ -0,0 +1,209 @@
//
/// Called when the actor starts, before it processes any messages.
///
/// Messages sent internally by the actor during `on_start` are prioritized and processed
/// before any externally sent messages, even if external messages are received first.
///
/// This ensures that the actor can properly initialize before handling external messages.
///
/// # Example
///
/// ```rust
/// use trello_to_deck::on_actor_start;
/// struct ServerActor;
/// impl kameo::Actor for ServerActor {
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
///
/// on_actor_start!(this, actor_ref, {
/// // handle start here
/// Ok(())
/// });
/// }
/// ```
#[macro_export]
macro_rules! on_actor_start {
($this:ident, $actor_ref:ident, $body:expr) => {
#[allow(unused_variables)]
#[tracing::instrument(skip_all)]
async fn on_start(
&mut self,
actor_ref: kameo::actor::ActorRef<Self>,
) -> std::result::Result<(), kameo::error::BoxError> {
tracing::debug!(?actor_ref, "{}", <Self as kameo::Actor>::name());
let $this = self;
let $actor_ref = actor_ref;
$body
}
};
}
#[macro_export]
macro_rules! default_on_actor_start {
($this:ident, $actor_ref:ident) => {
$crate::on_actor_start!($this, $actor_ref, { Ok(()) });
};
}
/// Called swhen the actor encounters a panic or an error during "tell" message handling.
///
/// This method gives the actor an opportunity to clean up or reset its state and determine
/// whether it should be stopped or continue processing messages.
///
/// # Parameters
/// - `err`: The panic or error that occurred.
///
/// # Returns
/// - `Some(ActorStopReason)`: Stops the actor.
/// - `None`: Allows the actor to continue processing messages.
///
/// # Example
///
/// ```rust
/// use trello_to_deck::on_actor_panic;
/// struct ServerActor;
/// impl kameo::Actor for ServerActor {
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
///
/// on_actor_panic!(this, actor_ref, err, {
/// // handle panic here
/// Ok(None)
/// });
/// }
/// ```
#[macro_export]
macro_rules! on_actor_panic {
($this:ident, $actor_ref:ident, $err:ident, $body:expr) => {
#[allow(unused_variables)]
#[tracing::instrument(skip_all)]
async fn on_panic(
&mut self,
actor_ref: kameo::actor::WeakActorRef<Self>,
err: kameo::error::PanicError,
) -> std::result::Result<Option<kameo::error::ActorStopReason>, kameo::error::BoxError> {
tracing::debug!(?actor_ref, %err, "{}", <Self as kameo::Actor>::name());
let $this = self;
let $actor_ref = actor_ref;
let $err = err;
$body
}
};
}
#[macro_export]
macro_rules! default_on_actor_panic {
($this:ident, $actor_ref:ident, $err:ident) => {
$crate::on_actor_panic!($this, $actor_ref, $err, {
Ok(Some(kameo::error::ActorStopReason::Panicked($err)))
});
};
}
/// Called when a linked actor dies.
///
/// By default, the actor will stop if the reason for the linked actor's death is anything other
/// than `Normal`. You can customize this behavior in the implementation.
///
/// # Returns
/// Whether the actor should stop or continue processing messages.
///
/// # Example
///
/// ```rust
/// use trello_to_deck::on_actor_link_died;
/// struct ServerActor;
/// impl kameo::Actor for ServerActor {
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
///
/// on_actor_link_died!(this, actor_ref, id, reason, {
/// // handle link death here
/// Ok(None)
/// });
/// }
/// ```
#[macro_export]
macro_rules! on_actor_link_died {
($this:ident, $actor_ref:ident, $id:ident, $reason:ident, $body:expr) => {
#[allow(unused_variables)]
#[tracing::instrument(skip_all)]
async fn on_link_died(
&mut self,
actor_ref: kameo::actor::WeakActorRef<Self>,
id: kameo::actor::ActorID,
reason: kameo::error::ActorStopReason,
) -> std::result::Result<Option<kameo::error::ActorStopReason>, kameo::error::BoxError> {
tracing::debug!(?actor_ref, %id, %reason, "{}", <Self as kameo::Actor>::name());
let $this = self;
let $actor_ref = actor_ref;
let $id = id;
let $reason = reason;
$body
}
};
}
#[macro_export]
macro_rules! default_on_actor_link_died {
($this:ident, $actor_ref:ident, $id:ident, $reason:ident) => {
$crate::on_actor_link_died!($this, $actor_ref, $id, $reason, {
match &$reason {
kameo::error::ActorStopReason::Normal => Ok(None),
kameo::error::ActorStopReason::Killed
| kameo::error::ActorStopReason::Panicked(_)
| kameo::error::ActorStopReason::LinkDied { .. } => {
Ok(Some(kameo::error::ActorStopReason::LinkDied {
id: $id,
reason: Box::new($reason),
}))
}
}
});
};
}
/// Called before the actor stops.
///
/// This allows the actor to perform any necessary cleanup or release resources before being fully stopped.
///
/// # Parameters
/// - `reason`: The reason why the actor is being stopped.
///
/// # Example
///
/// ```rust
/// use trello_to_deck::on_actor_stop;
/// struct ServerActor;
/// impl kameo::Actor for ServerActor {
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
///
/// on_actor_stop!(this, actor_ref, reason, {
/// // handle stop here
/// Ok(())
/// });
/// }
/// ```
#[macro_export]
macro_rules! on_actor_stop {
($this:ident, $actor_ref:ident, $reason:ident, $body:expr) => {
#[allow(unused_variables)]
#[tracing::instrument(skip_all)]
async fn on_stop(
&mut self,
actor_ref: kameo::actor::WeakActorRef<Self>,
reason: kameo::error::ActorStopReason,
) -> std::result::Result<(), kameo::error::BoxError> {
tracing::debug!(?actor_ref, %reason, "{}", <Self as kameo::Actor>::name());
let $this = self;
let $actor_ref = actor_ref;
let $reason = reason;
$body
}
};
}
#[macro_export]
macro_rules! default_on_actor_stop {
($this:ident, $actor_ref:ident, $reason:ident) => {
$crate::on_actor_stop!($this, $actor_ref, $reason, { Ok(()) });
};
}

View file

@ -0,0 +1,15 @@
#[macro_export]
macro_rules! message {
($name:ident, $value:ty, $docs:literal) => {
$crate::newtype!($name, $value, $docs);
};
($name:ident, $docs:literal) => {
$crate::newtype!($name, $docs);
};
($name:ident, $value:ty => $result:ty, $docs:literal) => {
$crate::newtype!($name, $value, $docs);
};
($name:ident => $result:ty, $docs:literal) => {
$crate::newtype!($name, $docs);
};
}

View file

@ -0,0 +1,5 @@
//
mod actor;
mod message;
mod send;
mod spawn;

View file

@ -0,0 +1,64 @@
//
#[macro_export]
macro_rules! tell {
($actor_ref:expr, $message:expr) => {
tell!(stringify!($actor_ref), $actor_ref, $message)
};
($actor_name:expr, $actor_ref:expr, $message:expr) => {{
tracing::debug!(actor = $actor_name, msg = stringify!($message), "send");
$actor_ref.tell($message).await
}};
}
#[macro_export]
macro_rules! ask {
($actor_ref:expr, $message:expr) => {
ask!(stringify!($actor_ref), $actor_ref, $message)
};
($actor_name:expr, $actor_ref:expr, $message:expr) => {{
tracing::debug!(actor = $actor_name, msg = stringify!($message), "send");
$actor_ref.ask($message).await
}};
}
#[macro_export]
macro_rules! subscribe {
($message_bus:expr, $actor_ref:expr) => {
subscribe!(
stringify!($message_bus),
$message_bus,
stringify!($actor_ref),
$actor_ref
)
};
($message_bus:expr, $actor_name:expr, $actor_ref:expr) => {
subscribe!(
stringify!($message_bus),
$message_bus,
$actor_name,
$actor_ref
)
};
($bus_name:expr, $message_bus:expr, $actor_ref:expr) => {
subscribe!($bus_name, $message_bus, stringify!($actor_ref), $actor_ref)
};
($bus_name:expr, $message_bus:expr, $actor_name:expr, $actor_ref:expr) => {{
tracing::debug!(msg_bus = $bus_name, actor = $actor_name, "subscribe");
$message_bus
.tell(kameo::actor::pubsub::Subscribe($actor_ref))
.await
}};
}
#[macro_export]
macro_rules! publish {
($message_bus:expr, $message:expr) => {
publish!(stringify!($message_bus), $message_bus, $message)
};
($bus_name:expr, $message_bus:expr, $message:expr) => {{
tracing::debug!(bus = $bus_name, msg = stringify!($message), "publish");
$message_bus
.tell(kameo::actor::pubsub::Publish($message))
.await
}};
}

View file

@ -0,0 +1,43 @@
//
/// spawns a new actor and sets up bi-directional links
#[macro_export]
macro_rules! spawn {
($parent:expr, $actor:expr) => {{
tracing::debug!("spawning : {}", $crate::stringify_expr!($actor));
let new_actor_ref = kameo::spawn($actor);
new_actor_ref.link(&$parent).await;
$parent.link(&new_actor_ref).await;
tracing::debug!("spawned : {}", $crate::stringify_expr!($actor));
new_actor_ref
}};
}
#[macro_export]
macro_rules! stringify_expr {
($expr:expr) => {
stringify!($expr).lines().collect::<Vec<_>>().join(" ")
};
}
#[test]
fn remove_line_breaks() {
let out = stringify_expr!([
"line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",
"line 1", "line 2", "line 3",
]);
let expected = r#"["line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",]"#;
assert_eq!(out, expected);
}
#[macro_export]
macro_rules! spawn_in_thread {
($parent:expr, $actor:expr) => {{
tracing::debug!("spawning in thread : {}", $crate::stringify_expr!($actor));
let new_actor_ref = kameo::actor::spawn_in_thread($actor);
new_actor_ref.link(&$parent).await;
$parent.link(&new_actor_ref).await;
tracing::debug!("spawned in thread : {}", $crate::stringify_expr!($actor));
new_actor_ref
}};
}

View file

@ -1,4 +1,6 @@
// //
#[cfg(feature = "use-kameo")]
mod kameo;
#[cfg(feature = "use-kxio")] #[cfg(feature = "use-kxio")]
mod kxio; mod kxio;