feat: add module experimental::kameo
This commit is contained in:
parent
d2e0db259d
commit
f43961e596
6 changed files with 338 additions and 0 deletions
209
src/experimental/kameo/actor.rs
Normal file
209
src/experimental/kameo/actor.rs
Normal file
|
@ -0,0 +1,209 @@
|
|||
//
|
||||
|
||||
/// Called when the actor starts, before it processes any messages.
|
||||
///
|
||||
/// Messages sent internally by the actor during `on_start` are prioritized and processed
|
||||
/// before any externally sent messages, even if external messages are received first.
|
||||
///
|
||||
/// This ensures that the actor can properly initialize before handling external messages.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use trello_to_deck::on_actor_start;
|
||||
/// struct ServerActor;
|
||||
/// impl kameo::Actor for ServerActor {
|
||||
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||
///
|
||||
/// on_actor_start!(this, actor_ref, {
|
||||
/// // handle start here
|
||||
/// Ok(())
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! on_actor_start {
|
||||
($this:ident, $actor_ref:ident, $body:expr) => {
|
||||
#[allow(unused_variables)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn on_start(
|
||||
&mut self,
|
||||
actor_ref: kameo::actor::ActorRef<Self>,
|
||||
) -> std::result::Result<(), kameo::error::BoxError> {
|
||||
tracing::debug!(?actor_ref, "{}", <Self as kameo::Actor>::name());
|
||||
let $this = self;
|
||||
let $actor_ref = actor_ref;
|
||||
$body
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! default_on_actor_start {
|
||||
($this:ident, $actor_ref:ident) => {
|
||||
$crate::on_actor_start!($this, $actor_ref, { Ok(()) });
|
||||
};
|
||||
}
|
||||
|
||||
/// Called swhen the actor encounters a panic or an error during "tell" message handling.
|
||||
///
|
||||
/// This method gives the actor an opportunity to clean up or reset its state and determine
|
||||
/// whether it should be stopped or continue processing messages.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `err`: The panic or error that occurred.
|
||||
///
|
||||
/// # Returns
|
||||
/// - `Some(ActorStopReason)`: Stops the actor.
|
||||
/// - `None`: Allows the actor to continue processing messages.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use trello_to_deck::on_actor_panic;
|
||||
/// struct ServerActor;
|
||||
/// impl kameo::Actor for ServerActor {
|
||||
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||
///
|
||||
/// on_actor_panic!(this, actor_ref, err, {
|
||||
/// // handle panic here
|
||||
/// Ok(None)
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! on_actor_panic {
|
||||
($this:ident, $actor_ref:ident, $err:ident, $body:expr) => {
|
||||
#[allow(unused_variables)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn on_panic(
|
||||
&mut self,
|
||||
actor_ref: kameo::actor::WeakActorRef<Self>,
|
||||
err: kameo::error::PanicError,
|
||||
) -> std::result::Result<Option<kameo::error::ActorStopReason>, kameo::error::BoxError> {
|
||||
tracing::debug!(?actor_ref, %err, "{}", <Self as kameo::Actor>::name());
|
||||
let $this = self;
|
||||
let $actor_ref = actor_ref;
|
||||
let $err = err;
|
||||
$body
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! default_on_actor_panic {
|
||||
($this:ident, $actor_ref:ident, $err:ident) => {
|
||||
$crate::on_actor_panic!($this, $actor_ref, $err, {
|
||||
Ok(Some(kameo::error::ActorStopReason::Panicked($err)))
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/// Called when a linked actor dies.
|
||||
///
|
||||
/// By default, the actor will stop if the reason for the linked actor's death is anything other
|
||||
/// than `Normal`. You can customize this behavior in the implementation.
|
||||
///
|
||||
/// # Returns
|
||||
/// Whether the actor should stop or continue processing messages.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use trello_to_deck::on_actor_link_died;
|
||||
/// struct ServerActor;
|
||||
/// impl kameo::Actor for ServerActor {
|
||||
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||
///
|
||||
/// on_actor_link_died!(this, actor_ref, id, reason, {
|
||||
/// // handle link death here
|
||||
/// Ok(None)
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! on_actor_link_died {
|
||||
($this:ident, $actor_ref:ident, $id:ident, $reason:ident, $body:expr) => {
|
||||
#[allow(unused_variables)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn on_link_died(
|
||||
&mut self,
|
||||
actor_ref: kameo::actor::WeakActorRef<Self>,
|
||||
id: kameo::actor::ActorID,
|
||||
reason: kameo::error::ActorStopReason,
|
||||
) -> std::result::Result<Option<kameo::error::ActorStopReason>, kameo::error::BoxError> {
|
||||
tracing::debug!(?actor_ref, %id, %reason, "{}", <Self as kameo::Actor>::name());
|
||||
let $this = self;
|
||||
let $actor_ref = actor_ref;
|
||||
let $id = id;
|
||||
let $reason = reason;
|
||||
$body
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! default_on_actor_link_died {
|
||||
($this:ident, $actor_ref:ident, $id:ident, $reason:ident) => {
|
||||
$crate::on_actor_link_died!($this, $actor_ref, $id, $reason, {
|
||||
match &$reason {
|
||||
kameo::error::ActorStopReason::Normal => Ok(None),
|
||||
kameo::error::ActorStopReason::Killed
|
||||
| kameo::error::ActorStopReason::Panicked(_)
|
||||
| kameo::error::ActorStopReason::LinkDied { .. } => {
|
||||
Ok(Some(kameo::error::ActorStopReason::LinkDied {
|
||||
id: $id,
|
||||
reason: Box::new($reason),
|
||||
}))
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
/// Called before the actor stops.
|
||||
///
|
||||
/// This allows the actor to perform any necessary cleanup or release resources before being fully stopped.
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `reason`: The reason why the actor is being stopped.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use trello_to_deck::on_actor_stop;
|
||||
/// struct ServerActor;
|
||||
/// impl kameo::Actor for ServerActor {
|
||||
/// type Mailbox = kameo::mailbox::unbounded::UnboundedMailbox<Self>;
|
||||
///
|
||||
/// on_actor_stop!(this, actor_ref, reason, {
|
||||
/// // handle stop here
|
||||
/// Ok(())
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! on_actor_stop {
|
||||
($this:ident, $actor_ref:ident, $reason:ident, $body:expr) => {
|
||||
#[allow(unused_variables)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn on_stop(
|
||||
&mut self,
|
||||
actor_ref: kameo::actor::WeakActorRef<Self>,
|
||||
reason: kameo::error::ActorStopReason,
|
||||
) -> std::result::Result<(), kameo::error::BoxError> {
|
||||
tracing::debug!(?actor_ref, %reason, "{}", <Self as kameo::Actor>::name());
|
||||
let $this = self;
|
||||
let $actor_ref = actor_ref;
|
||||
let $reason = reason;
|
||||
$body
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! default_on_actor_stop {
|
||||
($this:ident, $actor_ref:ident, $reason:ident) => {
|
||||
$crate::on_actor_stop!($this, $actor_ref, $reason, { Ok(()) });
|
||||
};
|
||||
}
|
15
src/experimental/kameo/message.rs
Normal file
15
src/experimental/kameo/message.rs
Normal file
|
@ -0,0 +1,15 @@
|
|||
#[macro_export]
|
||||
macro_rules! message {
|
||||
($name:ident, $value:ty, $docs:literal) => {
|
||||
$crate::newtype!($name, $value, $docs);
|
||||
};
|
||||
($name:ident, $docs:literal) => {
|
||||
$crate::newtype!($name, $docs);
|
||||
};
|
||||
($name:ident, $value:ty => $result:ty, $docs:literal) => {
|
||||
$crate::newtype!($name, $value, $docs);
|
||||
};
|
||||
($name:ident => $result:ty, $docs:literal) => {
|
||||
$crate::newtype!($name, $docs);
|
||||
};
|
||||
}
|
5
src/experimental/kameo/mod.rs
Normal file
5
src/experimental/kameo/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
|||
//
|
||||
mod actor;
|
||||
mod message;
|
||||
mod send;
|
||||
mod spawn;
|
64
src/experimental/kameo/send.rs
Normal file
64
src/experimental/kameo/send.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
//
|
||||
#[macro_export]
|
||||
macro_rules! tell {
|
||||
($actor_ref:expr, $message:expr) => {
|
||||
tell!(stringify!($actor_ref), $actor_ref, $message)
|
||||
};
|
||||
($actor_name:expr, $actor_ref:expr, $message:expr) => {{
|
||||
tracing::debug!(actor = $actor_name, msg = stringify!($message), "send");
|
||||
$actor_ref.tell($message).await
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! ask {
|
||||
($actor_ref:expr, $message:expr) => {
|
||||
ask!(stringify!($actor_ref), $actor_ref, $message)
|
||||
};
|
||||
($actor_name:expr, $actor_ref:expr, $message:expr) => {{
|
||||
tracing::debug!(actor = $actor_name, msg = stringify!($message), "send");
|
||||
$actor_ref.ask($message).await
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! subscribe {
|
||||
($message_bus:expr, $actor_ref:expr) => {
|
||||
subscribe!(
|
||||
stringify!($message_bus),
|
||||
$message_bus,
|
||||
stringify!($actor_ref),
|
||||
$actor_ref
|
||||
)
|
||||
};
|
||||
($message_bus:expr, $actor_name:expr, $actor_ref:expr) => {
|
||||
subscribe!(
|
||||
stringify!($message_bus),
|
||||
$message_bus,
|
||||
$actor_name,
|
||||
$actor_ref
|
||||
)
|
||||
};
|
||||
($bus_name:expr, $message_bus:expr, $actor_ref:expr) => {
|
||||
subscribe!($bus_name, $message_bus, stringify!($actor_ref), $actor_ref)
|
||||
};
|
||||
($bus_name:expr, $message_bus:expr, $actor_name:expr, $actor_ref:expr) => {{
|
||||
tracing::debug!(msg_bus = $bus_name, actor = $actor_name, "subscribe");
|
||||
$message_bus
|
||||
.tell(kameo::actor::pubsub::Subscribe($actor_ref))
|
||||
.await
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! publish {
|
||||
($message_bus:expr, $message:expr) => {
|
||||
publish!(stringify!($message_bus), $message_bus, $message)
|
||||
};
|
||||
($bus_name:expr, $message_bus:expr, $message:expr) => {{
|
||||
tracing::debug!(bus = $bus_name, msg = stringify!($message), "publish");
|
||||
$message_bus
|
||||
.tell(kameo::actor::pubsub::Publish($message))
|
||||
.await
|
||||
}};
|
||||
}
|
43
src/experimental/kameo/spawn.rs
Normal file
43
src/experimental/kameo/spawn.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
//
|
||||
/// spawns a new actor and sets up bi-directional links
|
||||
#[macro_export]
|
||||
macro_rules! spawn {
|
||||
($parent:expr, $actor:expr) => {{
|
||||
tracing::debug!("spawning : {}", $crate::stringify_expr!($actor));
|
||||
let new_actor_ref = kameo::spawn($actor);
|
||||
new_actor_ref.link(&$parent).await;
|
||||
$parent.link(&new_actor_ref).await;
|
||||
tracing::debug!("spawned : {}", $crate::stringify_expr!($actor));
|
||||
new_actor_ref
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! stringify_expr {
|
||||
($expr:expr) => {
|
||||
stringify!($expr).lines().collect::<Vec<_>>().join(" ")
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_line_breaks() {
|
||||
let out = stringify_expr!([
|
||||
"line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",
|
||||
"line 1", "line 2", "line 3",
|
||||
]);
|
||||
let expected = r#"["line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3", "line 1", "line 2", "line 3",]"#;
|
||||
|
||||
assert_eq!(out, expected);
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! spawn_in_thread {
|
||||
($parent:expr, $actor:expr) => {{
|
||||
tracing::debug!("spawning in thread : {}", $crate::stringify_expr!($actor));
|
||||
let new_actor_ref = kameo::actor::spawn_in_thread($actor);
|
||||
new_actor_ref.link(&$parent).await;
|
||||
$parent.link(&new_actor_ref).await;
|
||||
tracing::debug!("spawned in thread : {}", $crate::stringify_expr!($actor));
|
||||
new_actor_ref
|
||||
}};
|
||||
}
|
|
@ -1,4 +1,6 @@
|
|||
//
|
||||
#[cfg(feature = "use-kameo")]
|
||||
mod kameo;
|
||||
|
||||
#[cfg(feature = "use-kxio")]
|
||||
mod kxio;
|
||||
|
|
Loading…
Add table
Reference in a new issue