From ffccbd452b834cc822ea71ef1e1f8e4d9ddf08f2 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 12 Aug 2024 16:25:15 +0200 Subject: [PATCH 01/11] chore: bump to zenoh 1.0.0-alpha.6 Signed-off-by: Gabriele Baldoni --- Cargo.toml | 14 +++++++------- zrpc/src/rpcchannel.rs | 20 ++++++++++++-------- zrpc/src/server.rs | 35 +++++++++++++++-------------------- zrpc/src/zrpcresult.rs | 4 ++-- 4 files changed, 36 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8f54358..c89745e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [ "rt", "time", ] } -zenoh = { version = "0.11.0", default-features = false } -zenoh-codec = { version = "0.11.0" } -zenoh-core = { version = "0.11.0" } -zenoh-ext = { version = "0.11.0" } -zenoh-macros = { version = "0.11.0" } -zenoh-protocol = { version = "0.11.0" } -zenoh-util = { version = "0.11.0" } +zenoh = { version = "1.0.0-alpha.6", default-features = false } +zenoh-codec = { version = "1.0.0-alpha.6" } +zenoh-core = { version = "1.0.0-alpha.6" } +zenoh-ext = { version = "1.0.0-alpha.6" } +zenoh-macros = { version = "1.0.0-alpha.6" } +zenoh-protocol = { version = "1.0.0-alpha.6" } +zenoh-util = { version = "1.0.0-alpha.6" } diff --git a/zrpc/src/rpcchannel.rs b/zrpc/src/rpcchannel.rs index a6b7718..150d341 100644 --- a/zrpc/src/rpcchannel.rs +++ b/zrpc/src/rpcchannel.rs @@ -20,7 +20,7 @@ use flume::Receiver; use log::trace; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use zenoh::prelude::r#async::*; +use zenoh::config::ZenohId; use zenoh::query::*; use zenoh::Session; @@ -74,10 +74,9 @@ impl RPCClientChannel { Ok(self .z .get(&selector) - .with_value(req) + .payload(req) .target(QueryTarget::All) .timeout(tout) - .res() .await?) } @@ -104,9 +103,14 @@ impl RPCClientChannel { let reply = data_receiver.recv_async().await; log::trace!("Response from zenoh is {:?}", reply); if let Ok(reply) = reply { - match reply.sample { + match reply.result() { Ok(sample) => { - let raw_data: Vec = sample.payload.contiguous().to_vec(); + // This is infallible, using unwrap_or_default so that if cannot get + // the vec then the deseriazliation fail on an empty one. + let raw_data = sample + .payload() + .deserialize::>() + .unwrap_or_default(); let wmsg: WireMessage = deserialize(&raw_data).map_err(|e| { Status::new(Code::InternalError, format!("deserialization error: {e:?}")) @@ -151,17 +155,17 @@ impl RPCClientChannel { .get(ke) .target(QueryTarget::All) .timeout(tout) - .res() .await .map_err(|e| Status::new(Code::InternalError, format!("communication error: {e:?}")))?; let metadata = data .into_iter() // getting only reply with Sample::Ok - .filter_map(|r| r.sample.ok()) + .filter_map(|r| r.into_result().ok()) // getting only the ones we can deserialize .filter_map(|s| { - let raw_data = s.payload.contiguous().to_vec(); + // This is infallible + let raw_data = s.payload().deserialize::>().unwrap_or_default(); deserialize::(&raw_data).ok() }) // get only the ones that do not have errors diff --git a/zrpc/src/server.rs b/zrpc/src/server.rs index 6d1f1bc..d5e0fc7 100644 --- a/zrpc/src/server.rs +++ b/zrpc/src/server.rs @@ -14,8 +14,10 @@ use std::collections::HashSet; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; +use zenoh::config::ZenohId; +use zenoh::key_expr::KeyExpr; use zenoh::liveliness::LivelinessToken; -use zenoh::prelude::r#async::*; +use zenoh::prelude::*; use zenoh::Session; @@ -100,17 +102,12 @@ impl Server { // register the queryables and declare a liveliness token let ke = format!("@rpc/{}/**", self.instance_uuid()); - let queryable = self - .session - .declare_queryable(&ke) - .res() - .await - .map_err(|e| { - Status::new( - Code::InternalError, - format!("Cannot declare queryable: {e:?}"), - ) - })?; + let queryable = self.session.declare_queryable(&ke).await.map_err(|e| { + Status::new( + Code::InternalError, + format!("Cannot declare queryable: {e:?}"), + ) + })?; for k in self.services.keys() { let ke = format!("@rpc/{}/service/{k}", self.instance_uuid()); @@ -118,7 +115,6 @@ impl Server { .session .liveliness() .declare_token(ke) - .res() .await .map_err(|e| { Status::new( @@ -162,13 +158,12 @@ impl Server { .clone(); let payload = query - .value() + .payload() .ok_or_else(|| { Status::internal_error("Query has empty value cannot proceed") })? - .payload - .contiguous() - .to_vec(); + .deserialize::>() + .unwrap_or_default(); // this is call to a service Box::pin(Self::service_call(svc, ke.clone(), payload)) @@ -187,16 +182,16 @@ impl Server { tokio::task::spawn(async move { let res = fut.await; let sample = match res { - Ok(data) => Sample::new(ke, data), + Ok(data) => data, Err(e) => { let wmgs = WireMessage { payload: None, status: e, }; - Sample::new(ke, serialize(&wmgs).unwrap_or_default()) + serialize(&wmgs).unwrap_or_default() } }; - let res = query.reply(Ok(sample)).res().await; + let res = query.reply(ke, sample).await; log::trace!("Query Result is: {res:?}"); }); } diff --git a/zrpc/src/zrpcresult.rs b/zrpc/src/zrpcresult.rs index 00faaf4..8658c1a 100644 --- a/zrpc/src/zrpcresult.rs +++ b/zrpc/src/zrpcresult.rs @@ -98,8 +98,8 @@ impl From for ZRPCError { } } -impl From for ZRPCError { - fn from(err: zenoh_util::core::Error) -> Self { +impl From for ZRPCError { + fn from(err: zenoh::Error) -> Self { ZRPCError::ZenohError(err.to_string()) } } From e6d9e66691ae8a50e60be80a7ab124d2d0a43b8a Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 12 Aug 2024 17:11:33 +0200 Subject: [PATCH 02/11] chore: bump zenoh 1.0.0, updating macros Signed-off-by: Gabriele Baldoni --- zrpc-derive/examples/service.rs | 3 +- zrpc-derive/examples/simplified.rs | 22 +++++---------- zrpc-derive/src/lib.rs | 19 ++++++------- zrpc-derive/tests/service.rs | 45 ++++++++++++++++-------------- 4 files changed, 40 insertions(+), 49 deletions(-) diff --git a/zrpc-derive/examples/service.rs b/zrpc-derive/examples/service.rs index fb099a6..e084255 100644 --- a/zrpc-derive/examples/service.rs +++ b/zrpc-derive/examples/service.rs @@ -69,13 +69,12 @@ impl Hello for MyServer { #[tokio::main] async fn main() { env_logger::init(); - use zenoh::prelude::r#async::*; let mut config = zenoh::config::Config::default(); config .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) .unwrap(); - let zsession = Arc::new(zenoh::open(config).res().await.unwrap()); + let zsession = Arc::new(zenoh::open(config).await.unwrap()); let z = zsession.clone(); let client = HelloClient::builder(zsession).build(); diff --git a/zrpc-derive/examples/simplified.rs b/zrpc-derive/examples/simplified.rs index 744b37f..4d42642 100644 --- a/zrpc-derive/examples/simplified.rs +++ b/zrpc-derive/examples/simplified.rs @@ -13,19 +13,18 @@ use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::sync::Mutex; +use zenoh::config::ZenohId; use zenoh::key_expr::format::KeFormat; -use zenoh::prelude::ZenohId; +use zenoh::key_expr::KeyExpr; use zrpc::prelude::*; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::ops::Deref; use std::str::{self, FromStr}; use std::time::Duration; -use zenoh::{Session, SessionDeclarations}; - -use serde::{Deserialize, Serialize}; -use zenoh::prelude::r#async::*; +use zenoh::{session::SessionDeclarations, Session}; // this is the user defined trait #[async_trait::async_trait] @@ -349,7 +348,6 @@ impl<'a> HelloClient<'a> { .z .liveliness() .get("@rpc/*/service/Hello") - .res() .await .map_err(|e| { Status::unavailable(format!("Unable to perform liveliness query: {e:?}")) @@ -357,13 +355,8 @@ impl<'a> HelloClient<'a> { let ids = res .into_iter() - .map(|e| { - self.extract_id_from_ke( - &e.sample - .map_err(|_| Status::unavailable("Cannot get value from sample"))? - .key_expr, - ) - }) + .filter_map(|e| e.into_result().ok()) + .map(|e| self.extract_id_from_ke(e.key_expr())) .collect::, Status>>()?; // get server metadata @@ -400,13 +393,12 @@ impl<'a> HelloClient<'a> { async fn main() { { env_logger::init(); - use zenoh::prelude::r#async::*; let mut config = zenoh::config::Config::default(); config .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) .unwrap(); - let zsession = Arc::new(zenoh::open(config).res().await.unwrap()); + let zsession = Arc::new(zenoh::open(config).await.unwrap()); let z = zsession.clone(); diff --git a/zrpc-derive/src/lib.rs b/zrpc-derive/src/lib.rs index cade450..677ce74 100644 --- a/zrpc-derive/src/lib.rs +++ b/zrpc-derive/src/lib.rs @@ -535,15 +535,13 @@ impl<'a> ServiceGenerator<'a> { #(#fns)* - async fn find_server(&self) -> std::result::Result { - use zenoh::prelude::r#async::AsyncResolve; - use zenoh::SessionDeclarations; + async fn find_server(&self) -> std::result::Result { + use zenoh::session::SessionDeclarations; let res = self .z .liveliness() .get(#rpc_ke) - .res() .await .map_err(|e| { zrpc::prelude::Status::unavailable(format!("Unable to perform liveliness query: {e:?}")) @@ -551,18 +549,17 @@ impl<'a> ServiceGenerator<'a> { let ids = res .into_iter() + .filter_map(|e| e.into_result().ok()) .map(|e| { self.extract_id_from_ke( - &e.sample - .map_err(|_| zrpc::prelude::Status::unavailable("Cannot get value from sample"))? - .key_expr, + e.key_expr() ) }) - .collect::, zrpc::prelude::Status>>()?; + .collect::, zrpc::prelude::Status>>()?; let metadatas = self.ch.get_servers_metadata(&ids, self.tout).await?; - let mut ids: Vec = metadatas + let mut ids: Vec = metadatas .into_iter() .filter(|m| m.labels.is_superset(&self.labels)) .map(|m| m.id) @@ -572,7 +569,7 @@ impl<'a> ServiceGenerator<'a> { .ok_or(zrpc::prelude::Status::unavailable("No servers found")) } - fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result { + fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result { use std::str::FromStr; let id_str = self .ke_format @@ -581,7 +578,7 @@ impl<'a> ServiceGenerator<'a> { .get("zid") .map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to get server id from key expression: {e:?}")))?; - zenoh::prelude::ZenohId::from_str(id_str) + zenoh::config::ZenohId::from_str(id_str) .map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}"))) } diff --git a/zrpc-derive/tests/service.rs b/zrpc-derive/tests/service.rs index 43f50a8..3fef5e9 100644 --- a/zrpc-derive/tests/service.rs +++ b/zrpc-derive/tests/service.rs @@ -11,12 +11,12 @@ * ADLINK fog05 team, *********************************************************************************/ -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; use tokio::sync::Mutex; use async_trait::async_trait; +use zenoh::{config::{EndPoint, ZenohId}, session::SessionDeclarations}; //importing the macros -use zenoh::prelude::r#async::*; use zrpc::prelude::*; use zrpc_derive::service; @@ -59,22 +59,25 @@ fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::confi .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); - config.listen.endpoints.push(listen.parse().unwrap()); - config.connect.endpoints.push(connect.parse().unwrap()); + + let listen : Vec = vec![listen.parse().unwrap()]; + let connect : Vec = vec![connect.parse().unwrap()]; + config.listen.endpoints.set(listen).unwrap(); + config.connect.endpoints.set(connect).unwrap(); config } async fn wait_for_peer(session: &zenoh::Session, id: ZenohId) { - while !session.info().peers_zid().res().await.any(|e| e == id) { + while !session.info().peers_zid().await.any(|e| e == id) { tokio::time::sleep(std::time::Duration::from_secs(1)).await } } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn service_call() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a0").unwrap(); + let client_zid = ZenohId::from_str("a1").unwrap(); let client_config = configure_zenoh( client_zid, @@ -82,7 +85,7 @@ async fn service_call() { "tcp/127.0.0.1:9002".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); let c_zid_server = server_zid; tokio::task::spawn(async move { @@ -91,7 +94,7 @@ async fn service_call() { "tcp/127.0.0.1:9002".to_string(), "tcp/127.0.0.1:9003".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); + let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); wait_for_peer(&server_session, client_zid).await; let service = MyServer { @@ -138,8 +141,8 @@ async fn service_call() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn service_unavailable() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a2").unwrap(); + let client_zid = ZenohId::from_str("a3").unwrap(); let server_config = configure_zenoh( server_zid, @@ -153,8 +156,8 @@ async fn service_unavailable() { "tcp/127.0.0.1:9004".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); + let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); // Check zenoh sessions are connected wait_for_peer(&server_session, client_zid).await; @@ -170,8 +173,8 @@ async fn service_unavailable() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn server_not_matching() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a4").unwrap(); + let client_zid = ZenohId::from_str("a5").unwrap(); let client_config = configure_zenoh( client_zid, @@ -179,7 +182,7 @@ async fn server_not_matching() { "tcp/127.0.0.1:9006".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); let c_zid_server = server_zid; let st = tokio::task::spawn(async move { @@ -188,7 +191,7 @@ async fn server_not_matching() { "tcp/127.0.0.1:9006".to_string(), "tcp/127.0.0.1:9007".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); + let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); wait_for_peer(&server_session, client_zid).await; let service = MyServer { @@ -220,8 +223,8 @@ async fn server_not_matching() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn server_matching() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a6").unwrap(); + let client_zid = ZenohId::from_str("a7").unwrap(); let client_config = configure_zenoh( client_zid, @@ -229,7 +232,7 @@ async fn server_matching() { "tcp/127.0.0.1:9008".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); let c_zid_server = server_zid; tokio::task::spawn(async move { @@ -238,7 +241,7 @@ async fn server_matching() { "tcp/127.0.0.1:9008".to_string(), "tcp/127.0.0.1:9009".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); + let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); wait_for_peer(&server_session, client_zid).await; let service = MyServer { From 53f5cb8c2523487a55989a28bb4337226e34db21 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 12 Aug 2024 17:11:55 +0200 Subject: [PATCH 03/11] style: cargo format Signed-off-by: Gabriele Baldoni --- zrpc-derive/tests/service.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/zrpc-derive/tests/service.rs b/zrpc-derive/tests/service.rs index 3fef5e9..9495f8c 100644 --- a/zrpc-derive/tests/service.rs +++ b/zrpc-derive/tests/service.rs @@ -15,7 +15,10 @@ use std::{str::FromStr, sync::Arc}; use tokio::sync::Mutex; use async_trait::async_trait; -use zenoh::{config::{EndPoint, ZenohId}, session::SessionDeclarations}; +use zenoh::{ + config::{EndPoint, ZenohId}, + session::SessionDeclarations, +}; //importing the macros use zrpc::prelude::*; use zrpc_derive::service; @@ -60,8 +63,8 @@ fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::confi .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); - let listen : Vec = vec![listen.parse().unwrap()]; - let connect : Vec = vec![connect.parse().unwrap()]; + let listen: Vec = vec![listen.parse().unwrap()]; + let connect: Vec = vec![connect.parse().unwrap()]; config.listen.endpoints.set(listen).unwrap(); config.connect.endpoints.set(connect).unwrap(); @@ -141,8 +144,8 @@ async fn service_call() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn service_unavailable() { - let server_zid = ZenohId::from_str("a2").unwrap(); - let client_zid = ZenohId::from_str("a3").unwrap(); + let server_zid = ZenohId::from_str("a2").unwrap(); + let client_zid = ZenohId::from_str("a3").unwrap(); let server_config = configure_zenoh( server_zid, @@ -174,7 +177,7 @@ async fn service_unavailable() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn server_not_matching() { let server_zid = ZenohId::from_str("a4").unwrap(); - let client_zid = ZenohId::from_str("a5").unwrap(); + let client_zid = ZenohId::from_str("a5").unwrap(); let client_config = configure_zenoh( client_zid, @@ -223,8 +226,8 @@ async fn server_not_matching() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn server_matching() { - let server_zid = ZenohId::from_str("a6").unwrap(); - let client_zid = ZenohId::from_str("a7").unwrap(); + let server_zid = ZenohId::from_str("a6").unwrap(); + let client_zid = ZenohId::from_str("a7").unwrap(); let client_config = configure_zenoh( client_zid, From 19799cfaafa38938e88e95b3cccd5d64a89ddfb3 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 12 Aug 2024 17:15:33 +0200 Subject: [PATCH 04/11] ci: adding cleanup step Signed-off-by: Gabriele Baldoni --- .github/workflows/rust.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 58d82a5..02b7a28 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -61,4 +61,10 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: --release --verbose \ No newline at end of file + args: --release --verbose + + - name: Clean up + if: always() + uses: actions-rs/cargo@v1 + with: + command: clean \ No newline at end of file From 2d4342d58dbc4b3ab251f72a012f20102e6c0a6a Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Fri, 30 Aug 2024 15:19:51 +0200 Subject: [PATCH 05/11] deps: tracking zenoh main Signed-off-by: Gabriele Baldoni --- Cargo.toml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c89745e..26ee215 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [ "rt", "time", ] } -zenoh = { version = "1.0.0-alpha.6", default-features = false } -zenoh-codec = { version = "1.0.0-alpha.6" } -zenoh-core = { version = "1.0.0-alpha.6" } -zenoh-ext = { version = "1.0.0-alpha.6" } -zenoh-macros = { version = "1.0.0-alpha.6" } -zenoh-protocol = { version = "1.0.0-alpha.6" } -zenoh-util = { version = "1.0.0-alpha.6" } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main", default-features = false } +zenoh-codec = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } +zenoh-core = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } +zenoh-macros = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } +zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } +zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } From 4790d74951e03a47ff31a6e086f7bc46430eebc6 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Fri, 13 Sep 2024 09:34:34 +0200 Subject: [PATCH 06/11] deps: zenoh Signed-off-by: Gabriele Baldoni --- zrpc-derive/examples/service.rs | 2 +- zrpc-derive/examples/simplified.rs | 10 +++++----- zrpc-derive/src/lib.rs | 7 +++---- zrpc-derive/tests/service.rs | 21 +++++++++------------ zrpc/src/rpcchannel.rs | 5 ++--- zrpc/src/server.rs | 11 +++++------ 6 files changed, 25 insertions(+), 31 deletions(-) diff --git a/zrpc-derive/examples/service.rs b/zrpc-derive/examples/service.rs index e084255..b1a330d 100644 --- a/zrpc-derive/examples/service.rs +++ b/zrpc-derive/examples/service.rs @@ -74,7 +74,7 @@ async fn main() { config .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) .unwrap(); - let zsession = Arc::new(zenoh::open(config).await.unwrap()); + let zsession = zenoh::open(config).await.unwrap(); let z = zsession.clone(); let client = HelloClient::builder(zsession).build(); diff --git a/zrpc-derive/examples/simplified.rs b/zrpc-derive/examples/simplified.rs index 4d42642..70ca96e 100644 --- a/zrpc-derive/examples/simplified.rs +++ b/zrpc-derive/examples/simplified.rs @@ -24,7 +24,7 @@ use std::collections::HashSet; use std::ops::Deref; use std::str::{self, FromStr}; use std::time::Duration; -use zenoh::{session::SessionDeclarations, Session}; +use zenoh::Session; // this is the user defined trait #[async_trait::async_trait] @@ -238,7 +238,7 @@ impl Deref for SubResponse { } pub struct HelloClientBuilder<'a> { - pub z: Arc, + pub z: Session, pub labels: HashSet, ke_format: KeFormat<'a>, pub tout: Duration, @@ -283,7 +283,7 @@ impl<'a> HelloClientBuilder<'a> { pub struct HelloClient<'a> { pub(crate) ch: RPCClientChannel, pub(crate) ke_format: KeFormat<'a>, - pub(crate) z: Arc, + pub(crate) z: Session, pub(crate) tout: Duration, pub(crate) labels: HashSet, } @@ -291,7 +291,7 @@ pub struct HelloClient<'a> { // generated client code impl<'a> HelloClient<'a> { - pub fn builder(z: Arc) -> HelloClientBuilder<'a> { + pub fn builder(z: zenoh::Session) -> HelloClientBuilder<'a> { HelloClientBuilder { z, labels: HashSet::new(), @@ -398,7 +398,7 @@ async fn main() { config .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) .unwrap(); - let zsession = Arc::new(zenoh::open(config).await.unwrap()); + let zsession = zenoh::open(config).await.unwrap(); let z = zsession.clone(); diff --git a/zrpc-derive/src/lib.rs b/zrpc-derive/src/lib.rs index 677ce74..81671a8 100644 --- a/zrpc-derive/src/lib.rs +++ b/zrpc-derive/src/lib.rs @@ -467,7 +467,7 @@ impl<'a> ServiceGenerator<'a> { quote! { #[automatically_derived] #vis struct #client_builder_ident<'a> { - pub(crate) z: std::sync::Arc, + pub(crate) z: zenoh::Session, ke_format: zenoh::key_expr::format::KeFormat<'a>, pub(crate) tout: std::time::Duration, pub(crate) labels: std::collections::HashSet, @@ -513,7 +513,7 @@ impl<'a> ServiceGenerator<'a> { #[derive(Clone, Debug)] #vis struct #client_ident<'a> { pub(crate) ch : zrpc::prelude::RPCClientChannel, - pub(crate) z: std::sync::Arc, + pub(crate) z: zenoh::Session, pub(crate) ke_format: zenoh::key_expr::format::KeFormat<'a>, pub(crate) tout: std::time::Duration, pub(crate) labels: std::collections::HashSet, @@ -523,7 +523,7 @@ impl<'a> ServiceGenerator<'a> { impl<'a> #client_ident<'a> { - #vis fn builder(z: std::sync::Arc) -> #client_builder_ident<'a> { + #vis fn builder(z: zenoh::Session) -> #client_builder_ident<'a> { #client_builder_ident { z, labels: std::collections::HashSet::new(), @@ -536,7 +536,6 @@ impl<'a> ServiceGenerator<'a> { #(#fns)* async fn find_server(&self) -> std::result::Result { - use zenoh::session::SessionDeclarations; let res = self .z diff --git a/zrpc-derive/tests/service.rs b/zrpc-derive/tests/service.rs index 9495f8c..a7e6159 100644 --- a/zrpc-derive/tests/service.rs +++ b/zrpc-derive/tests/service.rs @@ -15,10 +15,7 @@ use std::{str::FromStr, sync::Arc}; use tokio::sync::Mutex; use async_trait::async_trait; -use zenoh::{ - config::{EndPoint, ZenohId}, - session::SessionDeclarations, -}; +use zenoh::config::{EndPoint, ZenohId}; //importing the macros use zrpc::prelude::*; use zrpc_derive::service; @@ -88,7 +85,7 @@ async fn service_call() { "tcp/127.0.0.1:9002".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); + let client_session = zenoh::open(client_config).await.unwrap(); let c_zid_server = server_zid; tokio::task::spawn(async move { @@ -97,7 +94,7 @@ async fn service_call() { "tcp/127.0.0.1:9002".to_string(), "tcp/127.0.0.1:9003".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); wait_for_peer(&server_session, client_zid).await; let service = MyServer { @@ -159,8 +156,8 @@ async fn service_unavailable() { "tcp/127.0.0.1:9004".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); - let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); + let client_session = zenoh::open(client_config).await.unwrap(); // Check zenoh sessions are connected wait_for_peer(&server_session, client_zid).await; @@ -185,7 +182,7 @@ async fn server_not_matching() { "tcp/127.0.0.1:9006".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); + let client_session = zenoh::open(client_config).await.unwrap(); let c_zid_server = server_zid; let st = tokio::task::spawn(async move { @@ -194,7 +191,7 @@ async fn server_not_matching() { "tcp/127.0.0.1:9006".to_string(), "tcp/127.0.0.1:9007".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); wait_for_peer(&server_session, client_zid).await; let service = MyServer { @@ -235,7 +232,7 @@ async fn server_matching() { "tcp/127.0.0.1:9008".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).await.unwrap()); + let client_session = zenoh::open(client_config).await.unwrap(); let c_zid_server = server_zid; tokio::task::spawn(async move { @@ -244,7 +241,7 @@ async fn server_matching() { "tcp/127.0.0.1:9008".to_string(), "tcp/127.0.0.1:9009".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); wait_for_peer(&server_session, client_zid).await; let service = MyServer { diff --git a/zrpc/src/rpcchannel.rs b/zrpc/src/rpcchannel.rs index 150d341..9300fbd 100644 --- a/zrpc/src/rpcchannel.rs +++ b/zrpc/src/rpcchannel.rs @@ -19,7 +19,6 @@ use std::time::Duration; use flume::Receiver; use log::trace; use serde::{Deserialize, Serialize}; -use std::sync::Arc; use zenoh::config::ZenohId; use zenoh::query::*; use zenoh::Session; @@ -35,12 +34,12 @@ use crate::zrpcresult::ZRPCResult; #[derive(Clone, Debug)] pub struct RPCClientChannel { - z: Arc, + z: Session, service_name: String, } impl RPCClientChannel { - pub fn new(z: Arc, service_name: IntoString) -> RPCClientChannel + pub fn new(z: Session, service_name: IntoString) -> RPCClientChannel where IntoString: Into, { diff --git a/zrpc/src/server.rs b/zrpc/src/server.rs index d5e0fc7..9371a3a 100644 --- a/zrpc/src/server.rs +++ b/zrpc/src/server.rs @@ -17,7 +17,6 @@ use tokio::sync::Mutex; use zenoh::config::ZenohId; use zenoh::key_expr::KeyExpr; use zenoh::liveliness::LivelinessToken; -use zenoh::prelude::*; use zenoh::Session; @@ -27,13 +26,13 @@ use crate::status::{Code, Status}; use crate::types::{Message, ServerMetadata, ServerTaskFuture, WireMessage}; pub struct ServerBuilder { - pub(crate) session: Arc, + pub(crate) session: Session, pub(crate) services: HashMap>, pub(crate) labels: HashSet, } impl ServerBuilder { - pub fn session(mut self, session: Arc) -> Self { + pub fn session(mut self, session: Session) -> Self { self.session = session; self } @@ -76,14 +75,14 @@ impl ServerBuilder { } pub struct Server { - pub(crate) session: Arc, + pub(crate) session: Session, pub(crate) services: HashMap>, - pub(crate) tokens: Arc>>>, + pub(crate) tokens: Arc>>, pub(crate) labels: HashSet, } impl Server { - pub fn builder(session: Arc) -> ServerBuilder { + pub fn builder(session: Session) -> ServerBuilder { ServerBuilder { session, services: HashMap::new(), From c0314342da71df621151cf7c206d9b0cd760c98a Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Thu, 3 Oct 2024 16:52:04 +0200 Subject: [PATCH 07/11] deps: updated zenoh Signed-off-by: Gabriele Baldoni --- zrpc-derive/examples/service.rs | 4 +--- zrpc-derive/examples/simplified.rs | 4 +--- zrpc-derive/tests/service.rs | 2 +- zrpc/src/rpcchannel.rs | 7 ++----- zrpc/src/server.rs | 4 ++-- 5 files changed, 7 insertions(+), 14 deletions(-) diff --git a/zrpc-derive/examples/service.rs b/zrpc-derive/examples/service.rs index b1a330d..0a3e157 100644 --- a/zrpc-derive/examples/service.rs +++ b/zrpc-derive/examples/service.rs @@ -71,9 +71,7 @@ async fn main() { env_logger::init(); let mut config = zenoh::config::Config::default(); - config - .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) - .unwrap(); + config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap(); let zsession = zenoh::open(config).await.unwrap(); let z = zsession.clone(); diff --git a/zrpc-derive/examples/simplified.rs b/zrpc-derive/examples/simplified.rs index 70ca96e..f0861eb 100644 --- a/zrpc-derive/examples/simplified.rs +++ b/zrpc-derive/examples/simplified.rs @@ -395,9 +395,7 @@ async fn main() { env_logger::init(); let mut config = zenoh::config::Config::default(); - config - .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) - .unwrap(); + config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap(); let zsession = zenoh::open(config).await.unwrap(); let z = zsession.clone(); diff --git a/zrpc-derive/tests/service.rs b/zrpc-derive/tests/service.rs index a7e6159..b8a0c5e 100644 --- a/zrpc-derive/tests/service.rs +++ b/zrpc-derive/tests/service.rs @@ -56,7 +56,7 @@ fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::confi let mut config = zenoh::config::Config::default(); config.set_id(id).unwrap(); config - .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) + .set_mode(Some(zenoh::config::WhatAmI::Peer)) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); diff --git a/zrpc/src/rpcchannel.rs b/zrpc/src/rpcchannel.rs index 9300fbd..4a217df 100644 --- a/zrpc/src/rpcchannel.rs +++ b/zrpc/src/rpcchannel.rs @@ -106,10 +106,7 @@ impl RPCClientChannel { Ok(sample) => { // This is infallible, using unwrap_or_default so that if cannot get // the vec then the deseriazliation fail on an empty one. - let raw_data = sample - .payload() - .deserialize::>() - .unwrap_or_default(); + let raw_data = sample.payload().to_bytes().to_vec(); let wmsg: WireMessage = deserialize(&raw_data).map_err(|e| { Status::new(Code::InternalError, format!("deserialization error: {e:?}")) @@ -164,7 +161,7 @@ impl RPCClientChannel { // getting only the ones we can deserialize .filter_map(|s| { // This is infallible - let raw_data = s.payload().deserialize::>().unwrap_or_default(); + let raw_data = s.payload().to_bytes().to_vec(); deserialize::(&raw_data).ok() }) // get only the ones that do not have errors diff --git a/zrpc/src/server.rs b/zrpc/src/server.rs index 9371a3a..6142d90 100644 --- a/zrpc/src/server.rs +++ b/zrpc/src/server.rs @@ -161,8 +161,8 @@ impl Server { .ok_or_else(|| { Status::internal_error("Query has empty value cannot proceed") })? - .deserialize::>() - .unwrap_or_default(); + .to_bytes() + .to_vec(); // this is call to a service Box::pin(Self::service_call(svc, ke.clone(), payload)) From 5b03343f62071fffaa0ca38d7586c3c675602b81 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 8 Oct 2024 14:41:04 +0200 Subject: [PATCH 08/11] deps: sync zenoh Signed-off-by: Gabriele Baldoni --- zrpc-derive/tests/service.rs | 4 +--- zrpc/src/rpcchannel.rs | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/zrpc-derive/tests/service.rs b/zrpc-derive/tests/service.rs index b8a0c5e..b67e106 100644 --- a/zrpc-derive/tests/service.rs +++ b/zrpc-derive/tests/service.rs @@ -55,9 +55,7 @@ impl Hello for MyServer { fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::config::Config { let mut config = zenoh::config::Config::default(); config.set_id(id).unwrap(); - config - .set_mode(Some(zenoh::config::WhatAmI::Peer)) - .unwrap(); + config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); let listen: Vec = vec![listen.parse().unwrap()]; diff --git a/zrpc/src/rpcchannel.rs b/zrpc/src/rpcchannel.rs index 4a217df..5751198 100644 --- a/zrpc/src/rpcchannel.rs +++ b/zrpc/src/rpcchannel.rs @@ -16,10 +16,10 @@ extern crate serde; use std::time::Duration; -use flume::Receiver; use log::trace; use serde::{Deserialize, Serialize}; use zenoh::config::ZenohId; +use zenoh::handlers::FifoChannelHandler; use zenoh::query::*; use zenoh::Session; @@ -59,7 +59,7 @@ impl RPCClientChannel { request: &Request, method: &str, tout: Duration, - ) -> ZRPCResult> + ) -> ZRPCResult> where T: Serialize + Clone + std::fmt::Debug, for<'de2> T: Deserialize<'de2>, From 27c90ab488c97adc13ceb7aee69f87463400a0c6 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Tue, 15 Oct 2024 09:49:08 +0200 Subject: [PATCH 09/11] chore: prepare beta release Signed-off-by: Gabriele Baldoni --- Cargo.toml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 26ee215..cdc94be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc" license = " EPL-2.0 OR Apache-2.0" readme = "README.md" repository = "https://github.com/ZettaScaleLabs/zenoh-rpc" -version = "0.8.0-alpha.1" +version = "0.8.0-beta.1" [profile.release] codegen-units = 1 @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [ "rt", "time", ] } -zenoh = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main", default-features = false } -zenoh-codec = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } -zenoh-core = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } -zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } -zenoh-macros = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } -zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } -zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "main" } +zenoh = { version= "1.0.0-rc.1", default-features = false } +zenoh-codec = { version= "1.0.0-rc.1" } +zenoh-core = { version= "1.0.0-rc.1" } +zenoh-ext = { version= "1.0.0-rc.1" } +zenoh-macros = { version= "1.0.0-rc.1" } +zenoh-protocol = { version= "1.0.0-rc.1" } +zenoh-util = { version= "1.0.0-rc.1" } From 6c15614a8ce66871783c6b210618bf90130dd952 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 21 Oct 2024 16:40:31 +0200 Subject: [PATCH 10/11] deps: zenoh 1.0.0 Signed-off-by: Gabriele Baldoni --- Cargo.toml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cdc94be..719e40e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [ "rt", "time", ] } -zenoh = { version= "1.0.0-rc.1", default-features = false } -zenoh-codec = { version= "1.0.0-rc.1" } -zenoh-core = { version= "1.0.0-rc.1" } -zenoh-ext = { version= "1.0.0-rc.1" } -zenoh-macros = { version= "1.0.0-rc.1" } -zenoh-protocol = { version= "1.0.0-rc.1" } -zenoh-util = { version= "1.0.0-rc.1" } +zenoh = { version= "1.0.0", default-features = false } +zenoh-codec = { version= "1.0.0" } +zenoh-core = { version= "1.0.0" } +zenoh-ext = { version= "1.0.0" } +zenoh-macros = { version= "1.0.0" } +zenoh-protocol = { version= "1.0.0" } +zenoh-util = { version= "1.0.0" } From 92d292cc33b4212f198a6d5bf1fc7bb54f46317f Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Mon, 21 Oct 2024 16:41:23 +0200 Subject: [PATCH 11/11] chore: release 0.8.0 Signed-off-by: Gabriele Baldoni --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 719e40e..a6d3dce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc" license = " EPL-2.0 OR Apache-2.0" readme = "README.md" repository = "https://github.com/ZettaScaleLabs/zenoh-rpc" -version = "0.8.0-beta.1" +version = "0.8.0" [profile.release] codegen-units = 1