diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 58d82a5..02b7a28 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -61,4 +61,10 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: --release --verbose \ No newline at end of file + args: --release --verbose + + - name: Clean up + if: always() + uses: actions-rs/cargo@v1 + with: + command: clean \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 8f54358..a6d3dce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ homepage = "https://github.com/ZettaScaleLabs/zenoh-rpc" license = " EPL-2.0 OR Apache-2.0" readme = "README.md" repository = "https://github.com/ZettaScaleLabs/zenoh-rpc" -version = "0.8.0-alpha.1" +version = "0.8.0" [profile.release] codegen-units = 1 @@ -62,10 +62,10 @@ tokio = { version = "1.35.1", default-features = false, features = [ "rt", "time", ] } -zenoh = { version = "0.11.0", default-features = false } -zenoh-codec = { version = "0.11.0" } -zenoh-core = { version = "0.11.0" } -zenoh-ext = { version = "0.11.0" } -zenoh-macros = { version = "0.11.0" } -zenoh-protocol = { version = "0.11.0" } -zenoh-util = { version = "0.11.0" } +zenoh = { version= "1.0.0", default-features = false } +zenoh-codec = { version= "1.0.0" } +zenoh-core = { version= "1.0.0" } +zenoh-ext = { version= "1.0.0" } +zenoh-macros = { version= "1.0.0" } +zenoh-protocol = { version= "1.0.0" } +zenoh-util = { version= "1.0.0" } diff --git a/zrpc-derive/examples/service.rs b/zrpc-derive/examples/service.rs index fb099a6..0a3e157 100644 --- a/zrpc-derive/examples/service.rs +++ b/zrpc-derive/examples/service.rs @@ -69,13 +69,10 @@ impl Hello for MyServer { #[tokio::main] async fn main() { env_logger::init(); - use zenoh::prelude::r#async::*; let mut config = zenoh::config::Config::default(); - config - .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) - .unwrap(); - let zsession = Arc::new(zenoh::open(config).res().await.unwrap()); + config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap(); + let zsession = zenoh::open(config).await.unwrap(); let z = zsession.clone(); let client = HelloClient::builder(zsession).build(); diff --git a/zrpc-derive/examples/simplified.rs b/zrpc-derive/examples/simplified.rs index 744b37f..f0861eb 100644 --- a/zrpc-derive/examples/simplified.rs +++ b/zrpc-derive/examples/simplified.rs @@ -13,19 +13,18 @@ use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::sync::Mutex; +use zenoh::config::ZenohId; use zenoh::key_expr::format::KeFormat; -use zenoh::prelude::ZenohId; +use zenoh::key_expr::KeyExpr; use zrpc::prelude::*; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::ops::Deref; use std::str::{self, FromStr}; use std::time::Duration; -use zenoh::{Session, SessionDeclarations}; - -use serde::{Deserialize, Serialize}; -use zenoh::prelude::r#async::*; +use zenoh::Session; // this is the user defined trait #[async_trait::async_trait] @@ -239,7 +238,7 @@ impl Deref for SubResponse { } pub struct HelloClientBuilder<'a> { - pub z: Arc, + pub z: Session, pub labels: HashSet, ke_format: KeFormat<'a>, pub tout: Duration, @@ -284,7 +283,7 @@ impl<'a> HelloClientBuilder<'a> { pub struct HelloClient<'a> { pub(crate) ch: RPCClientChannel, pub(crate) ke_format: KeFormat<'a>, - pub(crate) z: Arc, + pub(crate) z: Session, pub(crate) tout: Duration, pub(crate) labels: HashSet, } @@ -292,7 +291,7 @@ pub struct HelloClient<'a> { // generated client code impl<'a> HelloClient<'a> { - pub fn builder(z: Arc) -> HelloClientBuilder<'a> { + pub fn builder(z: zenoh::Session) -> HelloClientBuilder<'a> { HelloClientBuilder { z, labels: HashSet::new(), @@ -349,7 +348,6 @@ impl<'a> HelloClient<'a> { .z .liveliness() .get("@rpc/*/service/Hello") - .res() .await .map_err(|e| { Status::unavailable(format!("Unable to perform liveliness query: {e:?}")) @@ -357,13 +355,8 @@ impl<'a> HelloClient<'a> { let ids = res .into_iter() - .map(|e| { - self.extract_id_from_ke( - &e.sample - .map_err(|_| Status::unavailable("Cannot get value from sample"))? - .key_expr, - ) - }) + .filter_map(|e| e.into_result().ok()) + .map(|e| self.extract_id_from_ke(e.key_expr())) .collect::, Status>>()?; // get server metadata @@ -400,13 +393,10 @@ impl<'a> HelloClient<'a> { async fn main() { { env_logger::init(); - use zenoh::prelude::r#async::*; let mut config = zenoh::config::Config::default(); - config - .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) - .unwrap(); - let zsession = Arc::new(zenoh::open(config).res().await.unwrap()); + config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap(); + let zsession = zenoh::open(config).await.unwrap(); let z = zsession.clone(); diff --git a/zrpc-derive/src/lib.rs b/zrpc-derive/src/lib.rs index cade450..81671a8 100644 --- a/zrpc-derive/src/lib.rs +++ b/zrpc-derive/src/lib.rs @@ -467,7 +467,7 @@ impl<'a> ServiceGenerator<'a> { quote! { #[automatically_derived] #vis struct #client_builder_ident<'a> { - pub(crate) z: std::sync::Arc, + pub(crate) z: zenoh::Session, ke_format: zenoh::key_expr::format::KeFormat<'a>, pub(crate) tout: std::time::Duration, pub(crate) labels: std::collections::HashSet, @@ -513,7 +513,7 @@ impl<'a> ServiceGenerator<'a> { #[derive(Clone, Debug)] #vis struct #client_ident<'a> { pub(crate) ch : zrpc::prelude::RPCClientChannel, - pub(crate) z: std::sync::Arc, + pub(crate) z: zenoh::Session, pub(crate) ke_format: zenoh::key_expr::format::KeFormat<'a>, pub(crate) tout: std::time::Duration, pub(crate) labels: std::collections::HashSet, @@ -523,7 +523,7 @@ impl<'a> ServiceGenerator<'a> { impl<'a> #client_ident<'a> { - #vis fn builder(z: std::sync::Arc) -> #client_builder_ident<'a> { + #vis fn builder(z: zenoh::Session) -> #client_builder_ident<'a> { #client_builder_ident { z, labels: std::collections::HashSet::new(), @@ -535,15 +535,12 @@ impl<'a> ServiceGenerator<'a> { #(#fns)* - async fn find_server(&self) -> std::result::Result { - use zenoh::prelude::r#async::AsyncResolve; - use zenoh::SessionDeclarations; + async fn find_server(&self) -> std::result::Result { let res = self .z .liveliness() .get(#rpc_ke) - .res() .await .map_err(|e| { zrpc::prelude::Status::unavailable(format!("Unable to perform liveliness query: {e:?}")) @@ -551,18 +548,17 @@ impl<'a> ServiceGenerator<'a> { let ids = res .into_iter() + .filter_map(|e| e.into_result().ok()) .map(|e| { self.extract_id_from_ke( - &e.sample - .map_err(|_| zrpc::prelude::Status::unavailable("Cannot get value from sample"))? - .key_expr, + e.key_expr() ) }) - .collect::, zrpc::prelude::Status>>()?; + .collect::, zrpc::prelude::Status>>()?; let metadatas = self.ch.get_servers_metadata(&ids, self.tout).await?; - let mut ids: Vec = metadatas + let mut ids: Vec = metadatas .into_iter() .filter(|m| m.labels.is_superset(&self.labels)) .map(|m| m.id) @@ -572,7 +568,7 @@ impl<'a> ServiceGenerator<'a> { .ok_or(zrpc::prelude::Status::unavailable("No servers found")) } - fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result { + fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result { use std::str::FromStr; let id_str = self .ke_format @@ -581,7 +577,7 @@ impl<'a> ServiceGenerator<'a> { .get("zid") .map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to get server id from key expression: {e:?}")))?; - zenoh::prelude::ZenohId::from_str(id_str) + zenoh::config::ZenohId::from_str(id_str) .map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}"))) } diff --git a/zrpc-derive/tests/service.rs b/zrpc-derive/tests/service.rs index 43f50a8..b67e106 100644 --- a/zrpc-derive/tests/service.rs +++ b/zrpc-derive/tests/service.rs @@ -11,12 +11,12 @@ * ADLINK fog05 team, *********************************************************************************/ -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; use tokio::sync::Mutex; use async_trait::async_trait; +use zenoh::config::{EndPoint, ZenohId}; //importing the macros -use zenoh::prelude::r#async::*; use zrpc::prelude::*; use zrpc_derive::service; @@ -55,26 +55,27 @@ impl Hello for MyServer { fn configure_zenoh(id: ZenohId, listen: String, connect: String) -> zenoh::config::Config { let mut config = zenoh::config::Config::default(); config.set_id(id).unwrap(); - config - .set_mode(Some(zenoh::config::whatami::WhatAmI::Peer)) - .unwrap(); + config.set_mode(Some(zenoh::config::WhatAmI::Peer)).unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); - config.listen.endpoints.push(listen.parse().unwrap()); - config.connect.endpoints.push(connect.parse().unwrap()); + + let listen: Vec = vec![listen.parse().unwrap()]; + let connect: Vec = vec![connect.parse().unwrap()]; + config.listen.endpoints.set(listen).unwrap(); + config.connect.endpoints.set(connect).unwrap(); config } async fn wait_for_peer(session: &zenoh::Session, id: ZenohId) { - while !session.info().peers_zid().res().await.any(|e| e == id) { + while !session.info().peers_zid().await.any(|e| e == id) { tokio::time::sleep(std::time::Duration::from_secs(1)).await } } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn service_call() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a0").unwrap(); + let client_zid = ZenohId::from_str("a1").unwrap(); let client_config = configure_zenoh( client_zid, @@ -82,7 +83,7 @@ async fn service_call() { "tcp/127.0.0.1:9002".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let client_session = zenoh::open(client_config).await.unwrap(); let c_zid_server = server_zid; tokio::task::spawn(async move { @@ -91,7 +92,7 @@ async fn service_call() { "tcp/127.0.0.1:9002".to_string(), "tcp/127.0.0.1:9003".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); wait_for_peer(&server_session, client_zid).await; let service = MyServer { @@ -138,8 +139,8 @@ async fn service_call() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn service_unavailable() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a2").unwrap(); + let client_zid = ZenohId::from_str("a3").unwrap(); let server_config = configure_zenoh( server_zid, @@ -153,8 +154,8 @@ async fn service_unavailable() { "tcp/127.0.0.1:9004".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); + let client_session = zenoh::open(client_config).await.unwrap(); // Check zenoh sessions are connected wait_for_peer(&server_session, client_zid).await; @@ -170,8 +171,8 @@ async fn service_unavailable() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn server_not_matching() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a4").unwrap(); + let client_zid = ZenohId::from_str("a5").unwrap(); let client_config = configure_zenoh( client_zid, @@ -179,7 +180,7 @@ async fn server_not_matching() { "tcp/127.0.0.1:9006".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let client_session = zenoh::open(client_config).await.unwrap(); let c_zid_server = server_zid; let st = tokio::task::spawn(async move { @@ -188,7 +189,7 @@ async fn server_not_matching() { "tcp/127.0.0.1:9006".to_string(), "tcp/127.0.0.1:9007".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); wait_for_peer(&server_session, client_zid).await; let service = MyServer { @@ -220,8 +221,8 @@ async fn server_not_matching() { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn server_matching() { - let server_zid = ZenohId::rand(); - let client_zid = ZenohId::rand(); + let server_zid = ZenohId::from_str("a6").unwrap(); + let client_zid = ZenohId::from_str("a7").unwrap(); let client_config = configure_zenoh( client_zid, @@ -229,7 +230,7 @@ async fn server_matching() { "tcp/127.0.0.1:9008".to_string(), ); - let client_session = Arc::new(zenoh::open(client_config).res().await.unwrap()); + let client_session = zenoh::open(client_config).await.unwrap(); let c_zid_server = server_zid; tokio::task::spawn(async move { @@ -238,7 +239,7 @@ async fn server_matching() { "tcp/127.0.0.1:9008".to_string(), "tcp/127.0.0.1:9009".to_string(), ); - let server_session = Arc::new(zenoh::open(server_config).res().await.unwrap()); + let server_session = zenoh::open(server_config).await.unwrap(); wait_for_peer(&server_session, client_zid).await; let service = MyServer { diff --git a/zrpc/src/rpcchannel.rs b/zrpc/src/rpcchannel.rs index a6b7718..5751198 100644 --- a/zrpc/src/rpcchannel.rs +++ b/zrpc/src/rpcchannel.rs @@ -16,11 +16,10 @@ extern crate serde; use std::time::Duration; -use flume::Receiver; use log::trace; use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use zenoh::prelude::r#async::*; +use zenoh::config::ZenohId; +use zenoh::handlers::FifoChannelHandler; use zenoh::query::*; use zenoh::Session; @@ -35,12 +34,12 @@ use crate::zrpcresult::ZRPCResult; #[derive(Clone, Debug)] pub struct RPCClientChannel { - z: Arc, + z: Session, service_name: String, } impl RPCClientChannel { - pub fn new(z: Arc, service_name: IntoString) -> RPCClientChannel + pub fn new(z: Session, service_name: IntoString) -> RPCClientChannel where IntoString: Into, { @@ -60,7 +59,7 @@ impl RPCClientChannel { request: &Request, method: &str, tout: Duration, - ) -> ZRPCResult> + ) -> ZRPCResult> where T: Serialize + Clone + std::fmt::Debug, for<'de2> T: Deserialize<'de2>, @@ -74,10 +73,9 @@ impl RPCClientChannel { Ok(self .z .get(&selector) - .with_value(req) + .payload(req) .target(QueryTarget::All) .timeout(tout) - .res() .await?) } @@ -104,9 +102,11 @@ impl RPCClientChannel { let reply = data_receiver.recv_async().await; log::trace!("Response from zenoh is {:?}", reply); if let Ok(reply) = reply { - match reply.sample { + match reply.result() { Ok(sample) => { - let raw_data: Vec = sample.payload.contiguous().to_vec(); + // This is infallible, using unwrap_or_default so that if cannot get + // the vec then the deseriazliation fail on an empty one. + let raw_data = sample.payload().to_bytes().to_vec(); let wmsg: WireMessage = deserialize(&raw_data).map_err(|e| { Status::new(Code::InternalError, format!("deserialization error: {e:?}")) @@ -151,17 +151,17 @@ impl RPCClientChannel { .get(ke) .target(QueryTarget::All) .timeout(tout) - .res() .await .map_err(|e| Status::new(Code::InternalError, format!("communication error: {e:?}")))?; let metadata = data .into_iter() // getting only reply with Sample::Ok - .filter_map(|r| r.sample.ok()) + .filter_map(|r| r.into_result().ok()) // getting only the ones we can deserialize .filter_map(|s| { - let raw_data = s.payload.contiguous().to_vec(); + // This is infallible + let raw_data = s.payload().to_bytes().to_vec(); deserialize::(&raw_data).ok() }) // get only the ones that do not have errors diff --git a/zrpc/src/server.rs b/zrpc/src/server.rs index 6d1f1bc..6142d90 100644 --- a/zrpc/src/server.rs +++ b/zrpc/src/server.rs @@ -14,8 +14,9 @@ use std::collections::HashSet; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; +use zenoh::config::ZenohId; +use zenoh::key_expr::KeyExpr; use zenoh::liveliness::LivelinessToken; -use zenoh::prelude::r#async::*; use zenoh::Session; @@ -25,13 +26,13 @@ use crate::status::{Code, Status}; use crate::types::{Message, ServerMetadata, ServerTaskFuture, WireMessage}; pub struct ServerBuilder { - pub(crate) session: Arc, + pub(crate) session: Session, pub(crate) services: HashMap>, pub(crate) labels: HashSet, } impl ServerBuilder { - pub fn session(mut self, session: Arc) -> Self { + pub fn session(mut self, session: Session) -> Self { self.session = session; self } @@ -74,14 +75,14 @@ impl ServerBuilder { } pub struct Server { - pub(crate) session: Arc, + pub(crate) session: Session, pub(crate) services: HashMap>, - pub(crate) tokens: Arc>>>, + pub(crate) tokens: Arc>>, pub(crate) labels: HashSet, } impl Server { - pub fn builder(session: Arc) -> ServerBuilder { + pub fn builder(session: Session) -> ServerBuilder { ServerBuilder { session, services: HashMap::new(), @@ -100,17 +101,12 @@ impl Server { // register the queryables and declare a liveliness token let ke = format!("@rpc/{}/**", self.instance_uuid()); - let queryable = self - .session - .declare_queryable(&ke) - .res() - .await - .map_err(|e| { - Status::new( - Code::InternalError, - format!("Cannot declare queryable: {e:?}"), - ) - })?; + let queryable = self.session.declare_queryable(&ke).await.map_err(|e| { + Status::new( + Code::InternalError, + format!("Cannot declare queryable: {e:?}"), + ) + })?; for k in self.services.keys() { let ke = format!("@rpc/{}/service/{k}", self.instance_uuid()); @@ -118,7 +114,6 @@ impl Server { .session .liveliness() .declare_token(ke) - .res() .await .map_err(|e| { Status::new( @@ -162,12 +157,11 @@ impl Server { .clone(); let payload = query - .value() + .payload() .ok_or_else(|| { Status::internal_error("Query has empty value cannot proceed") })? - .payload - .contiguous() + .to_bytes() .to_vec(); // this is call to a service @@ -187,16 +181,16 @@ impl Server { tokio::task::spawn(async move { let res = fut.await; let sample = match res { - Ok(data) => Sample::new(ke, data), + Ok(data) => data, Err(e) => { let wmgs = WireMessage { payload: None, status: e, }; - Sample::new(ke, serialize(&wmgs).unwrap_or_default()) + serialize(&wmgs).unwrap_or_default() } }; - let res = query.reply(Ok(sample)).res().await; + let res = query.reply(ke, sample).await; log::trace!("Query Result is: {res:?}"); }); } diff --git a/zrpc/src/zrpcresult.rs b/zrpc/src/zrpcresult.rs index 00faaf4..8658c1a 100644 --- a/zrpc/src/zrpcresult.rs +++ b/zrpc/src/zrpcresult.rs @@ -98,8 +98,8 @@ impl From for ZRPCError { } } -impl From for ZRPCError { - fn from(err: zenoh_util::core::Error) -> Self { +impl From for ZRPCError { + fn from(err: zenoh::Error) -> Self { ZRPCError::ZenohError(err.to_string()) } }