Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rft: Optimize the relation of metadata and headers #193

Merged
merged 2 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dubbo/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use tower_service::Service;

pub use super::{
empty_body,
invocation::{IntoStreamingRequest, Request, Response, RpcInvocation},
invocation::{IntoStreamingRequest, Metadata, Request, Response, RpcInvocation},
protocol::{triple::triple_invoker::TripleInvoker, Invoker},
triple::{
client::TripleClient,
Expand Down
33 changes: 31 additions & 2 deletions dubbo/src/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,23 @@ where

type Message = T::Item;

fn into_streaming_request(self) -> Request<Self::Stream> {
fn into_streaming_request(self) -> Request<Self> {
Request::new(self)
}
}

impl<T> IntoStreamingRequest for Request<T>
where
T: Stream + Send + 'static,
{
type Stream = T;
type Message = T::Item;

fn into_streaming_request(self) -> Self {
self
}
}

// impl<T> sealed::Sealed for T {}

// pub mod sealed {
Expand All @@ -167,6 +179,11 @@ impl Metadata {
}
}

pub fn insert(mut self, key: String, value: String) -> Self {
self.inner.insert(key, value);
self
}

pub fn from_headers(headers: http::HeaderMap) -> Self {
let mut h: HashMap<String, String> = HashMap::new();
for (k, v) in headers.into_iter() {
Expand Down Expand Up @@ -196,10 +213,12 @@ pub trait Invocation {
fn get_method_name(&self) -> String;
}

#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
pub struct RpcInvocation {
target_service_unique_name: String,
method_name: String,

metadata: Metadata,
}

impl RpcInvocation {
Expand All @@ -211,9 +230,19 @@ impl RpcInvocation {
self.method_name = method_name;
self
}

pub fn with_metadata(mut self, metadata: Metadata) -> Self {
self.metadata = metadata;
self
}

pub fn unique_fingerprint(&self) -> String {
format!("{}#{}", self.target_service_unique_name, self.method_name)
}

pub fn get_metadata(&self) -> Metadata {
self.metadata.clone()
}
}

impl Invocation for RpcInvocation {
Expand Down
4 changes: 4 additions & 0 deletions dubbo/src/protocol/triple/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl TripleInvoker {
.body(body)
.unwrap();

// add header of source
for (k, v) in parts.headers.iter() {
req.headers_mut().insert(k, v.to_owned());
}
// *req.version_mut() = http::Version::HTTP_2;
req.headers_mut()
.insert("method", HeaderValue::from_static("POST"));
Expand Down
50 changes: 41 additions & 9 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl TripleClient {
&mut self,
req: Request<M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -147,6 +147,9 @@ impl TripleClient {
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");

let mt = req.metadata.clone();

let req = req.map(|m| stream::once(future::ready(m)));
let body_stream = encode(
encoder,
Expand All @@ -157,13 +160,18 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand Down Expand Up @@ -200,7 +208,7 @@ impl TripleClient {
&mut self,
req: impl IntoStreamingRequest<Message = M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -210,7 +218,10 @@ impl TripleClient {
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");

let req = req.into_streaming_request();
let mt = req.metadata.clone();

let en = encode(
encoder,
req.into_inner().map(Ok),
Expand All @@ -220,13 +231,18 @@ impl TripleClient {
.into_stream();
let body = hyper::Body::wrap_stream(en);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand All @@ -247,7 +263,7 @@ impl TripleClient {
&mut self,
req: impl IntoStreamingRequest<Message = M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -258,6 +274,8 @@ impl TripleClient {
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");
let req = req.into_streaming_request();
let mt = req.metadata.clone();

let en = encode(
encoder,
req.into_inner().map(Ok),
Expand All @@ -266,14 +284,19 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

// let mut conn = Connection::new().with_host(http_uri);
for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand Down Expand Up @@ -310,7 +333,7 @@ impl TripleClient {
&mut self,
req: Request<M1>,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
mut invocation: RpcInvocation,
) -> Result<Response<Decoding<M2>>, crate::status::Status>
where
M1: Message + Send + Sync + 'static + Serialize,
Expand All @@ -320,7 +343,10 @@ impl TripleClient {
Box<dyn Decoder<Item = M2, Error = Status> + Send + 'static>,
Box<dyn Encoder<Error = Status, Item = M1> + Send + 'static>,
) = get_codec("application/grpc+proto");

let req = req.map(|m| stream::once(future::ready(m)));
let mt = req.metadata.clone();

let en = encode(
encoder,
req.into_inner().map(Ok),
Expand All @@ -329,13 +355,19 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);

invocation = invocation.with_metadata(mt.clone());
let mut invoker = self.mk.new_service(invocation);

let request = http::Request::builder()
let mut request = http::Request::builder()
.header("path", path.to_string())
.body(body)
.unwrap();

for (k, v) in mt.into_headers().iter() {
request.headers_mut().insert(k, v.to_owned());
}

let response = invoker
.call(request)
.await
Expand Down
20 changes: 13 additions & 7 deletions examples/echo/src/echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

use dubbo::codegen::*;
use dubbo::{codegen::*, invocation::Metadata};
use example_echo::generated::generated::{echo_client::EchoClient, EchoRequest};
use futures_util::StreamExt;

Expand All @@ -41,11 +41,15 @@ async fn main() {
let mut cli = EchoClient::new(builder);
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
let resp = cli
.unary_echo(Request::new(EchoRequest {
let mut mtdata = Metadata::default();
mtdata = mtdata.insert("static_tag".to_string(), "red".to_string());
let req = Request::from_parts(
mtdata.clone(),
EchoRequest {
message: "message from client".to_string(),
}))
.await;
},
);
let resp = cli.unary_echo(req).await;
let resp = match resp {
Ok(resp) => resp,
Err(err) => return println!("{:?}", err),
Expand All @@ -64,7 +68,9 @@ async fn main() {
message: "msg3 from client streaming".to_string(),
},
];
let req = futures_util::stream::iter(data);
let mut mtdata = Metadata::default();
mtdata = mtdata.insert("client_streaming".to_string(), "true".to_string());
let req = Request::from_parts(mtdata, futures_util::stream::iter(data));
let resp = cli.client_streaming_echo(req).await;
let client_streaming_resp = match resp {
Ok(resp) => resp,
Expand All @@ -84,7 +90,7 @@ async fn main() {
message: "msg3 from client".to_string(),
},
];
let req = futures_util::stream::iter(data);
let req = Request::new(futures_util::stream::iter(data));

let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap();

Expand Down
Loading