From 6943c9fb1876ad5dbc6d335939afbc37abeed775 Mon Sep 17 00:00:00 2001 From: Yang Yang <962032265@qq.com> Date: Sun, 28 Apr 2024 10:21:17 +0800 Subject: [PATCH] Rft: Optimize the relation of metadata and headers (#193) * refact(triple): optimize metadata and header logic * style: cargo fmt --- dubbo/src/codegen.rs | 2 +- dubbo/src/invocation.rs | 33 +++++++++++++- dubbo/src/protocol/triple/triple_invoker.rs | 4 ++ dubbo/src/triple/client/triple.rs | 50 +++++++++++++++++---- examples/echo/src/echo/client.rs | 20 ++++++--- 5 files changed, 90 insertions(+), 19 deletions(-) diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 92774141..baf39299 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -29,7 +29,7 @@ pub use tower_service::Service; pub use super::{ empty_body, - invocation::{IntoStreamingRequest, Request, Response, RpcInvocation}, + invocation::{IntoStreamingRequest, Metadata, Request, Response, RpcInvocation}, protocol::{triple::triple_invoker::TripleInvoker, Invoker}, triple::{ client::TripleClient, diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index 57503379..7b251835 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -144,11 +144,23 @@ where type Message = T::Item; - fn into_streaming_request(self) -> Request { + fn into_streaming_request(self) -> Request { Request::new(self) } } +impl IntoStreamingRequest for Request +where + T: Stream + Send + 'static, +{ + type Stream = T; + type Message = T::Item; + + fn into_streaming_request(self) -> Self { + self + } +} + // impl sealed::Sealed for T {} // pub mod sealed { @@ -167,6 +179,11 @@ impl Metadata { } } + pub fn insert(mut self, key: String, value: String) -> Self { + self.inner.insert(key, value); + self + } + pub fn from_headers(headers: http::HeaderMap) -> Self { let mut h: HashMap = HashMap::new(); for (k, v) in headers.into_iter() { @@ -196,10 +213,12 @@ pub trait Invocation { fn get_method_name(&self) -> String; } -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct RpcInvocation { target_service_unique_name: String, method_name: String, + + metadata: Metadata, } impl RpcInvocation { @@ -211,9 +230,19 @@ impl RpcInvocation { self.method_name = method_name; self } + + pub fn with_metadata(mut self, metadata: Metadata) -> Self { + self.metadata = metadata; + self + } + pub fn unique_fingerprint(&self) -> String { format!("{}#{}", self.target_service_unique_name, self.method_name) } + + pub fn get_metadata(&self) -> Metadata { + self.metadata.clone() + } } impl Invocation for RpcInvocation { diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 29704f5f..516dab2e 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -71,6 +71,10 @@ impl TripleInvoker { .body(body) .unwrap(); + // add header of source + for (k, v) in parts.headers.iter() { + req.headers_mut().insert(k, v.to_owned()); + } // *req.version_mut() = http::Version::HTTP_2; req.headers_mut() .insert("method", HeaderValue::from_static("POST")); diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 46f8e9ca..2948dec5 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -137,7 +137,7 @@ impl TripleClient { &mut self, req: Request, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -147,6 +147,9 @@ impl TripleClient { Box + Send + 'static>, Box + Send + 'static>, ) = get_codec("application/grpc+proto"); + + let mt = req.metadata.clone(); + let req = req.map(|m| stream::once(future::ready(m))); let body_stream = encode( encoder, @@ -157,13 +160,18 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(body_stream); + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await @@ -200,7 +208,7 @@ impl TripleClient { &mut self, req: impl IntoStreamingRequest, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result>, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -210,7 +218,10 @@ impl TripleClient { Box + Send + 'static>, Box + Send + 'static>, ) = get_codec("application/grpc+proto"); + let req = req.into_streaming_request(); + let mt = req.metadata.clone(); + let en = encode( encoder, req.into_inner().map(Ok), @@ -220,13 +231,18 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(en); + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await @@ -247,7 +263,7 @@ impl TripleClient { &mut self, req: impl IntoStreamingRequest, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -258,6 +274,8 @@ impl TripleClient { Box + Send + 'static>, ) = get_codec("application/grpc+proto"); let req = req.into_streaming_request(); + let mt = req.metadata.clone(); + let en = encode( encoder, req.into_inner().map(Ok), @@ -266,14 +284,19 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(en); + + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); - // let mut conn = Connection::new().with_host(http_uri); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await @@ -310,7 +333,7 @@ impl TripleClient { &mut self, req: Request, path: http::uri::PathAndQuery, - invocation: RpcInvocation, + mut invocation: RpcInvocation, ) -> Result>, crate::status::Status> where M1: Message + Send + Sync + 'static + Serialize, @@ -320,7 +343,10 @@ impl TripleClient { Box + Send + 'static>, Box + Send + 'static>, ) = get_codec("application/grpc+proto"); + let req = req.map(|m| stream::once(future::ready(m))); + let mt = req.metadata.clone(); + let en = encode( encoder, req.into_inner().map(Ok), @@ -329,13 +355,19 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(en); + + invocation = invocation.with_metadata(mt.clone()); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() + let mut request = http::Request::builder() .header("path", path.to_string()) .body(body) .unwrap(); + for (k, v) in mt.into_headers().iter() { + request.headers_mut().insert(k, v.to_owned()); + } + let response = invoker .call(request) .await diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index 74dec98e..030003f8 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -15,7 +15,7 @@ * limitations under the License. */ -use dubbo::codegen::*; +use dubbo::{codegen::*, invocation::Metadata}; use example_echo::generated::generated::{echo_client::EchoClient, EchoRequest}; use futures_util::StreamExt; @@ -41,11 +41,15 @@ async fn main() { let mut cli = EchoClient::new(builder); // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); - let resp = cli - .unary_echo(Request::new(EchoRequest { + let mut mtdata = Metadata::default(); + mtdata = mtdata.insert("static_tag".to_string(), "red".to_string()); + let req = Request::from_parts( + mtdata.clone(), + EchoRequest { message: "message from client".to_string(), - })) - .await; + }, + ); + let resp = cli.unary_echo(req).await; let resp = match resp { Ok(resp) => resp, Err(err) => return println!("{:?}", err), @@ -64,7 +68,9 @@ async fn main() { message: "msg3 from client streaming".to_string(), }, ]; - let req = futures_util::stream::iter(data); + let mut mtdata = Metadata::default(); + mtdata = mtdata.insert("client_streaming".to_string(), "true".to_string()); + let req = Request::from_parts(mtdata, futures_util::stream::iter(data)); let resp = cli.client_streaming_echo(req).await; let client_streaming_resp = match resp { Ok(resp) => resp, @@ -84,7 +90,7 @@ async fn main() { message: "msg3 from client".to_string(), }, ]; - let req = futures_util::stream::iter(data); + let req = Request::new(futures_util::stream::iter(data)); let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap();