diff --git a/dubbo/src/extension/invoker_extension.rs b/dubbo/src/extension/invoker_extension.rs index d27ab93c..6f9793ed 100644 --- a/dubbo/src/extension/invoker_extension.rs +++ b/dubbo/src/extension/invoker_extension.rs @@ -28,6 +28,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures_core::Stream; use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin}; +use thiserror::Error; #[async_trait] pub trait Invoker { @@ -78,7 +79,9 @@ pub mod proxy { use bytes::Bytes; use futures_core::Stream; use std::pin::Pin; + use thiserror::Error; use tokio::sync::{mpsc::Sender, oneshot}; + use tracing::error; pub(super) enum InvokerOpt { Invoke( @@ -100,14 +103,36 @@ pub mod proxy { invocation: GrpcInvocation, ) -> Result + Send + 'static>>, StdError> { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(InvokerOpt::Invoke(invocation, tx)); + let ret = self.tx.send(InvokerOpt::Invoke(invocation, tx)).await; + match ret { + Ok(_) => {} + Err(err) => { + error!( + "call invoke method failed by invoker proxy, error: {:?}", + err + ); + return Err(InvokerProxyError::new( + "call invoke method failed by invoker proxy", + ) + .into()); + } + } let ret = rx.await?; ret } async fn url(&self) -> Result { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(InvokerOpt::Url(tx)); + let ret = self.tx.send(InvokerOpt::Url(tx)).await; + match ret { + Ok(_) => {} + Err(err) => { + error!("call url method failed by invoker proxy, error: {:?}", err); + return Err( + InvokerProxyError::new("call url method failed by invoker proxy").into(), + ); + } + } let ret = rx.await?; ret } @@ -121,10 +146,22 @@ pub mod proxy { match opt { InvokerOpt::Invoke(invocation, tx) => { let result = invoker.invoke(invocation).await; - let _ = tx.send(result); + let callback_ret = tx.send(result); + match callback_ret { + Ok(_) => {} + Err(err) => { + error!("invoke method has been called, but callback to caller failed. {:?}", err); + } + } } InvokerOpt::Url(tx) => { - let _ = tx.send(invoker.url().await); + let ret = tx.send(invoker.url().await); + match ret { + Ok(_) => {} + Err(err) => { + error!("url method has been called, but callback to caller failed. {:?}", err); + } + } } } } @@ -132,6 +169,16 @@ pub mod proxy { InvokerProxy { tx } } } + + #[derive(Error, Debug)] + #[error("invoker proxy error: {0}")] + pub struct InvokerProxyError(String); + + impl InvokerProxyError { + pub fn new(msg: &str) -> Self { + InvokerProxyError(msg.to_string()) + } + } } #[derive(Default)] @@ -149,9 +196,22 @@ impl InvokerExtensionLoader { } pub fn load(&mut self, url: Url) -> Result, StdError> { - let extension_name = url.query::().unwrap(); + let extension_name = url.query::(); + let Some(extension_name) = extension_name else { + return Err(InvokerExtensionLoaderError::new( + "load invoker extension failed, extension mustn't be empty", + ) + .into()); + }; let extension_name = extension_name.value(); - let factory = self.factories.get_mut(&extension_name).unwrap(); + let factory = self.factories.get_mut(&extension_name); + let Some(factory) = factory else { + let err_msg = format!( + "load {} invoker extension failed, can not found extension factory", + extension_name + ); + return Err(InvokerExtensionLoaderError(err_msg).into()); + }; factory.create(url) } } @@ -230,3 +290,13 @@ where )) } } + +#[derive(Error, Debug)] +#[error("{0}")] +pub struct InvokerExtensionLoaderError(String); + +impl InvokerExtensionLoaderError { + pub fn new(msg: &str) -> Self { + InvokerExtensionLoaderError(msg.to_string()) + } +} diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index fc48dc5c..ee8cc1e6 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -43,7 +43,9 @@ pub mod echo_client { let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); self.inner.unary(request, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -100,7 +102,9 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream> + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -114,14 +118,19 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream> + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result, dubbo::status::Status>; + ) -> Result< + Response, + dubbo::status::Status, + >; } /// Echo is the echo service. #[derive(Debug)] @@ -151,7 +160,10 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -164,16 +176,24 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -184,20 +204,30 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc for ServerStreamingEchoServer { + impl ServerStreamingSvc + for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.server_streaming_echo(request).await }; + let fut = async move { + inner.server_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -210,21 +240,29 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc for ClientStreamingEchoServer { + impl ClientStreamingSvc + for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.client_streaming_echo(request).await }; + let fut = async move { + inner.client_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -237,39 +275,54 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc for BidirectionalStreamingEchoServer { + impl StreamingSvc + for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = - async move { inner.bidirectional_streaming_echo(request).await }; + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = - TripleServer::::new(); + let mut server = TripleServer::< + super::EchoRequest, + super::EchoResponse, + >::new(); let res = server - .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) .await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } }