diff --git a/dubbo-macro/src/lib.rs b/dubbo-macro/src/lib.rs index 6db902a9..7656d610 100644 --- a/dubbo-macro/src/lib.rs +++ b/dubbo-macro/src/lib.rs @@ -16,20 +16,18 @@ */ use proc_macro::TokenStream; -use quote::{ToTokens}; +use quote::ToTokens; use syn::parse::Parser; mod server_macro; mod trait_macro; - - #[proc_macro_attribute] pub fn dubbo_trait(attr: TokenStream, item: TokenStream) -> TokenStream { let attr = DubboAttr::from_attr(attr); match attr { - Ok(attr) => { trait_macro::dubbo_trait(attr, item) } - Err(err) => { err.into_compile_error().into() } + Ok(attr) => trait_macro::dubbo_trait(attr, item), + Err(err) => err.into_compile_error().into(), } } @@ -37,12 +35,11 @@ pub fn dubbo_trait(attr: TokenStream, item: TokenStream) -> TokenStream { pub fn dubbo_server(attr: TokenStream, item: TokenStream) -> TokenStream { let attr = DubboAttr::from_attr(attr); match attr { - Ok(attr) => { server_macro::dubbo_server(attr, item) } - Err(err) => { err.into_compile_error().into() } + Ok(attr) => server_macro::dubbo_server(attr, item), + Err(err) => err.into_compile_error().into(), } } - #[derive(Default)] struct DubboAttr { package: Option, @@ -56,7 +53,9 @@ impl DubboAttr { .and_then(|args| Self::build_attr(args)) } - fn build_attr(args: syn::punctuated::Punctuated::) -> Result { + fn build_attr( + args: syn::punctuated::Punctuated, + ) -> Result { let mut package = None; let mut version = None; for arg in args { @@ -71,10 +70,12 @@ impl DubboAttr { .to_string() .to_lowercase(); let lit = match &namevalue.value { - syn::Expr::Lit(syn::ExprLit { lit, .. }) => lit.to_token_stream().to_string(), + syn::Expr::Lit(syn::ExprLit { lit, .. }) => { + lit.to_token_stream().to_string() + } expr => expr.to_token_stream().to_string(), } - .replace("\"", ""); + .replace("\"", ""); match ident.as_str() { "package" => { let _ = package.insert(lit); @@ -99,9 +100,6 @@ impl DubboAttr { } } } - Ok(DubboAttr { - package, - version, - }) + Ok(DubboAttr { package, version }) } } diff --git a/dubbo-macro/src/server_macro.rs b/dubbo-macro/src/server_macro.rs index f07fda7d..1aee33ab 100644 --- a/dubbo-macro/src/server_macro.rs +++ b/dubbo-macro/src/server_macro.rs @@ -15,11 +15,10 @@ * limitations under the License. */ - use crate::DubboAttr; use proc_macro::TokenStream; use quote::{quote, ToTokens}; -use syn::{parse_macro_input, FnArg, ItemImpl, ImplItem}; +use syn::{parse_macro_input, FnArg, ImplItem, ItemImpl}; pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { let version = match attr.version { @@ -48,7 +47,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { let token = quote! { let result : Result<#req_type,_> = serde_json::from_slice(param_req[idx].as_bytes()); if let Err(err) = result { - param.res = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string())); + param.result = Err(dubbo::status::Status::new(dubbo::status::Code::InvalidArgument,err.to_string())); return param; } let #req : #req_type = result.unwrap(); @@ -62,7 +61,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { ); vec.push(quote! { if ¶m.method_name[..] == stringify!(#method) { - let param_req = ¶m.req; + let param_req = ¶m.args; let mut idx = 0; #( #req @@ -72,7 +71,7 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { #req_pat, )* ).await; - param.res = match res { + param.result = match res { Ok(res) => { let res = serde_json::to_string(&res).unwrap(); Ok(res) @@ -87,20 +86,23 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { vec }); let service_unique = match &attr.package { - None => { quote!(stringify!(#item_trait)) } - Some(attr) => { quote!(#attr.to_owned() + "." + stringify!(#item_trait)) } + None => { + quote!(stringify!(#item_trait)) + } + Some(attr) => { + let service_unique = attr.to_owned() + "." + &item_trait.to_string(); + quote!(&#service_unique) + } }; let expanded = quote! { + #server_item - use dubbo::triple::server::support::RpcServer; - use dubbo::triple::server::support::RpcFuture; - use dubbo::triple::server::support::RpcMsg; - impl RpcServer for #item_self { - fn invoke (&self, param : RpcMsg) -> RpcFuture { + impl dubbo::triple::server::support::RpcServer for #item_self { + fn invoke (&self, param : dubbo::triple::server::support::RpcContext) -> dubbo::triple::server::support::RpcFuture { let mut rpc = self.clone(); Box::pin(async move {rpc.prv_invoke(param).await}) } - fn get_info(&self) -> (&str , &str , Option<&str> , Vec) { + fn get_info(&self) -> (&str, Option<&str>, Vec) { let mut methods = vec![]; #( methods.push(stringify!(#items_ident_fn).to_string()); @@ -110,9 +112,9 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { } impl #item_self { - async fn prv_invoke (&self, mut param : RpcMsg) -> RpcMsg { + async fn prv_invoke (&self, mut param : dubbo::triple::server::support::RpcContext) -> dubbo::triple::server::support::RpcContext { #(#items_fn)* - param.res = Err( + param.result = Err( dubbo::status::Status::new(dubbo::status::Code::NotFound,format!("not find method by {}",param.method_name)) ); return param; @@ -122,7 +124,6 @@ pub fn dubbo_server(attr: DubboAttr, item: TokenStream) -> TokenStream { expanded.into() } - fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream { let impl_item = item.impl_token; let trait_ident = item.trait_.unwrap().1; @@ -141,4 +142,4 @@ fn get_server_item(item: ItemImpl) -> proc_macro2::TokenStream { )* } } -} \ No newline at end of file +} diff --git a/dubbo-macro/src/trait_macro.rs b/dubbo-macro/src/trait_macro.rs index 82f11176..d4aa95e8 100644 --- a/dubbo-macro/src/trait_macro.rs +++ b/dubbo-macro/src/trait_macro.rs @@ -64,11 +64,12 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { vec.push(token); vec }); - let package = stringify!(#trait_ident); + let package = trait_ident.to_string(); let service_unique = match &attr.package { - None => { quote!(#package) } - Some(attr) => { quote!(#attr.to_owned() + "." + #package) } + None => package.to_owned(), + Some(attr) => attr.to_owned() + "." + &package, }; + let path = "/".to_string() + &service_unique + "/" + &ident.to_string(); fn_quote.push( quote! { #[allow(non_snake_case)] @@ -82,22 +83,26 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { req_vec.push(req_poi_str.unwrap()); )* let _version : Option<&str> = #version; - let request = Request::new(TripleRequestWrapper::new(req_vec)); + let request = dubbo::invocation::Request::new(dubbo::triple::triple_wrapper::TripleRequestWrapper::new(req_vec)); let service_unique = #service_unique; let method_name = stringify!(#ident).to_string(); let invocation = dubbo::invocation::RpcInvocation::default() .with_service_unique_name(service_unique.to_owned()) .with_method_name(method_name.clone()); - let path = "/".to_string() + service_unique + "/" + &method_name; - let path = http::uri::PathAndQuery::from_str( - &path, - ).unwrap(); - let res = self.inner.unary::(request, path, invocation).await; + let path = http::uri::PathAndQuery::from_static( + #path, + ); + let res = self.inner.unary::(request, path, invocation).await; match res { Ok(res) => { let response_wrapper = res.into_parts().1; - let res: #output_type = serde_json::from_slice(&response_wrapper.data).unwrap(); - Ok(res) + let data = &response_wrapper.data; + if data.starts_with(b"null") { + Err(dubbo::status::Status::new(dubbo::status::Code::DataLoss,"null".to_string())) + } else { + let res: #output_type = serde_json::from_slice(data).unwrap(); + Ok(res) + } }, Err(err) => Err(err) } @@ -107,33 +112,24 @@ pub fn dubbo_trait(attr: DubboAttr, item: TokenStream) -> TokenStream { } let rpc_client = syn::Ident::new(&format!("{}Client", trait_ident), trait_ident.span()); let expanded = quote! { - use dubbo::triple::client::TripleClient; - use dubbo::triple::triple_wrapper::TripleRequestWrapper; - use dubbo::triple::triple_wrapper::TripleResponseWrapper; - use dubbo::triple::codec::prost::ProstCodec; - use dubbo::invocation::Request; - use dubbo::invocation::Response; - use dubbo::triple::client::builder::ClientBuilder; - use std::str::FromStr; #item_trait #vis struct #rpc_client { - inner: TripleClient + inner: dubbo::triple::client::TripleClient } impl #rpc_client { #( #fn_quote )* - pub fn new(builder: ClientBuilder) -> #rpc_client { - #rpc_client {inner: TripleClient::new(builder),} + pub fn new(builder: dubbo::triple::client::builder::ClientBuilder) -> #rpc_client { + #rpc_client {inner: dubbo::triple::client::TripleClient::new(builder),} } } }; TokenStream::from(expanded) } - fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream { let trait_ident = &item.ident; let item_fn = item.items.iter().fold(vec![], |mut vec, e| { @@ -162,4 +158,4 @@ fn get_item_trait(item: ItemTrait) -> proc_macro2::TokenStream { )* } } -} \ No newline at end of file +} diff --git a/dubbo/src/config/service.rs b/dubbo/src/config/service.rs index 8a1f1910..ce639f46 100644 --- a/dubbo/src/config/service.rs +++ b/dubbo/src/config/service.rs @@ -23,6 +23,7 @@ pub struct ServiceConfig { pub group: String, pub protocol: String, pub interface: String, + pub serialization: Option, } impl ServiceConfig { @@ -41,4 +42,10 @@ impl ServiceConfig { pub fn protocol(self, protocol: String) -> Self { Self { protocol, ..self } } + pub fn serialization(self, serialization: Option) -> Self { + Self { + serialization, + ..self + } + } } diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs index 6bb6e3b1..5bafb629 100644 --- a/dubbo/src/framework.rs +++ b/dubbo/src/framework.rs @@ -24,10 +24,10 @@ use crate::{ logger::tracing::{debug, info}, protocol::{BoxExporter, Protocol}, registry::protocol::RegistryProtocol, + triple::server::support::{RpcHttp2Server, RpcServer}, Url, }; use futures::{future, Future}; -use crate::triple::server::support::{RpcHttp2Server, RpcServer}; // Invoker是否可以基于hyper写一个通用的 @@ -93,12 +93,17 @@ impl Dubbo { .protocols .get_protocol_or_default(service_config.protocol.as_str()); let interface_name = service_config.interface.clone(); - let protocol_url = format!( - "{}/{}?interface={}", + let mut protocol_url = format!( + "{}/{}?interface={}&category={}&protocol={}", protocol.to_url(), interface_name, - interface_name + interface_name, + "providers", + "tri" ); + if let Some(serialization) = &service_config.serialization { + protocol_url.push_str(&format!("&prefer.serialization={}", serialization)); + } info!("protocol_url: {:?}", protocol_url); protocol_url.parse().ok() } else { @@ -142,7 +147,7 @@ impl Dubbo { .with_registries(registry_extensions.clone()) .with_services(self.service_registry.clone()), ); - let mut async_vec: Vec + Send>>> = Vec::new(); + let mut async_vec: Vec + Send>>> = Vec::new(); for (name, items) in self.protocols.iter() { for url in items.iter() { info!("base: {:?}, service url: {:?}", name, url); diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index 7b251835..2d1c91db 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -18,6 +18,7 @@ use std::{collections::HashMap, fmt::Debug, str::FromStr}; use futures_core::Stream; +use http::StatusCode; pub struct Request { pub message: T, @@ -81,6 +82,7 @@ impl Request { pub struct Response { message: T, + status: StatusCode, metadata: Metadata, } @@ -88,30 +90,41 @@ impl Response { pub fn new(message: T) -> Response { Self { message, + status: Default::default(), metadata: Metadata::new(), } } pub fn from_parts(metadata: Metadata, message: T) -> Self { - Self { message, metadata } + Self { + message, + status: Default::default(), + metadata, + } } pub fn into_parts(self) -> (Metadata, T) { - (self.metadata, self.message) + let metadata = self + .metadata + .insert("http_status".to_owned(), self.status.as_str().to_owned()); + (metadata, self.message) } pub fn into_http(self) -> http::Response { let mut http_resp = http::Response::new(self.message); *http_resp.version_mut() = http::Version::HTTP_2; *http_resp.headers_mut() = self.metadata.into_headers(); + *http_resp.status_mut() = self.status; http_resp } pub fn from_http(resp: http::Response) -> Self { + let status = resp.status(); let (part, body) = resp.into_parts(); Response { message: body, + status, metadata: Metadata::from_headers(part.headers), } } @@ -123,6 +136,7 @@ impl Response { let u = f(self.message); Response { message: u, + status: self.status, metadata: self.metadata, } } @@ -206,6 +220,14 @@ impl Metadata { header } + + pub fn get(&self, key: &str) -> Option<&String> { + self.inner.get(key) + } + + pub fn get_http_status(&self) -> &str { + self.inner.get("http_status").map_or("200", |e| e) + } } pub trait Invocation { diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs index 9b2ae811..e97b1505 100644 --- a/dubbo/src/protocol/triple/mod.rs +++ b/dubbo/src/protocol/triple/mod.rs @@ -26,7 +26,7 @@ use std::{collections::HashMap, sync::RwLock}; use crate::{utils::boxed_clone::BoxCloneService, BoxBody}; pub type GrpcBoxCloneService = -BoxCloneService, http::Response, std::convert::Infallible>; + BoxCloneService, http::Response, std::convert::Infallible>; lazy_static! { pub static ref TRIPLE_SERVICES: RwLock> = diff --git a/dubbo/src/status.rs b/dubbo/src/status.rs index 7258b481..6ba1076a 100644 --- a/dubbo/src/status.rs +++ b/dubbo/src/status.rs @@ -225,32 +225,38 @@ impl std::fmt::Display for Code { } } -impl From for Code { - fn from(i: i32) -> Self { +impl From<&[u8]> for Code { + fn from(i: &[u8]) -> Self { match i { - 0 => Code::Ok, - 1 => Code::Cancelled, - 2 => Code::Unknown, - 3 => Code::InvalidArgument, - 4 => Code::DeadlineExceeded, - 5 => Code::NotFound, - 6 => Code::AlreadyExists, - 7 => Code::PermissionDenied, - 8 => Code::ResourceExhausted, - 9 => Code::FailedPrecondition, - 10 => Code::Aborted, - 11 => Code::OutOfRange, - 12 => Code::Unimplemented, - 13 => Code::Internal, - 14 => Code::Unavailable, - 15 => Code::DataLoss, - 16 => Code::Unauthenticated, + b"0" => Code::Ok, + b"1" => Code::Cancelled, + b"2" => Code::Unknown, + b"3" => Code::InvalidArgument, + b"4" => Code::DeadlineExceeded, + b"5" => Code::NotFound, + b"6" => Code::AlreadyExists, + b"7" => Code::PermissionDenied, + b"8" => Code::ResourceExhausted, + b"9" => Code::FailedPrecondition, + b"10" => Code::Aborted, + b"11" => Code::OutOfRange, + b"12" => Code::Unimplemented, + b"13" => Code::Internal, + b"14" => Code::Unavailable, + b"15" => Code::DataLoss, + b"16" => Code::Unauthenticated, _ => Code::Unknown, } } } +impl From for Code { + fn from(i: i32) -> Self { + Code::from(i.to_string().as_bytes()) + } +} + #[derive(Debug, Clone)] pub struct Status { // grpc-status diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 2948dec5..55e6d5ad 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -21,6 +21,7 @@ use http::HeaderValue; use prost::Message; use serde::{Deserialize, Serialize}; use tower_service::Service; +use tracing::error; use crate::codegen::{ProstCodec, RpcInvocation, SerdeCodec}; @@ -167,7 +168,6 @@ impl TripleClient { .header("path", path.to_string()) .body(body) .unwrap(); - for (k, v) in mt.into_headers().iter() { request.headers_mut().insert(k, v.to_owned()); } @@ -185,12 +185,24 @@ impl TripleClient { futures_util::pin_mut!(body); - let message = body.try_next().await?.ok_or_else(|| { - crate::status::Status::new( - crate::status::Code::Internal, - "Missing response message.".to_string(), - ) - })?; + let message = match body.try_next().await? { + Some(message) => message, + None => { + let http_status = parts.get_http_status(); + if http_status != "200" { + error!("http status : {}", http_status); + } + let code = parts + .get("grpc-status") + .map(|e| crate::status::Code::from(e.as_bytes())) + .map_or(crate::status::Code::Internal, |e| e); + let message = parts + .get("grpc-message") + .map(|e| e.to_string()) + .map_or(code.to_string(), |e| e); + return Err(crate::status::Status::new(code, message)); + } + }; if let Some(trailers) = body.trailer().await? { let mut h = parts.into_headers(); diff --git a/dubbo/src/triple/server/mod.rs b/dubbo/src/triple/server/mod.rs index abd48f91..a5cf0cf0 100644 --- a/dubbo/src/triple/server/mod.rs +++ b/dubbo/src/triple/server/mod.rs @@ -17,7 +17,7 @@ pub mod builder; pub mod service; -pub mod triple; pub mod support; +pub mod triple; pub use triple::TripleServer; diff --git a/dubbo/src/triple/server/support.rs b/dubbo/src/triple/server/support.rs index 896338c1..d368964b 100644 --- a/dubbo/src/triple/server/support.rs +++ b/dubbo/src/triple/server/support.rs @@ -33,29 +33,30 @@ use super::TripleServer; pub type RpcFuture = std::pin::Pin + Send>>; -pub struct RpcMsg { +#[derive(Debug)] +pub struct RpcContext { pub version: Option, pub class_name: String, pub method_name: String, - pub req: Vec, - pub res: Result, + pub args: Vec, + pub result: Result, } -impl RpcMsg { +impl RpcContext { pub fn new(path: String, version: Option) -> Self { let attr: Vec<&str> = path.split("/").collect(); - RpcMsg { + RpcContext { version, class_name: attr[1].to_string(), method_name: attr[2].to_string(), - req: vec![], - res: Err(Status::new(Code::Ok, "success".to_string())), + args: vec![], + result: Err(Status::new(Code::Ok, "success".to_string())), } } } pub trait RpcServer: Send + Sync + 'static { - fn invoke(&self, msg: RpcMsg) -> RpcFuture; + fn invoke(&self, msg: RpcContext) -> RpcFuture; fn get_info(&self) -> (&str, Option<&str>, Vec); } @@ -86,20 +87,20 @@ where fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn call(&mut self, req: http::Request) -> Self::Future { - let path = req.uri().path().to_string(); - let version = req + fn call(&mut self, request: http::Request) -> Self::Future { + let path = request.uri().path().to_string(); + let version = request .headers() .get("tri-service-version") .map(|e| String::from_utf8_lossy(e.as_bytes()).to_string()); - let rpc_msg = RpcMsg::new(path, version); + let rpc_msg = RpcContext::new(path, version); let rpc_unary_server = RpcUnaryServer { inner: self.inner.clone(), msg: Some(rpc_msg), }; let mut server = TripleServer::new(); let fut = async move { - let res = server.unary(rpc_unary_server, req).await; + let res = server.unary(rpc_unary_server, request).await; Ok(res) }; Box::pin(fut) @@ -109,7 +110,7 @@ where #[allow(non_camel_case_types)] struct RpcUnaryServer { inner: _Inner, - msg: Option, + msg: Option, } impl UnarySvc for RpcUnaryServer { @@ -118,10 +119,10 @@ impl UnarySvc for RpcUnaryServer { fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let mut msg = self.msg.take().unwrap(); - msg.req = request.message.get_req(); + msg.args = request.message.get_args(); let fut = async move { - let res = inner.invoke(msg).await.res; - match res { + let result = inner.invoke(msg).await.result; + match result { Ok(res) => Ok(Response::new(TripleResponseWrapper::new(res))), Err(err) => Err(err), } diff --git a/dubbo/src/triple/triple_wrapper.rs b/dubbo/src/triple/triple_wrapper.rs index 2f06df69..ce1d7d7b 100644 --- a/dubbo/src/triple/triple_wrapper.rs +++ b/dubbo/src/triple/triple_wrapper.rs @@ -61,7 +61,7 @@ impl TripleRequestWrapper { trip.args = data.iter().map(|e| e.as_bytes().to_vec()).collect(); return trip; } - pub fn get_req(self) -> Vec { + pub fn get_args(self) -> Vec { let mut res = vec![]; for str in self.args { res.push(String::from_utf8(str).unwrap()); diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index ee8cc1e6..fc48dc5c 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -43,9 +43,7 @@ pub mod echo_client { let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static( - "/grpc.examples.echo.Echo/UnaryEcho", - ); + let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); self.inner.unary(request, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -102,9 +100,7 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type ServerStreamingEchoStream: futures_util::Stream> + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -118,19 +114,14 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream< - Item = Result, - > + type BidirectionalStreamingEchoStream: futures_util::Stream> + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result< - Response, - dubbo::status::Status, - >; + ) -> Result, dubbo::status::Status>; } /// Echo is the echo service. #[derive(Debug)] @@ -160,10 +151,7 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -176,24 +164,16 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -204,30 +184,20 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc - for ServerStreamingEchoServer { + impl ServerStreamingSvc for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; - fn call( - &mut self, - request: Request, - ) -> Self::Future { + type Future = + BoxFuture, dubbo::status::Status>; + fn call(&mut self, request: Request) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.server_streaming_echo(request).await - }; + let fut = async move { inner.server_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -240,29 +210,21 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc - for ClientStreamingEchoServer { + impl ClientStreamingSvc for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.client_streaming_echo(request).await - }; + let fut = async move { inner.client_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -275,54 +237,39 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc - for BidirectionalStreamingEchoServer { + impl StreamingSvc for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = BoxFuture< - Response, - dubbo::status::Status, - >; + type Future = + BoxFuture, dubbo::status::Status>; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { - inner.bidirectional_streaming_echo(request).await - }; + let fut = + async move { inner.bidirectional_streaming_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::< - super::EchoRequest, - super::EchoResponse, - >::new(); + let mut server = + TripleServer::::new(); let res = server - .bidi_streaming( - BidirectionalStreamingEchoServer { - inner, - }, - req, - ) + .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) .await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/examples/interface/application.yaml b/examples/interface/application.yaml index f6974286..6e819326 100644 --- a/examples/interface/application.yaml +++ b/examples/interface/application.yaml @@ -17,9 +17,4 @@ dubbo: group: test protocol: triple serialization : fastjson - interface: org.apache.dubbo.springboot.demo.DemoService - routers: - consumer: - - service: "org.apache.dubbo.springboot.demo.DemoService" - url: tri://127.0.0.1:20000 - protocol: triple \ No newline at end of file + interface: org.apache.dubbo.springboot.demo.DemoService \ No newline at end of file diff --git a/examples/interface/src/client.rs b/examples/interface/src/client.rs index 622beeff..8ec03eb2 100644 --- a/examples/interface/src/client.rs +++ b/examples/interface/src/client.rs @@ -15,8 +15,7 @@ * limitations under the License. */ -use dubbo::codegen::{ClientBuilder}; -use dubbo::extension; +use dubbo::{codegen::ClientBuilder, extension}; use example_interface::{DemoServiceClient, ReqDto}; use registry_nacos::NacosRegistry; diff --git a/examples/interface/src/lib.rs b/examples/interface/src/lib.rs index bf05fb07..0d4279f5 100644 --- a/examples/interface/src/lib.rs +++ b/examples/interface/src/lib.rs @@ -18,7 +18,6 @@ use dubbo_macro::dubbo_trait; use serde::{Deserialize, Serialize}; - #[derive(Serialize, Deserialize, Default, Debug)] pub struct ReqDto { pub str: String, diff --git a/examples/interface/src/server.rs b/examples/interface/src/server.rs index ce5dd35c..6b71fec4 100644 --- a/examples/interface/src/server.rs +++ b/examples/interface/src/server.rs @@ -15,13 +15,16 @@ * limitations under the License. */ -use dubbo::{Dubbo, extension}; +use std::env; + +use dubbo::{ + config::RootConfig, + extension, + logger::{tracing::span, Level}, + Dubbo, +}; use dubbo_macro::dubbo_server; use example_interface::{DemoService, ReqDto, ResDto}; -use registry_zookeeper::ZookeeperRegistry; -use std::env; -use dubbo::config::RootConfig; -use dubbo::logger::tracing::span; use registry_nacos::NacosRegistry; #[derive(Clone)] @@ -33,35 +36,34 @@ struct DemoServiceImpl { impl DemoService for DemoServiceImpl { async fn sayHello(&self, req: String) -> Result { println!("client request : {:?}", req); - return Ok("Hello ".to_owned() + &req); + Ok("Hello ".to_owned() + &req) } async fn sayHelloV2(&self, req: ReqDto, req2: ReqDto) -> Result { println!("client request : {:?} : {:?}", req, req2); - return Ok(ResDto { + Ok(ResDto { str: "Hello ".to_owned() + &req.str + ":" + &req2.str + " V2", - }); + }) } } #[tokio::main] async fn main() { dubbo::logger::init(); - let span = span!(Level::DEBUG, "greeter.server"); + let span = span!(Level::DEBUG, "interface.server"); + env::set_var("DUBBO_CONFIG_PATH", "examples/interface/application.yaml"); let _enter = span.enter(); - register_server(GreeterServerImpl { - name: "greeter".to_string(), - }); - // let zkr: ZookeeperRegistry = ZookeeperRegistry::default(); let r = RootConfig::new(); let r = match r.load() { Ok(config) => config, Err(_err) => panic!("err: {:?}", _err), // response was droped }; - let _ = extension::EXTENSIONS.register::().await; + let server = DemoServiceImpl { + _db: "i am db".to_owned(), + }; let mut f = Dubbo::new() .with_config(r) - .add_registry("nacos://127.0.0.1:8848/"); - + .add_registry("nacos://127.0.0.1:8848/") + .register_server(server); f.start().await; } diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs index f8ccc967..3a429de8 100644 --- a/registry/nacos/src/lib.rs +++ b/registry/nacos/src/lib.rs @@ -378,8 +378,7 @@ impl NacosServiceName { let group = url.query::().unwrap_or_default(); let group = group.value(); - let value = format!("{}:{}:{}:{}", category, interface, version, group); - + let value = format!("{}:{}:{}:{}", "providers", interface, version, group); Self { category, interface,