Callbacks, rins sub.

This commit is contained in:
James Pace 2024-09-28 20:32:34 +00:00
parent ed30daaf06
commit 3250a5adc6
5 changed files with 123 additions and 30 deletions

1
scripts/run-sub-callback.sh Executable file
View File

@ -0,0 +1 @@
ZENOH_CONFIG=config/peer.yaml ./target/debug/sub_callback

View File

@ -1,22 +1,55 @@
use anyhow::anyhow;
use anyhow::Result;
use std::time::Duration; use std::time::Duration;
#[tokio::main] fn build_message1() -> Result<Vec<u8>> {
async fn main() {
let config = zenoh::Config::from_env().unwrap();
let session = zenoh::open(config).await.unwrap();
let publisher = session.declare_publisher("test").await.unwrap();
loop {
let mut message = zenoh_exp::Message::default(); let mut message = zenoh_exp::Message::default();
message.sender = "pub".to_string(); message.sender = "pub".to_string();
message.message = "Hi sub!".to_string(); message.message = "Hi sub!".to_string();
let mut buff: Vec<u8> = Vec::new(); let mut buff: Vec<u8> = Vec::new();
let _ = ciborium::into_writer(&message, &mut buff).unwrap(); let _ = ciborium::into_writer(&message, &mut buff)?;
publisher
.put(&buff) return Ok(buff);
.encoding("application/cbor;zenoh_exp::Mesage") }
fn build_message2() -> Result<Vec<u8>> {
let mut message = zenoh_exp::Message::default();
message.sender = "pub".to_string();
message.message = "yo sub!".to_string();
let mut buff: Vec<u8> = Vec::new();
let _ = ciborium::into_writer(&message, &mut buff)?;
return Ok(buff);
}
#[tokio::main]
async fn main() -> Result<()> {
let config = zenoh::Config::from_env().map_err(|e| anyhow!(e))?;
let session = zenoh::open(config).await.map_err(|e| anyhow!(e))?;
let test_publisher = session
.declare_publisher("test")
.await .await
.unwrap(); .map_err(|e| anyhow!(e))?;
tokio::time::sleep(Duration::from_secs(1)).await; let yo_publisher = session
.declare_publisher("yo")
.await
.map_err(|e| anyhow!(e))?;
loop {
let buff = build_message1()?;
test_publisher
.put(&buff)
.encoding("application/cbor;zenoh_exp::Message")
.await
.map_err(|e| anyhow!(e))?;
tokio::time::sleep(Duration::from_secs_f32(0.5)).await;
let buff = build_message2()?;
yo_publisher
.put(&buff)
.encoding("application/cbor;zenoh_exp::Message")
.await
.map_err(|e| anyhow!(e))?;
tokio::time::sleep(Duration::from_secs_f32(0.5)).await;
} }
} }

View File

@ -1,20 +1,35 @@
#[tokio::main] use anyhow::{anyhow, Result};
async fn main() {
let config = zenoh::Config::from_env().unwrap();
let session = zenoh::open(config).await.unwrap();
let subscriber = session.declare_subscriber("test").await.unwrap();
#[tokio::main]
async fn main() -> Result<()> {
let config = zenoh::Config::from_env().map_err(|e| anyhow!(e))?;
let session = zenoh::open(config).await.map_err(|e| anyhow!(e))?;
let test_session = session.clone();
tokio::spawn(async move {
let subscriber = test_session
.declare_subscriber("test")
.with(zenoh::handlers::RingChannel::new(1))
.await
.unwrap();
loop { loop {
let sample = subscriber.recv_async().await.unwrap(); let sample = subscriber.recv_async().await.unwrap();
let payload = sample.payload(); zenoh_exp::print_message(&sample);
let bytes: Vec<u8> = payload.into();
let message: zenoh_exp::Message = ciborium::from_reader(&bytes[..]).unwrap();
println!(
"Encoding: {} Key: {} Value: {:?}",
sample.encoding().to_string(),
sample.key_expr().as_str(),
message
);
} }
});
let yo_session = session.clone();
tokio::spawn(async move {
let subscriber = yo_session
.declare_subscriber("yo")
.with(zenoh::handlers::RingChannel::new(1))
.await
.unwrap();
loop {
let sample = subscriber.recv_async().await.unwrap();
zenoh_exp::print_message(&sample);
}
});
loop {}
} }

25
src/bin/sub_callback.rs Normal file
View File

@ -0,0 +1,25 @@
use anyhow::Result;
use zenoh_exp::print_message;
#[tokio::main]
async fn main() -> Result<()> {
let config = zenoh::Config::from_env().unwrap();
let session = zenoh::open(config).await.unwrap();
let test_session = session.clone();
tokio::spawn(async move {
let _ = test_session
.declare_subscriber("test")
.callback(|sample| print_message(&sample))
.await;
});
let yo_session = session.clone();
tokio::spawn(async move {
let _ = yo_session
.declare_subscriber("yo")
.callback(|sample| print_message(&sample))
.await;
});
loop {}
}

View File

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::string::String; use std::string::String;
// A request for a plan. // A message.
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct Message { pub struct Message {
pub sender: String, pub sender: String,
@ -16,3 +16,22 @@ impl Message {
} }
} }
} }
// Go from a sample to a message and print it.
pub fn print_message(sample: &zenoh::sample::Sample) {
let payload = sample.payload();
let bytes: Vec<u8> = payload.into();
let message: Message = ciborium::from_reader(&bytes[..]).unwrap();
println!(
"Encoding: {} Key: {} Value: {:?}",
sample.encoding().to_string(),
sample.key_expr().as_str(),
message
);
}
// For debug.
pub fn print_type_of<T>(_: &T) {
println!("{}", std::any::type_name::<T>());
}