Skip to content

Commit

Permalink
use flush to send
Browse files Browse the repository at this point in the history
  • Loading branch information
miamia0 committed Jan 10, 2024
1 parent 6045f13 commit a0a975a
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 124 deletions.
47 changes: 28 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio = { workspace = true, features = [
"parking_lot",
] }
tracing.workspace = true
tokio-condvar = "0.1.0"

[features]
default = []
Expand Down
15 changes: 8 additions & 7 deletions volo-thrift/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,10 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
}

#[derive(Clone)]
pub struct MessageService<Resp, MkT, MkC>
pub struct MessageService<Req, Resp, MkT, MkC>
where
Resp: EntryMessage + Send + 'static,
Req: EntryMessage + Send + 'static + Sync,
Resp: EntryMessage + Send + 'static + Sync,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
{
Expand All @@ -455,13 +456,13 @@ where
#[cfg(feature = "multiplex")]
inner: motore::utils::Either<
pingpong::Client<Resp, MkT, MkC>,
crate::transport::multiplex::Client<Resp, MkT, MkC>,
crate::transport::multiplex::Client<Req, Resp, MkT, MkC>,
>,
}

impl<Req, Resp, MkT, MkC> Service<ClientContext, Req> for MessageService<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Service<ClientContext, Req> for MessageService<Req, Resp, MkT, MkC>
where
Req: EntryMessage + 'static + Send,
Req: Send + 'static + EntryMessage + Sync,
Resp: Send + 'static + EntryMessage + Sync,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Expand Down Expand Up @@ -506,8 +507,8 @@ where
+ Clone
+ Sync,
Req: EntryMessage + Send + 'static + Sync + Clone,
Resp: EntryMessage + Send + 'static,
IL: Layer<MessageService<Resp, MkT, MkC>>,
Resp: EntryMessage + Send + 'static + Sync,
IL: Layer<MessageService<Req, Resp, MkT, MkC>>,
IL::Service:
Service<ClientContext, Req, Response = Option<Resp>> + Sync + Clone + Send + 'static,
<IL::Service as Service<ClientContext, Req>>::Error: Send + Into<Error>,
Expand Down
51 changes: 49 additions & 2 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ pub struct DefaultEncoder<E, W> {
impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
for DefaultEncoder<E, W>
{
#[inline]
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
async fn send<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
Expand Down Expand Up @@ -177,6 +176,54 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
}
// write_result
}

#[inline]
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> Result<(), crate::Error> {
cx.stats_mut().record_encode_start_at();

// first, we need to get the size of the message
let (real_size, malloc_size) = self.encoder.size(cx, &msg)?;
trace!(
"[VOLO] codec encode message real size: {}, malloc size: {}",
real_size,
malloc_size
);
cx.stats_mut().set_write_size(real_size);

// then we reserve the size of the message in the linked bytes
self.linked_bytes.reserve(malloc_size);
// after that, we encode the message into the linked bytes
self.encoder
.encode(cx, &mut self.linked_bytes, msg)
.map_err(|e| {
// record the error time
cx.stats_mut().record_encode_end_at();
e
})?;

cx.stats_mut().record_encode_end_at();
Ok(())
}

async fn flush(&mut self) -> Result<(), crate::Error> {
self.linked_bytes
.write_all_vectored(&mut self.writer)
.await
.map_err(TransportError::from)?;

match self.writer.flush().await.map_err(TransportError::from) {
Ok(()) => Ok(()),
Err(e) => Err(e.into()),
}
}

async fn reset(&mut self) {
self.linked_bytes.reset();
}
}

pub struct DefaultDecoder<D, R> {
Expand Down
8 changes: 8 additions & 0 deletions volo-thrift/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ pub trait Decoder: Send + 'static {
///
/// Note: [`Encoder`] should be designed to be ready for reuse.
pub trait Encoder: Send + 'static {
fn reset(&mut self) -> impl Future<Output = ()> + Send;
fn send<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> impl Future<Output = Result<(), crate::Error>> + Send;
fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> impl Future<Output = Result<(), crate::Error>> + Send;

fn flush(&mut self) -> impl Future<Output = Result<(), crate::Error>> + Send;
}

/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a
Expand Down
1 change: 0 additions & 1 deletion volo-thrift/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub(crate) mod incoming;
#[cfg(feature = "multiplex")]
pub mod multiplex;
pub mod pingpong;
pub mod pool;
Expand Down
38 changes: 21 additions & 17 deletions volo-thrift/src/transport/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ use crate::{
EntryMessage, Error, ThriftMessage,
};

pub struct MakeClientTransport<MkT, MkC, Resp>
pub struct MakeClientTransport<MkT, MkC, Req, Resp>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>,
{
make_transport: MkT,
make_codec: MkC,
_phantom: PhantomData<fn() -> Resp>,
_phantom: PhantomData<(fn() -> Resp, fn() -> Req)>,
}

impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Resp> Clone
for MakeClientTransport<MkT, MkC, Resp>
impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Req, Resp> Clone
for MakeClientTransport<MkT, MkC, Req, Resp>
{
fn clone(&self) -> Self {
Self {
Expand All @@ -37,7 +37,7 @@ impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Resp> Cl
}
}

impl<MkT, MkC, Resp> MakeClientTransport<MkT, MkC, Resp>
impl<MkT, MkC, Req, Resp> MakeClientTransport<MkT, MkC, Req, Resp>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>,
Expand All @@ -52,13 +52,14 @@ where
}
}

impl<MkT, MkC, Resp> UnaryService<Address> for MakeClientTransport<MkT, MkC, Resp>
impl<MkT, MkC, Req, Resp> UnaryService<Address> for MakeClientTransport<MkT, MkC, Req, Resp>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
type Response = ThriftTransport<MkC::Encoder, Resp>;
type Response = ThriftTransport<MkC::Encoder, Req, Resp>;
type Error = io::Error;

async fn call(&self, target: Address) -> Result<Self::Response, Self::Error> {
Expand All @@ -73,22 +74,24 @@ where
}
}

pub struct Client<Resp, MkT, MkC>
pub struct Client<Req, Resp, MkT, MkC>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
#[allow(clippy::type_complexity)]
make_transport: PooledMakeTransport<MakeClientTransport<MkT, MkC, Resp>, Address>,
make_transport: PooledMakeTransport<MakeClientTransport<MkT, MkC, Req, Resp>, Address>,
_marker: PhantomData<Resp>,
}

impl<Resp, MkT, MkC> Clone for Client<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Clone for Client<Req, Resp, MkT, MkC>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
fn clone(&self) -> Self {
Self {
Expand All @@ -98,11 +101,12 @@ where
}
}

impl<Resp, MkT, MkC> Client<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Client<Req, Resp, MkT, MkC>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
pub fn new(make_transport: MkT, pool_cfg: Option<Config>, make_codec: MkC) -> Self {
let make_transport = MakeClientTransport::new(make_transport, make_codec);
Expand All @@ -114,9 +118,9 @@ where
}
}

impl<Req, Resp, MkT, MkC> Service<ClientContext, ThriftMessage<Req>> for Client<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Service<ClientContext, ThriftMessage<Req>> for Client<Req, Resp, MkT, MkC>
where
Req: Send + 'static + EntryMessage,
Req: Send + 'static + EntryMessage + Sync,
Resp: EntryMessage + Send + 'static + Sync,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Expand Down
1 change: 1 addition & 0 deletions volo-thrift/src/transport/multiplex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod client;
mod server;
mod thrift_transport;
pub mod utils;

pub use client::Client;
pub use server::serve;
Loading

0 comments on commit a0a975a

Please sign in to comment.