diff --git a/Cargo.toml b/Cargo.toml index 1fab1e7..4b5385e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,11 @@ repository = "https://git.kemitix.net/kemitix/kx-utils" [features] default = [] -use-kameo = ["kameo", "tokio"] +use-kameo = ["kameo", "tracing", "tokio"] use-kxio = ["kxio"] [dependencies] kxio = { version = "5.0", optional = true } kameo = { version = "0.13", optional = true } +tracing = { version = "0.1", optional = true } tokio = { version = "1.43", optional = true } diff --git a/src/experimental/kameo/actor.rs b/src/experimental/kameo/actor.rs new file mode 100644 index 0000000..2e81570 --- /dev/null +++ b/src/experimental/kameo/actor.rs @@ -0,0 +1,209 @@ +// + +/// Called when the actor starts, before it processes any messages. +/// +/// Messages sent internally by the actor during `on_start` are prioritized and processed +/// before any externally sent messages, even if external messages are received first. +/// +/// This ensures that the actor can properly initialize before handling external messages. +/// +/// # Example +/// +/// ```rust +/// use kx_utils::on_actor_start; +/// struct ServerActor; +/// impl kameo::Actor for ServerActor { +/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox; +/// +/// on_actor_start!(this, actor_ref, { +/// // handle start here +/// Ok(()) +/// }); +/// } +/// ``` +#[macro_export] +macro_rules! on_actor_start { + ($this:ident, $actor_ref:ident, $body:expr) => { + #[allow(unused_variables)] + #[tracing::instrument(skip_all)] + async fn on_start( + &mut self, + actor_ref: kameo::actor::ActorRef, + ) -> std::result::Result<(), kameo::error::BoxError> { + tracing::debug!(?actor_ref, "{}", ::name()); + let $this = self; + let $actor_ref = actor_ref; + $body + } + }; +} + +#[macro_export] +macro_rules! default_on_actor_start { + ($this:ident, $actor_ref:ident) => { + $crate::on_actor_start!($this, $actor_ref, { Ok(()) }); + }; +} + +/// Called swhen the actor encounters a panic or an error during "tell" message handling. +/// +/// This method gives the actor an opportunity to clean up or reset its state and determine +/// whether it should be stopped or continue processing messages. +/// +/// # Parameters +/// - `err`: The panic or error that occurred. +/// +/// # Returns +/// - `Some(ActorStopReason)`: Stops the actor. +/// - `None`: Allows the actor to continue processing messages. +/// +/// # Example +/// +/// ```rust +/// use kx_utils::on_actor_panic; +/// struct ServerActor; +/// impl kameo::Actor for ServerActor { +/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox; +/// +/// on_actor_panic!(this, actor_ref, err, { +/// // handle panic here +/// Ok(None) +/// }); +/// } +/// ``` +#[macro_export] +macro_rules! on_actor_panic { + ($this:ident, $actor_ref:ident, $err:ident, $body:expr) => { + #[allow(unused_variables)] + #[tracing::instrument(skip_all)] + async fn on_panic( + &mut self, + actor_ref: kameo::actor::WeakActorRef, + err: kameo::error::PanicError, + ) -> std::result::Result, kameo::error::BoxError> { + tracing::debug!(?actor_ref, %err, "{}", ::name()); + let $this = self; + let $actor_ref = actor_ref; + let $err = err; + $body + } + }; +} + +#[macro_export] +macro_rules! default_on_actor_panic { + ($this:ident, $actor_ref:ident, $err:ident) => { + $crate::on_actor_panic!($this, $actor_ref, $err, { + Ok(Some(kameo::error::ActorStopReason::Panicked($err))) + }); + }; +} + +/// Called when a linked actor dies. +/// +/// By default, the actor will stop if the reason for the linked actor's death is anything other +/// than `Normal`. You can customize this behavior in the implementation. +/// +/// # Returns +/// Whether the actor should stop or continue processing messages. +/// +/// # Example +/// +/// ```rust +/// use kx_utils::on_actor_link_died; +/// struct ServerActor; +/// impl kameo::Actor for ServerActor { +/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox; +/// +/// on_actor_link_died!(this, actor_ref, id, reason, { +/// // handle link death here +/// Ok(None) +/// }); +/// } +/// ``` +#[macro_export] +macro_rules! on_actor_link_died { + ($this:ident, $actor_ref:ident, $id:ident, $reason:ident, $body:expr) => { + #[allow(unused_variables)] + #[tracing::instrument(skip_all)] + async fn on_link_died( + &mut self, + actor_ref: kameo::actor::WeakActorRef, + id: kameo::actor::ActorID, + reason: kameo::error::ActorStopReason, + ) -> std::result::Result, kameo::error::BoxError> { + tracing::debug!(?actor_ref, %id, %reason, "{}", ::name()); + let $this = self; + let $actor_ref = actor_ref; + let $id = id; + let $reason = reason; + $body + } + }; +} + +#[macro_export] +macro_rules! default_on_actor_link_died { + ($this:ident, $actor_ref:ident, $id:ident, $reason:ident) => { + $crate::on_actor_link_died!($this, $actor_ref, $id, $reason, { + match &$reason { + kameo::error::ActorStopReason::Normal => Ok(None), + kameo::error::ActorStopReason::Killed + | kameo::error::ActorStopReason::Panicked(_) + | kameo::error::ActorStopReason::LinkDied { .. } => { + Ok(Some(kameo::error::ActorStopReason::LinkDied { + id: $id, + reason: Box::new($reason), + })) + } + } + }); + }; +} + +/// Called before the actor stops. +/// +/// This allows the actor to perform any necessary cleanup or release resources before being fully stopped. +/// +/// # Parameters +/// - `reason`: The reason why the actor is being stopped. +/// +/// # Example +/// +/// ```rust +/// use kx_utils::on_actor_stop; +/// struct ServerActor; +/// impl kameo::Actor for ServerActor { +/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox; +/// +/// on_actor_stop!(this, actor_ref, reason, { +/// // handle stop here +/// Ok(()) +/// }); +/// } +/// ``` +#[macro_export] +macro_rules! on_actor_stop { + ($this:ident, $actor_ref:ident, $reason:ident, $body:expr) => { + #[allow(unused_variables)] + #[tracing::instrument(skip_all)] + async fn on_stop( + &mut self, + actor_ref: kameo::actor::WeakActorRef, + reason: kameo::error::ActorStopReason, + ) -> std::result::Result<(), kameo::error::BoxError> { + tracing::debug!(?actor_ref, %reason, "{}", ::name()); + let $this = self; + let $actor_ref = actor_ref; + let $reason = reason; + $body + } + }; +} + +#[macro_export] +macro_rules! default_on_actor_stop { + ($this:ident, $actor_ref:ident, $reason:ident) => { + $crate::on_actor_stop!($this, $actor_ref, $reason, { Ok(()) }); + }; +} diff --git a/src/experimental/kameo/message.rs b/src/experimental/kameo/message.rs new file mode 100644 index 0000000..5a0fdc2 --- /dev/null +++ b/src/experimental/kameo/message.rs @@ -0,0 +1,15 @@ +#[macro_export] +macro_rules! message { + ($name:ident, $value:ty, $docs:literal) => { + $crate::newtype!($name, $value, $docs); + }; + ($name:ident, $docs:literal) => { + $crate::newtype!($name, $docs); + }; + ($name:ident, $value:ty => $result:ty, $docs:literal) => { + $crate::newtype!($name, $value, $docs); + }; + ($name:ident => $result:ty, $docs:literal) => { + $crate::newtype!($name, $docs); + }; +} diff --git a/src/experimental/kameo/mod.rs b/src/experimental/kameo/mod.rs new file mode 100644 index 0000000..bb84ff3 --- /dev/null +++ b/src/experimental/kameo/mod.rs @@ -0,0 +1,5 @@ +// +mod actor; +mod message; +mod send; +mod spawn; diff --git a/src/experimental/kameo/send.rs b/src/experimental/kameo/send.rs new file mode 100644 index 0000000..583fa49 --- /dev/null +++ b/src/experimental/kameo/send.rs @@ -0,0 +1,64 @@ +// +#[macro_export] +macro_rules! tell { + ($actor_ref:expr, $message:expr) => { + tell!(stringify!($actor_ref), $actor_ref, $message) + }; + ($actor_name:expr, $actor_ref:expr, $message:expr) => {{ + tracing::debug!(actor = $actor_name, msg = stringify!($message), "send"); + $actor_ref.tell($message).await + }}; +} + +#[macro_export] +macro_rules! ask { + ($actor_ref:expr, $message:expr) => { + ask!(stringify!($actor_ref), $actor_ref, $message) + }; + ($actor_name:expr, $actor_ref:expr, $message:expr) => {{ + tracing::debug!(actor = $actor_name, msg = stringify!($message), "send"); + $actor_ref.ask($message).await + }}; +} + +#[macro_export] +macro_rules! subscribe { + ($message_bus:expr, $actor_ref:expr) => { + subscribe!( + stringify!($message_bus), + $message_bus, + stringify!($actor_ref), + $actor_ref + ) + }; + ($message_bus:expr, $actor_name:expr, $actor_ref:expr) => { + subscribe!( + stringify!($message_bus), + $message_bus, + $actor_name, + $actor_ref + ) + }; + ($bus_name:expr, $message_bus:expr, $actor_ref:expr) => { + subscribe!($bus_name, $message_bus, stringify!($actor_ref), $actor_ref) + }; + ($bus_name:expr, $message_bus:expr, $actor_name:expr, $actor_ref:expr) => {{ + tracing::debug!(msg_bus = $bus_name, actor = $actor_name, "subscribe"); + $message_bus + .tell(kameo::actor::pubsub::Subscribe($actor_ref)) + .await + }}; +} + +#[macro_export] +macro_rules! publish { + ($message_bus:expr, $message:expr) => { + publish!(stringify!($message_bus), $message_bus, $message) + }; + ($bus_name:expr, $message_bus:expr, $message:expr) => {{ + tracing::debug!(bus = $bus_name, msg = stringify!($message), "publish"); + $message_bus + .tell(kameo::actor::pubsub::Publish($message)) + .await + }}; +} diff --git a/src/experimental/kameo/spawn.rs b/src/experimental/kameo/spawn.rs new file mode 100644 index 0000000..0a288fd --- /dev/null +++ b/src/experimental/kameo/spawn.rs @@ -0,0 +1,43 @@ +// +/// spawns a new actor and sets up bi-directional links +#[macro_export] +macro_rules! spawn { + ($parent:expr, $actor:expr) => {{ + tracing::debug!("spawning : {}", $crate::stringify_expr!($actor)); + let new_actor_ref = kameo::spawn($actor); + new_actor_ref.link(&$parent).await; + $parent.link(&new_actor_ref).await; + tracing::debug!("spawned : {}", $crate::stringify_expr!($actor)); + new_actor_ref + }}; +} + +#[macro_export] +macro_rules! stringify_expr { + ($expr:expr) => { + stringify!($expr).lines().collect::>().join(" ") + }; +} + +#[test] +fn remove_line_breaks() { + let out = stringify_expr!([ + "line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3", + "line 1", "line 2", "line 3", + ]); + let expected = r#"["line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",]"#; + + assert_eq!(out, expected); +} + +#[macro_export] +macro_rules! spawn_in_thread { + ($parent:expr, $actor:expr) => {{ + tracing::debug!("spawning in thread : {}", $crate::stringify_expr!($actor)); + let new_actor_ref = kameo::actor::spawn_in_thread($actor); + new_actor_ref.link(&$parent).await; + $parent.link(&new_actor_ref).await; + tracing::debug!("spawned in thread : {}", $crate::stringify_expr!($actor)); + new_actor_ref + }}; +} diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs index 3571261..d3aaa0e 100644 --- a/src/experimental/mod.rs +++ b/src/experimental/mod.rs @@ -1,4 +1,6 @@ // +#[cfg(feature = "use-kameo")] +mod kameo; #[cfg(feature = "use-kxio")] mod kxio;