Skip to content

Commit

Permalink
Rft: Optimize the relation of metadata and headers (#193)
Browse files Browse the repository at this point in the history
* refact(triple): optimize metadata and header logic

* style: cargo fmt
  • Loading branch information
yang20150702 authored Apr 28, 2024
1 parent 463dcf4 commit 6943c9f
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 19 deletions.
2 changes: 1 addition & 1 deletion dubbo/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use tower_service::Service;

pub use super::{
empty_body,
invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
invocation::{IntoStreamingRequest, Metadata, Request, Response, RpcInvocation},
protocol::{triple::triple_invoker::TripleInvoker, Invoker},
triple::{
client::TripleClient,
Expand Down
33 changes: 31 additions & 2 deletions dubbo/src/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,23 @@ where

type Message = T::Item;

fn into_streaming_request(self) -> Request<Self::Stream> {
fn into_streaming_request(self) -> Request<Self> {
Request::new(self)
}
}

impl<T> IntoStreamingRequest for Request<T>
where
T: Stream + Send + 'static,
{
type Stream = T;
type Message = T::Item;

fn into_streaming_request(self) -> Self {
self
}
}

// impl<T> sealed::Sealed for T {}

// pub mod sealed {
Expand All @@ -167,6 +179,11 @@ impl Metadata {
}
}

pub fn insert(mut self, key: String, value: String) -> Self {
self.inner.insert(key, value);
self
}

pub fn from_headers(headers: http::HeaderMap) -> Self {
let mut h: HashMap<String, String> = HashMap::new();
for (k, v) in headers.into_iter() {
Expand Down Expand Up @@ -196,10 +213,12 @@ pub trait Invocation {
fn get_method_name(&self) -> String;
}

#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
pub struct RpcInvocation {
target_service_unique_name: String,
method_name: String,

metadata: Metadata,
}

impl RpcInvocation {
Expand All @@ -211,9 +230,19 @@ impl RpcInvocation {
self.method_name = method_name;
self
}

pub fn with_metadata(mut self, metadata: Metadata) -> Self {
self.metadata = metadata;
self
}

pub fn unique_fingerprint(&self) -> String {
format!("{}#{}", self.target_service_unique_name, self.method_name)
}

pub fn get_metadata(&self) -> Metadata {
self.metadata.clone()
}
}

impl Invocation for RpcInvocation {
Expand Down
4 changes: 4 additions & 0 deletions dubbo/src/protocol/triple/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl TripleInvoker {
.body(body)
.unwrap();

// add header of source
for (k, v) in parts.headers.iter() {
req.headers_mut().insert(k, v.to_owned());
}
// *req.version_mut() = http::Version::HTTP_2;
req.headers_mut()
.insert("method", HeaderValue::from_static("POST"));
Expand Down
50 changes: 41 additions & 9 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl TripleClient {
&mut self,
req: Request<M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -147,6 +147,9 @@ impl TripleClient {
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");

let mt = req.metadata.clone();

let req = req.map(|m| stream::once(future::ready(m)));
let body_stream = encode(
encoder,
Expand All @@ -157,13 +160,18 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand Down Expand Up @@ -200,7 +208,7 @@ impl TripleClient {
&mut self,
req: impl IntoStreamingRequest<Message = M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -210,7 +218,10 @@ impl TripleClient {
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");

let req = req.into_streaming_request();
let mt = req.metadata.clone();

let en = encode(
encoder,
req.into_inner().map(Ok),
Expand All @@ -220,13 +231,18 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand All @@ -247,7 +263,7 @@ impl TripleClient {
&mut self,
req: impl IntoStreamingRequest<Message = M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -258,6 +274,8 @@ impl TripleClient {
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");
let req = req.into_streaming_request();
let mt = req.metadata.clone();

let en = encode(
encoder,
req.into_inner().map(Ok),
Expand All @@ -266,14 +284,19 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

// let mut conn = Connection::new().with_host(http_uri);
for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand Down Expand Up @@ -310,7 +333,7 @@ impl TripleClient {
&mut self,
req: Request<M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -320,7 +343,10 @@ impl TripleClient {
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");

let req = req.map(|m| stream::once(future::ready(m)));
let mt = req.metadata.clone();

let en = encode(
encoder,
req.into_inner().map(Ok),
Expand All @@ -329,13 +355,19 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand Down
20 changes: 13 additions & 7 deletions examples/echo/src/echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

use dubbo::codegen::*;
use dubbo::{codegen::*, invocation::Metadata};
use example_echo::generated::generated::{echo_client::EchoClient, EchoRequest};
use futures_util::StreamExt;

Expand All @@ -41,11 +41,15 @@ async fn main() {
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
let resp = cli
.unary_echo(Request::new(EchoRequest {
let mut mtdata = Metadata::default();
mtdata = mtdata.insert("static_tag".to_string(), "red".to_string());
let req = Request::from_parts(
mtdata.clone(),
EchoRequest {
message: "message from client".to_string(),
}))
.await;
},
);
let resp = cli.unary_echo(req).await;
let resp = match resp {
Ok(resp) => resp,
Err(err) => return println!("{:?}", err),
Expand All @@ -64,7 +68,9 @@ async fn main() {
message: "msg3 from client streaming".to_string(),
},
];
let req = futures_util::stream::iter(data);
let mut mtdata = Metadata::default();
mtdata = mtdata.insert("client_streaming".to_string(), "true".to_string());
let req = Request::from_parts(mtdata, futures_util::stream::iter(data));
let resp = cli.client_streaming_echo(req).await;
let client_streaming_resp = match resp {
Ok(resp) => resp,
Expand All @@ -84,7 +90,7 @@ async fn main() {
message: "msg3 from client".to_string(),
},
];
let req = futures_util::stream::iter(data);
let req = Request::new(futures_util::stream::iter(data));

let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap();

Expand Down

0 comments on commit 6943c9f

Please sign in to comment.