diff --git a/scripts/run-sub-callback.sh b/scripts/run-sub-callback.sh new file mode 100755 index 0000000..18891e1 --- /dev/null +++ b/scripts/run-sub-callback.sh @@ -0,0 +1 @@ +ZENOH_CONFIG=config/peer.yaml ./target/debug/sub_callback diff --git a/src/bin/pub.rs b/src/bin/pub.rs index c4dc16c..b8c05b9 100644 --- a/src/bin/pub.rs +++ b/src/bin/pub.rs @@ -1,22 +1,55 @@ +use anyhow::anyhow; +use anyhow::Result; use std::time::Duration; +fn build_message1() -> Result> { + let mut message = zenoh_exp::Message::default(); + message.sender = "pub".to_string(); + message.message = "Hi sub!".to_string(); + let mut buff: Vec = Vec::new(); + let _ = ciborium::into_writer(&message, &mut buff)?; + + return Ok(buff); +} + +fn build_message2() -> Result> { + let mut message = zenoh_exp::Message::default(); + message.sender = "pub".to_string(); + message.message = "yo sub!".to_string(); + let mut buff: Vec = Vec::new(); + let _ = ciborium::into_writer(&message, &mut buff)?; + + return Ok(buff); +} + #[tokio::main] -async fn main() { - let config = zenoh::Config::from_env().unwrap(); - let session = zenoh::open(config).await.unwrap(); - let publisher = session.declare_publisher("test").await.unwrap(); +async fn main() -> Result<()> { + let config = zenoh::Config::from_env().map_err(|e| anyhow!(e))?; + let session = zenoh::open(config).await.map_err(|e| anyhow!(e))?; + let test_publisher = session + .declare_publisher("test") + .await + .map_err(|e| anyhow!(e))?; + let yo_publisher = session + .declare_publisher("yo") + .await + .map_err(|e| anyhow!(e))?; loop { - let mut message = zenoh_exp::Message::default(); - message.sender = "pub".to_string(); - message.message = "Hi sub!".to_string(); - let mut buff: Vec = Vec::new(); - let _ = ciborium::into_writer(&message, &mut buff).unwrap(); - publisher + let buff = build_message1()?; + test_publisher .put(&buff) - .encoding("application/cbor;zenoh_exp::Mesage") + .encoding("application/cbor;zenoh_exp::Message") .await - .unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; + .map_err(|e| anyhow!(e))?; + tokio::time::sleep(Duration::from_secs_f32(0.5)).await; + + let buff = build_message2()?; + yo_publisher + .put(&buff) + .encoding("application/cbor;zenoh_exp::Message") + .await + .map_err(|e| anyhow!(e))?; + tokio::time::sleep(Duration::from_secs_f32(0.5)).await; } } diff --git a/src/bin/sub.rs b/src/bin/sub.rs index e3bab12..499f3b8 100644 --- a/src/bin/sub.rs +++ b/src/bin/sub.rs @@ -1,20 +1,35 @@ +use anyhow::{anyhow, Result}; + #[tokio::main] -async fn main() { - let config = zenoh::Config::from_env().unwrap(); - let session = zenoh::open(config).await.unwrap(); - let subscriber = session.declare_subscriber("test").await.unwrap(); +async fn main() -> Result<()> { + let config = zenoh::Config::from_env().map_err(|e| anyhow!(e))?; + let session = zenoh::open(config).await.map_err(|e| anyhow!(e))?; - loop { - let sample = subscriber.recv_async().await.unwrap(); - let payload = sample.payload(); - let bytes: Vec = payload.into(); - let message: zenoh_exp::Message = ciborium::from_reader(&bytes[..]).unwrap(); + let test_session = session.clone(); + tokio::spawn(async move { + let subscriber = test_session + .declare_subscriber("test") + .with(zenoh::handlers::RingChannel::new(1)) + .await + .unwrap(); + loop { + let sample = subscriber.recv_async().await.unwrap(); + zenoh_exp::print_message(&sample); + } + }); - println!( - "Encoding: {} Key: {} Value: {:?}", - sample.encoding().to_string(), - sample.key_expr().as_str(), - message - ); - } + let yo_session = session.clone(); + tokio::spawn(async move { + let subscriber = yo_session + .declare_subscriber("yo") + .with(zenoh::handlers::RingChannel::new(1)) + .await + .unwrap(); + loop { + let sample = subscriber.recv_async().await.unwrap(); + zenoh_exp::print_message(&sample); + } + }); + + loop {} } diff --git a/src/bin/sub_callback.rs b/src/bin/sub_callback.rs new file mode 100644 index 0000000..dfb0ad3 --- /dev/null +++ b/src/bin/sub_callback.rs @@ -0,0 +1,25 @@ +use anyhow::Result; +use zenoh_exp::print_message; + +#[tokio::main] +async fn main() -> Result<()> { + let config = zenoh::Config::from_env().unwrap(); + let session = zenoh::open(config).await.unwrap(); + + let test_session = session.clone(); + tokio::spawn(async move { + let _ = test_session + .declare_subscriber("test") + .callback(|sample| print_message(&sample)) + .await; + }); + let yo_session = session.clone(); + tokio::spawn(async move { + let _ = yo_session + .declare_subscriber("yo") + .callback(|sample| print_message(&sample)) + .await; + }); + + loop {} +} diff --git a/src/lib.rs b/src/lib.rs index 3a25b43..c02d4da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::string::String; -// A request for a plan. +// A message. #[derive(Serialize, Deserialize, Debug)] pub struct Message { pub sender: String, @@ -16,3 +16,22 @@ impl Message { } } } + +// Go from a sample to a message and print it. +pub fn print_message(sample: &zenoh::sample::Sample) { + let payload = sample.payload(); + let bytes: Vec = payload.into(); + let message: Message = ciborium::from_reader(&bytes[..]).unwrap(); + + println!( + "Encoding: {} Key: {} Value: {:?}", + sample.encoding().to_string(), + sample.key_expr().as_str(), + message + ); +} + +// For debug. +pub fn print_type_of(_: &T) { + println!("{}", std::any::type_name::()); +}