feat: add module experimental::kameo
This commit is contained in:
parent
6dc172d6a1
commit
c48543d5d3
7 changed files with 340 additions and 1 deletions
|
@ -9,10 +9,11 @@ repository = "https://git.kemitix.net/kemitix/kx-utils"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
use-kameo = ["kameo", "tokio"]
|
use-kameo = ["kameo", "tracing", "tokio"]
|
||||||
use-kxio = ["kxio"]
|
use-kxio = ["kxio"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
kxio = { version = "5.0", optional = true }
|
kxio = { version = "5.0", optional = true }
|
||||||
kameo = { version = "0.13", optional = true }
|
kameo = { version = "0.13", optional = true }
|
||||||
|
tracing = { version = "0.1", optional = true }
|
||||||
tokio = { version = "1.43", optional = true }
|
tokio = { version = "1.43", optional = true }
|
||||||
|
|
209
src/experimental/kameo/actor.rs
Normal file
209
src/experimental/kameo/actor.rs
Normal file
|
@ -0,0 +1,209 @@
|
||||||
|
//
|
||||||
|
|
||||||
|
/// Called when the actor starts, before it processes any messages.
|
||||||
|
///
|
||||||
|
/// Messages sent internally by the actor during `on_start` are prioritized and processed
|
||||||
|
/// before any externally sent messages, even if external messages are received first.
|
||||||
|
///
|
||||||
|
/// This ensures that the actor can properly initialize before handling external messages.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use kx_utils::on_actor_start;
|
||||||
|
/// struct ServerActor;
|
||||||
|
/// impl kameo::Actor for ServerActor {
|
||||||
|
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||||
|
///
|
||||||
|
/// on_actor_start!(this, actor_ref, {
|
||||||
|
/// // handle start here
|
||||||
|
/// Ok(())
|
||||||
|
/// });
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! on_actor_start {
|
||||||
|
($this:ident, $actor_ref:ident, $body:expr) => {
|
||||||
|
#[allow(unused_variables)]
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn on_start(
|
||||||
|
&mut self,
|
||||||
|
actor_ref: kameo::actor::ActorRef<Self>,
|
||||||
|
) -> std::result::Result<(), kameo::error::BoxError> {
|
||||||
|
tracing::debug!(?actor_ref, "{}", <Self as kameo::Actor>::name());
|
||||||
|
let $this = self;
|
||||||
|
let $actor_ref = actor_ref;
|
||||||
|
$body
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! default_on_actor_start {
|
||||||
|
($this:ident, $actor_ref:ident) => {
|
||||||
|
$crate::on_actor_start!($this, $actor_ref, { Ok(()) });
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called swhen the actor encounters a panic or an error during "tell" message handling.
|
||||||
|
///
|
||||||
|
/// This method gives the actor an opportunity to clean up or reset its state and determine
|
||||||
|
/// whether it should be stopped or continue processing messages.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
/// - `err`: The panic or error that occurred.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// - `Some(ActorStopReason)`: Stops the actor.
|
||||||
|
/// - `None`: Allows the actor to continue processing messages.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use kx_utils::on_actor_panic;
|
||||||
|
/// struct ServerActor;
|
||||||
|
/// impl kameo::Actor for ServerActor {
|
||||||
|
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||||
|
///
|
||||||
|
/// on_actor_panic!(this, actor_ref, err, {
|
||||||
|
/// // handle panic here
|
||||||
|
/// Ok(None)
|
||||||
|
/// });
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! on_actor_panic {
|
||||||
|
($this:ident, $actor_ref:ident, $err:ident, $body:expr) => {
|
||||||
|
#[allow(unused_variables)]
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn on_panic(
|
||||||
|
&mut self,
|
||||||
|
actor_ref: kameo::actor::WeakActorRef<Self>,
|
||||||
|
err: kameo::error::PanicError,
|
||||||
|
) -> std::result::Result<Option<kameo::error::ActorStopReason>, kameo::error::BoxError> {
|
||||||
|
tracing::debug!(?actor_ref, %err, "{}", <Self as kameo::Actor>::name());
|
||||||
|
let $this = self;
|
||||||
|
let $actor_ref = actor_ref;
|
||||||
|
let $err = err;
|
||||||
|
$body
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! default_on_actor_panic {
|
||||||
|
($this:ident, $actor_ref:ident, $err:ident) => {
|
||||||
|
$crate::on_actor_panic!($this, $actor_ref, $err, {
|
||||||
|
Ok(Some(kameo::error::ActorStopReason::Panicked($err)))
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called when a linked actor dies.
|
||||||
|
///
|
||||||
|
/// By default, the actor will stop if the reason for the linked actor's death is anything other
|
||||||
|
/// than `Normal`. You can customize this behavior in the implementation.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// Whether the actor should stop or continue processing messages.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use kx_utils::on_actor_link_died;
|
||||||
|
/// struct ServerActor;
|
||||||
|
/// impl kameo::Actor for ServerActor {
|
||||||
|
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||||
|
///
|
||||||
|
/// on_actor_link_died!(this, actor_ref, id, reason, {
|
||||||
|
/// // handle link death here
|
||||||
|
/// Ok(None)
|
||||||
|
/// });
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! on_actor_link_died {
|
||||||
|
($this:ident, $actor_ref:ident, $id:ident, $reason:ident, $body:expr) => {
|
||||||
|
#[allow(unused_variables)]
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn on_link_died(
|
||||||
|
&mut self,
|
||||||
|
actor_ref: kameo::actor::WeakActorRef<Self>,
|
||||||
|
id: kameo::actor::ActorID,
|
||||||
|
reason: kameo::error::ActorStopReason,
|
||||||
|
) -> std::result::Result<Option<kameo::error::ActorStopReason>, kameo::error::BoxError> {
|
||||||
|
tracing::debug!(?actor_ref, %id, %reason, "{}", <Self as kameo::Actor>::name());
|
||||||
|
let $this = self;
|
||||||
|
let $actor_ref = actor_ref;
|
||||||
|
let $id = id;
|
||||||
|
let $reason = reason;
|
||||||
|
$body
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! default_on_actor_link_died {
|
||||||
|
($this:ident, $actor_ref:ident, $id:ident, $reason:ident) => {
|
||||||
|
$crate::on_actor_link_died!($this, $actor_ref, $id, $reason, {
|
||||||
|
match &$reason {
|
||||||
|
kameo::error::ActorStopReason::Normal => Ok(None),
|
||||||
|
kameo::error::ActorStopReason::Killed
|
||||||
|
| kameo::error::ActorStopReason::Panicked(_)
|
||||||
|
| kameo::error::ActorStopReason::LinkDied { .. } => {
|
||||||
|
Ok(Some(kameo::error::ActorStopReason::LinkDied {
|
||||||
|
id: $id,
|
||||||
|
reason: Box::new($reason),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called before the actor stops.
|
||||||
|
///
|
||||||
|
/// This allows the actor to perform any necessary cleanup or release resources before being fully stopped.
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
/// - `reason`: The reason why the actor is being stopped.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use kx_utils::on_actor_stop;
|
||||||
|
/// struct ServerActor;
|
||||||
|
/// impl kameo::Actor for ServerActor {
|
||||||
|
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||||
|
///
|
||||||
|
/// on_actor_stop!(this, actor_ref, reason, {
|
||||||
|
/// // handle stop here
|
||||||
|
/// Ok(())
|
||||||
|
/// });
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! on_actor_stop {
|
||||||
|
($this:ident, $actor_ref:ident, $reason:ident, $body:expr) => {
|
||||||
|
#[allow(unused_variables)]
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
async fn on_stop(
|
||||||
|
&mut self,
|
||||||
|
actor_ref: kameo::actor::WeakActorRef<Self>,
|
||||||
|
reason: kameo::error::ActorStopReason,
|
||||||
|
) -> std::result::Result<(), kameo::error::BoxError> {
|
||||||
|
tracing::debug!(?actor_ref, %reason, "{}", <Self as kameo::Actor>::name());
|
||||||
|
let $this = self;
|
||||||
|
let $actor_ref = actor_ref;
|
||||||
|
let $reason = reason;
|
||||||
|
$body
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! default_on_actor_stop {
|
||||||
|
($this:ident, $actor_ref:ident, $reason:ident) => {
|
||||||
|
$crate::on_actor_stop!($this, $actor_ref, $reason, { Ok(()) });
|
||||||
|
};
|
||||||
|
}
|
15
src/experimental/kameo/message.rs
Normal file
15
src/experimental/kameo/message.rs
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! message {
|
||||||
|
($name:ident, $value:ty, $docs:literal) => {
|
||||||
|
$crate::newtype!($name, $value, $docs);
|
||||||
|
};
|
||||||
|
($name:ident, $docs:literal) => {
|
||||||
|
$crate::newtype!($name, $docs);
|
||||||
|
};
|
||||||
|
($name:ident, $value:ty => $result:ty, $docs:literal) => {
|
||||||
|
$crate::newtype!($name, $value, $docs);
|
||||||
|
};
|
||||||
|
($name:ident => $result:ty, $docs:literal) => {
|
||||||
|
$crate::newtype!($name, $docs);
|
||||||
|
};
|
||||||
|
}
|
5
src/experimental/kameo/mod.rs
Normal file
5
src/experimental/kameo/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
//
|
||||||
|
mod actor;
|
||||||
|
mod message;
|
||||||
|
mod send;
|
||||||
|
mod spawn;
|
64
src/experimental/kameo/send.rs
Normal file
64
src/experimental/kameo/send.rs
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
//
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! tell {
|
||||||
|
($actor_ref:expr, $message:expr) => {
|
||||||
|
tell!(stringify!($actor_ref), $actor_ref, $message)
|
||||||
|
};
|
||||||
|
($actor_name:expr, $actor_ref:expr, $message:expr) => {{
|
||||||
|
tracing::debug!(actor = $actor_name, msg = stringify!($message), "send");
|
||||||
|
$actor_ref.tell($message).await
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! ask {
|
||||||
|
($actor_ref:expr, $message:expr) => {
|
||||||
|
ask!(stringify!($actor_ref), $actor_ref, $message)
|
||||||
|
};
|
||||||
|
($actor_name:expr, $actor_ref:expr, $message:expr) => {{
|
||||||
|
tracing::debug!(actor = $actor_name, msg = stringify!($message), "send");
|
||||||
|
$actor_ref.ask($message).await
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! subscribe {
|
||||||
|
($message_bus:expr, $actor_ref:expr) => {
|
||||||
|
subscribe!(
|
||||||
|
stringify!($message_bus),
|
||||||
|
$message_bus,
|
||||||
|
stringify!($actor_ref),
|
||||||
|
$actor_ref
|
||||||
|
)
|
||||||
|
};
|
||||||
|
($message_bus:expr, $actor_name:expr, $actor_ref:expr) => {
|
||||||
|
subscribe!(
|
||||||
|
stringify!($message_bus),
|
||||||
|
$message_bus,
|
||||||
|
$actor_name,
|
||||||
|
$actor_ref
|
||||||
|
)
|
||||||
|
};
|
||||||
|
($bus_name:expr, $message_bus:expr, $actor_ref:expr) => {
|
||||||
|
subscribe!($bus_name, $message_bus, stringify!($actor_ref), $actor_ref)
|
||||||
|
};
|
||||||
|
($bus_name:expr, $message_bus:expr, $actor_name:expr, $actor_ref:expr) => {{
|
||||||
|
tracing::debug!(msg_bus = $bus_name, actor = $actor_name, "subscribe");
|
||||||
|
$message_bus
|
||||||
|
.tell(kameo::actor::pubsub::Subscribe($actor_ref))
|
||||||
|
.await
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! publish {
|
||||||
|
($message_bus:expr, $message:expr) => {
|
||||||
|
publish!(stringify!($message_bus), $message_bus, $message)
|
||||||
|
};
|
||||||
|
($bus_name:expr, $message_bus:expr, $message:expr) => {{
|
||||||
|
tracing::debug!(bus = $bus_name, msg = stringify!($message), "publish");
|
||||||
|
$message_bus
|
||||||
|
.tell(kameo::actor::pubsub::Publish($message))
|
||||||
|
.await
|
||||||
|
}};
|
||||||
|
}
|
43
src/experimental/kameo/spawn.rs
Normal file
43
src/experimental/kameo/spawn.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
//
|
||||||
|
/// spawns a new actor and sets up bi-directional links
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! spawn {
|
||||||
|
($parent:expr, $actor:expr) => {{
|
||||||
|
tracing::debug!("spawning : {}", $crate::stringify_expr!($actor));
|
||||||
|
let new_actor_ref = kameo::spawn($actor);
|
||||||
|
new_actor_ref.link(&$parent).await;
|
||||||
|
$parent.link(&new_actor_ref).await;
|
||||||
|
tracing::debug!("spawned : {}", $crate::stringify_expr!($actor));
|
||||||
|
new_actor_ref
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! stringify_expr {
|
||||||
|
($expr:expr) => {
|
||||||
|
stringify!($expr).lines().collect::<Vec<_>>().join(" ")
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_line_breaks() {
|
||||||
|
let out = stringify_expr!([
|
||||||
|
"line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",
|
||||||
|
"line 1", "line 2", "line 3",
|
||||||
|
]);
|
||||||
|
let expected = r#"["line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",]"#;
|
||||||
|
|
||||||
|
assert_eq!(out, expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! spawn_in_thread {
|
||||||
|
($parent:expr, $actor:expr) => {{
|
||||||
|
tracing::debug!("spawning in thread : {}", $crate::stringify_expr!($actor));
|
||||||
|
let new_actor_ref = kameo::actor::spawn_in_thread($actor);
|
||||||
|
new_actor_ref.link(&$parent).await;
|
||||||
|
$parent.link(&new_actor_ref).await;
|
||||||
|
tracing::debug!("spawned in thread : {}", $crate::stringify_expr!($actor));
|
||||||
|
new_actor_ref
|
||||||
|
}};
|
||||||
|
}
|
|
@ -1,4 +1,6 @@
|
||||||
//
|
//
|
||||||
|
#[cfg(feature = "use-kameo")]
|
||||||
|
mod kameo;
|
||||||
|
|
||||||
#[cfg(feature = "use-kxio")]
|
#[cfg(feature = "use-kxio")]
|
||||||
mod kxio;
|
mod kxio;
|
||||||
|
|
Loading…
Add table
Reference in a new issue