Skip to content

Commit

Permalink
chore: cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
yang20150702 committed Mar 22, 2024
1 parent 8984192 commit db28e46
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 42 deletions.
2 changes: 1 addition & 1 deletion dubbo/src/registry/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

#[allow(unused_variables, dead_code, missing_docs)]
use dubbo_base::{registry_param::InterfaceName, url::UrlParam, Url};
use dubbo_logger::tracing;
use std::{
Expand All @@ -36,6 +35,7 @@ pub struct RegistryProtocol {
// registerAddr: Registry
registries: Vec<RegistryProxy>,
// providerUrl: Exporter
#[allow(dead_code)]
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
// serviceName: registryUrls
services: HashMap<String, Vec<Url>>,
Expand Down
6 changes: 2 additions & 4 deletions dubbo/src/registry/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet};

use async_trait::async_trait;
use itertools::Itertools;
use thiserror::Error;

use tokio::sync::{
mpsc::{self},
Mutex,
Expand Down Expand Up @@ -217,6 +217,4 @@ impl Extension for StaticRegistry {
Ok(Box::new(static_registry))
}
}
#[derive(Error, Debug)]
#[error("static registry error: {0}")]
struct StaticRegistryError(String);

127 changes: 90 additions & 37 deletions examples/echo/src/generated/grpc.examples.echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ pub mod echo_client {
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/UnaryEcho",
);
self.inner.unary(request, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
Expand Down Expand Up @@ -100,7 +102,9 @@ pub mod echo_server {
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the ServerStreamingEcho method.
type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
type ServerStreamingEchoStream: futures_util::Stream<
Item = Result<super::EchoResponse, dubbo::status::Status>,
>
+ Send
+ 'static;
/// ServerStreamingEcho is server side streaming.
Expand All @@ -114,14 +118,19 @@ pub mod echo_server {
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the BidirectionalStreamingEcho method.
type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
type BidirectionalStreamingEchoStream: futures_util::Stream<
Item = Result<super::EchoResponse, dubbo::status::Status>,
>
+ Send
+ 'static;
/// BidirectionalStreamingEcho is bidi streaming.
async fn bidirectional_streaming_echo(
&self,
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
) -> Result<
Response<Self::BidirectionalStreamingEchoStream>,
dubbo::status::Status,
>;
}
/// Echo is the echo service.
#[derive(Debug)]
Expand Down Expand Up @@ -151,7 +160,10 @@ pub mod echo_server {
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
Expand All @@ -164,16 +176,24 @@ pub mod echo_server {
}
impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
type Response = super::EchoResponse;
type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
type Future = BoxFuture<
Response<Self::Response>,
dubbo::status::Status,
>;
fn call(
&mut self,
request: Request<super::EchoRequest>,
) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move { inner.unary_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
let mut server =
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let mut server = TripleServer::<
super::EchoRequest,
super::EchoResponse,
>::new();
let res = server.unary(UnaryEchoServer { inner }, req).await;
Ok(res)
};
Expand All @@ -184,20 +204,30 @@ pub mod echo_server {
struct ServerStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
for ServerStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::ServerStreamingEchoStream;
type Future =
BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
type Future = BoxFuture<
Response<Self::ResponseStream>,
dubbo::status::Status,
>;
fn call(
&mut self,
request: Request<super::EchoRequest>,
) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move { inner.server_streaming_echo(request).await };
let fut = async move {
inner.server_streaming_echo(request).await
};
Box::pin(fut)
}
}
let fut = async move {
let mut server =
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let mut server = TripleServer::<
super::EchoRequest,
super::EchoResponse,
>::new();
let res = server
.server_streaming(ServerStreamingEchoServer { inner }, req)
.await;
Expand All @@ -210,21 +240,29 @@ pub mod echo_server {
struct ClientStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
for ClientStreamingEchoServer<T> {
type Response = super::EchoResponse;
type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
type Future = BoxFuture<
Response<Self::Response>,
dubbo::status::Status,
>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move { inner.client_streaming_echo(request).await };
let fut = async move {
inner.client_streaming_echo(request).await
};
Box::pin(fut)
}
}
let fut = async move {
let mut server =
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let mut server = TripleServer::<
super::EchoRequest,
super::EchoResponse,
>::new();
let res = server
.client_streaming(ClientStreamingEchoServer { inner }, req)
.await;
Expand All @@ -237,39 +275,54 @@ pub mod echo_server {
struct BidirectionalStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
impl<T: Echo> StreamingSvc<super::EchoRequest>
for BidirectionalStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::BidirectionalStreamingEchoStream;
type Future =
BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
type Future = BoxFuture<
Response<Self::ResponseStream>,
dubbo::status::Status,
>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
let fut =
async move { inner.bidirectional_streaming_echo(request).await };
let fut = async move {
inner.bidirectional_streaming_echo(request).await
};
Box::pin(fut)
}
}
let fut = async move {
let mut server =
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
let mut server = TripleServer::<
super::EchoRequest,
super::EchoResponse,
>::new();
let res = server
.bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
.bidi_streaming(
BidirectionalStreamingEchoServer {
inner,
},
req,
)
.await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
}),
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
Expand Down

0 comments on commit db28e46

Please sign in to comment.