Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bump to zenoh 1.0.0 #8

Merged
merged 11 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --release --verbose
args: --release --verbose

- name: Clean up
if: always()
uses: actions-rs/cargo@v1
with:
command: clean
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc"
license = " EPL-2.0 OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/ZettaScaleLabs/zenoh-rpc"
version = "0.8.0-alpha.1"
version = "0.8.0"

[profile.release]
codegen-units = 1
Expand Down Expand Up @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [
"rt",
"time",
] }
zenoh = { version = "0.11.0", default-features = false }
zenoh-codec = { version = "0.11.0" }
zenoh-core = { version = "0.11.0" }
zenoh-ext = { version = "0.11.0" }
zenoh-macros = { version = "0.11.0" }
zenoh-protocol = { version = "0.11.0" }
zenoh-util = { version = "0.11.0" }
zenoh = { version= "1.0.0", default-features = false }
zenoh-codec = { version= "1.0.0" }
zenoh-core = { version= "1.0.0" }
zenoh-ext = { version= "1.0.0" }
zenoh-macros = { version= "1.0.0" }
zenoh-protocol = { version= "1.0.0" }
zenoh-util = { version= "1.0.0" }
7 changes: 2 additions & 5 deletions zrpc-derive/examples/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,10 @@ impl Hello for MyServer {
#[tokio::main]
async fn main() {
env_logger::init();
use zenoh::prelude::r#async::*;

let mut config = zenoh::config::Config::default();
config
.set_mode(Some(zenoh::config::whatami::WhatAmI::Peer))
.unwrap();
let zsession = Arc::new(zenoh::open(config).res().await.unwrap());
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
let zsession = zenoh::open(config).await.unwrap();

let z = zsession.clone();
let client = HelloClient::builder(zsession).build();
Expand Down
32 changes: 11 additions & 21 deletions zrpc-derive/examples/simplified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
use zenoh::config::ZenohId;
use zenoh::key_expr::format::KeFormat;
use zenoh::prelude::ZenohId;

use zenoh::key_expr::KeyExpr;
use zrpc::prelude::*;

use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::ops::Deref;
use std::str::{self, FromStr};
use std::time::Duration;
use zenoh::{Session, SessionDeclarations};

use serde::{Deserialize, Serialize};
use zenoh::prelude::r#async::*;
use zenoh::Session;

// this is the user defined trait
#[async_trait::async_trait]
Expand Down Expand Up @@ -239,7 +238,7 @@ impl Deref for SubResponse {
}

pub struct HelloClientBuilder<'a> {
pub z: Arc<Session>,
pub z: Session,
pub labels: HashSet<String>,
ke_format: KeFormat<'a>,
pub tout: Duration,
Expand Down Expand Up @@ -284,15 +283,15 @@ impl<'a> HelloClientBuilder<'a> {
pub struct HelloClient<'a> {
pub(crate) ch: RPCClientChannel,
pub(crate) ke_format: KeFormat<'a>,
pub(crate) z: Arc<Session>,
pub(crate) z: Session,
pub(crate) tout: Duration,
pub(crate) labels: HashSet<String>,
}

// generated client code

impl<'a> HelloClient<'a> {
pub fn builder(z: Arc<zenoh::Session>) -> HelloClientBuilder<'a> {
pub fn builder(z: zenoh::Session) -> HelloClientBuilder<'a> {
HelloClientBuilder {
z,
labels: HashSet::new(),
Expand Down Expand Up @@ -349,21 +348,15 @@ impl<'a> HelloClient<'a> {
.z
.liveliness()
.get("@rpc/*/service/Hello")
.res()
.await
.map_err(|e| {
Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;

let ids = res
.into_iter()
.map(|e| {
self.extract_id_from_ke(
&e.sample
.map_err(|_| Status::unavailable("Cannot get value from sample"))?
.key_expr,
)
})
.filter_map(|e| e.into_result().ok())
.map(|e| self.extract_id_from_ke(e.key_expr()))
.collect::<Result<Vec<ZenohId>, Status>>()?;

// get server metadata
Expand Down Expand Up @@ -400,13 +393,10 @@ impl<'a> HelloClient<'a> {
async fn main() {
{
env_logger::init();
use zenoh::prelude::r#async::*;

let mut config = zenoh::config::Config::default();
config
.set_mode(Some(zenoh::config::whatami::WhatAmI::Peer))
.unwrap();
let zsession = Arc::new(zenoh::open(config).res().await.unwrap());
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
let zsession = zenoh::open(config).await.unwrap();

let z = zsession.clone();

Expand Down
24 changes: 10 additions & 14 deletions zrpc-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
}
}
}
match recv {

Check failure on line 124 in zrpc-derive/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build on ubuntu-22.04

you seem to be trying to use `match` for an equality check. Consider using `if`
None => extend_errors!(
errors,
syn::Error::new(
Expand Down Expand Up @@ -467,7 +467,7 @@
quote! {
#[automatically_derived]
#vis struct #client_builder_ident<'a> {
pub(crate) z: std::sync::Arc<zenoh::Session>,
pub(crate) z: zenoh::Session,
ke_format: zenoh::key_expr::format::KeFormat<'a>,
pub(crate) tout: std::time::Duration,
pub(crate) labels: std::collections::HashSet<std::string::String>,
Expand Down Expand Up @@ -513,7 +513,7 @@
#[derive(Clone, Debug)]
#vis struct #client_ident<'a> {
pub(crate) ch : zrpc::prelude::RPCClientChannel,
pub(crate) z: std::sync::Arc<zenoh::Session>,
pub(crate) z: zenoh::Session,
pub(crate) ke_format: zenoh::key_expr::format::KeFormat<'a>,
pub(crate) tout: std::time::Duration,
pub(crate) labels: std::collections::HashSet<std::string::String>,
Expand All @@ -523,7 +523,7 @@
impl<'a> #client_ident<'a> {


#vis fn builder(z: std::sync::Arc<zenoh::Session>) -> #client_builder_ident<'a> {
#vis fn builder(z: zenoh::Session) -> #client_builder_ident<'a> {
#client_builder_ident {
z,
labels: std::collections::HashSet::new(),
Expand All @@ -535,34 +535,30 @@

#(#fns)*

async fn find_server(&self) -> std::result::Result<zenoh::prelude::ZenohId, zrpc::prelude::Status> {
use zenoh::prelude::r#async::AsyncResolve;
use zenoh::SessionDeclarations;
async fn find_server(&self) -> std::result::Result<zenoh::config::ZenohId, zrpc::prelude::Status> {

let res = self
.z
.liveliness()
.get(#rpc_ke)
.res()
.await
.map_err(|e| {
zrpc::prelude::Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;();

let ids = res
.into_iter()
.filter_map(|e| e.into_result().ok())
.map(|e| {
self.extract_id_from_ke(
&e.sample
.map_err(|_| zrpc::prelude::Status::unavailable("Cannot get value from sample"))?
.key_expr,
e.key_expr()
)
})
.collect::<std::result::Result<std::vec::Vec<zenoh::prelude::ZenohId>, zrpc::prelude::Status>>()?;
.collect::<std::result::Result<std::vec::Vec<zenoh::config::ZenohId>, zrpc::prelude::Status>>()?;

let metadatas = self.ch.get_servers_metadata(&ids, self.tout).await?;

let mut ids: Vec<zenoh::prelude::ZenohId> = metadatas
let mut ids: Vec<zenoh::config::ZenohId> = metadatas
.into_iter()
.filter(|m| m.labels.is_superset(&self.labels))
.map(|m| m.id)
Expand All @@ -572,7 +568,7 @@
.ok_or(zrpc::prelude::Status::unavailable("No servers found"))
}

fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result<zenoh::prelude::ZenohId, zrpc::prelude::Status> {
fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result<zenoh::config::ZenohId, zrpc::prelude::Status> {
use std::str::FromStr;
let id_str = self
.ke_format
Expand All @@ -581,7 +577,7 @@
.get("zid")
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to get server id from key expression: {e:?}")))?;

zenoh::prelude::ZenohId::from_str(id_str)
zenoh::config::ZenohId::from_str(id_str)
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}")))

}
Expand Down
49 changes: 25 additions & 24 deletions zrpc-derive/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
* ADLINK fog05 team, <[email protected]>
*********************************************************************************/

use std::sync::Arc;
use std::{str::FromStr, sync::Arc};
use tokio::sync::Mutex;

use async_trait::async_trait;
use zenoh::config::{EndPoint, ZenohId};
//importing the macros
use zenoh::prelude::r#async::*;
use zrpc::prelude::*;
use zrpc_derive::service;

Expand Down Expand Up @@ -55,34 +55,35 @@ impl Hello for MyServer {
fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::config::Config {
let mut config = zenoh::config::Config::default();
config.set_id(id).unwrap();
config
.set_mode(Some(zenoh::config::whatami::WhatAmI::Peer))
.unwrap();
config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap();
config.scouting.multicast.set_enabled(Some(false)).unwrap();
config.listen.endpoints.push(listen.parse().unwrap());
config.connect.endpoints.push(connect.parse().unwrap());

let listen: Vec<EndPoint> = vec![listen.parse().unwrap()];
let connect: Vec<EndPoint> = vec![connect.parse().unwrap()];
config.listen.endpoints.set(listen).unwrap();
config.connect.endpoints.set(connect).unwrap();

config
}

async fn wait_for_peer(session: &zenoh::Session, id: ZenohId) {
while !session.info().peers_zid().res().await.any(|e| e == id) {
while !session.info().peers_zid().await.any(|e| e == id) {
tokio::time::sleep(std::time::Duration::from_secs(1)).await
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn service_call() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a0").unwrap();
let client_zid = ZenohId::from_str("a1").unwrap();

let client_config = configure_zenoh(
client_zid,
"tcp/127.0.0.1:9003".to_string(),
"tcp/127.0.0.1:9002".to_string(),
);

let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let client_session = zenoh::open(client_config).await.unwrap();

let c_zid_server = server_zid;
tokio::task::spawn(async move {
Expand All @@ -91,7 +92,7 @@ async fn service_call() {
"tcp/127.0.0.1:9002".to_string(),
"tcp/127.0.0.1:9003".to_string(),
);
let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
wait_for_peer(&server_session, client_zid).await;

let service = MyServer {
Expand Down Expand Up @@ -138,8 +139,8 @@ async fn service_call() {

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn service_unavailable() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a2").unwrap();
let client_zid = ZenohId::from_str("a3").unwrap();

let server_config = configure_zenoh(
server_zid,
Expand All @@ -153,8 +154,8 @@ async fn service_unavailable() {
"tcp/127.0.0.1:9004".to_string(),
);

let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
let client_session = zenoh::open(client_config).await.unwrap();

// Check zenoh sessions are connected
wait_for_peer(&server_session, client_zid).await;
Expand All @@ -170,16 +171,16 @@ async fn service_unavailable() {

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn server_not_matching() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a4").unwrap();
let client_zid = ZenohId::from_str("a5").unwrap();

let client_config = configure_zenoh(
client_zid,
"tcp/127.0.0.1:9007".to_string(),
"tcp/127.0.0.1:9006".to_string(),
);

let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let client_session = zenoh::open(client_config).await.unwrap();

let c_zid_server = server_zid;
let st = tokio::task::spawn(async move {
Expand All @@ -188,7 +189,7 @@ async fn server_not_matching() {
"tcp/127.0.0.1:9006".to_string(),
"tcp/127.0.0.1:9007".to_string(),
);
let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
wait_for_peer(&server_session, client_zid).await;

let service = MyServer {
Expand Down Expand Up @@ -220,16 +221,16 @@ async fn server_not_matching() {

#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn server_matching() {
let server_zid = ZenohId::rand();
let client_zid = ZenohId::rand();
let server_zid = ZenohId::from_str("a6").unwrap();
let client_zid = ZenohId::from_str("a7").unwrap();

let client_config = configure_zenoh(
client_zid,
"tcp/127.0.0.1:9009".to_string(),
"tcp/127.0.0.1:9008".to_string(),
);

let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap());
let client_session = zenoh::open(client_config).await.unwrap();

let c_zid_server = server_zid;
tokio::task::spawn(async move {
Expand All @@ -238,7 +239,7 @@ async fn server_matching() {
"tcp/127.0.0.1:9008".to_string(),
"tcp/127.0.0.1:9009".to_string(),
);
let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap());
let server_session = zenoh::open(server_config).await.unwrap();
wait_for_peer(&server_session, client_zid).await;

let service = MyServer {
Expand Down
Loading
Loading