diff --git a/grpc-sys/grpc_wrap.cc b/grpc-sys/grpc_wrap.cc index 0aba4338..786214b3 100644 --- a/grpc-sys/grpc_wrap.cc +++ b/grpc-sys/grpc_wrap.cc @@ -283,16 +283,6 @@ grpcwrap_request_call_context_destroy(grpcwrap_request_call_context* ctx) { GPR_EXPORT void GPR_CALLTYPE grpcwrap_batch_context_take_recv_initial_metadata( grpcwrap_batch_context* ctx, grpc_metadata_array* res) { grpcwrap_metadata_array_move(res, &(ctx->recv_initial_metadata)); - - /* According to the documentation for struct grpc_op in grpc_types.h, - * ownership of keys and values for - * metadata stays with the call object. This means we have ref each of the - * keys and values here. */ - size_t i; - for (i = 0; i < res->count; i++) { - grpc_slice_ref(res->metadata[i].key); - grpc_slice_ref(res->metadata[i].value); - } } GPR_EXPORT void GPR_CALLTYPE @@ -300,16 +290,6 @@ grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata( grpcwrap_batch_context* ctx, grpc_metadata_array* res) { grpcwrap_metadata_array_move(res, &(ctx->recv_status_on_client.trailing_metadata)); - - /* According to the documentation for struct grpc_op in grpc_types.h, - * ownership of keys and values for - * metadata stays with the call object. This means we have ref each of the - * keys and values here. */ - size_t i; - for (i = 0; i < res->count; i++) { - grpc_slice_ref(res->metadata[i].key); - grpc_slice_ref(res->metadata[i].value); - } } GPR_EXPORT const char* GPR_CALLTYPE diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 27cfdc94..d638d479 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -27,5 +27,5 @@ protobuf = "2" lazy_static = { version = "1.3", optional = true } [build-dependencies] -protobuf-build = { version = ">=0.12", default-features = false } +protobuf-build = { version = ">=0.13", default-features = false } walkdir = "2.2" diff --git a/src/buf.rs b/src/buf.rs index de8fe546..7bde4903 100644 --- a/src/buf.rs +++ b/src/buf.rs @@ -15,7 +15,7 @@ const INLINED_SIZE: usize = mem::size_of::() + mem::size_of::<*mut /// A convenient rust wrapper for the type `grpc_slice`. /// /// It's expected that the slice should be initialized. -#[repr(C)] +#[repr(transparent)] pub struct GrpcSlice(grpc_slice); impl GrpcSlice { diff --git a/src/call/client.rs b/src/call/client.rs index c87eac12..1032ad54 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -19,7 +19,7 @@ use crate::call::{check_run, Call, MessageReader, Method}; use crate::channel::Channel; use crate::codec::{DeserializeFn, SerializeFn}; use crate::error::{Error, Result}; -use crate::metadata::{Metadata, MetadataBuilder}; +use crate::metadata::{Metadata, UnownedMetadata}; use crate::task::{BatchFuture, BatchType}; /// Update the flag bit in res. @@ -225,8 +225,8 @@ pub struct ClientUnaryReceiver { resp_de: DeserializeFn, finished: bool, message: Option, - initial_metadata: Metadata, - trailing_metadata: Metadata, + initial_metadata: UnownedMetadata, + trailing_metadata: UnownedMetadata, } impl ClientUnaryReceiver { @@ -237,8 +237,8 @@ impl ClientUnaryReceiver { resp_de, finished: false, message: None, - initial_metadata: MetadataBuilder::new().build(), - trailing_metadata: MetadataBuilder::new().build(), + initial_metadata: UnownedMetadata::empty(), + trailing_metadata: UnownedMetadata::empty(), } } @@ -274,12 +274,14 @@ impl ClientUnaryReceiver { /// Get the initial metadata. pub async fn headers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.initial_metadata) + // Because we have a reference to call, so it's safe to read. + Ok(unsafe { self.initial_metadata.assume_valid() }) } pub async fn trailers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.trailing_metadata) + // Because we have a reference to call, so it's safe to read. + Ok(unsafe { self.trailing_metadata.assume_valid() }) } pub fn receive_sync(&mut self) -> Result<(Metadata, T, Metadata)> { @@ -325,8 +327,8 @@ pub struct ClientCStreamReceiver { resp_de: DeserializeFn, finished: bool, message: Option, - initial_metadata: Metadata, - trailing_metadata: Metadata, + initial_metadata: UnownedMetadata, + trailing_metadata: UnownedMetadata, } impl ClientCStreamReceiver { @@ -337,8 +339,8 @@ impl ClientCStreamReceiver { resp_de, finished: false, message: None, - initial_metadata: MetadataBuilder::new().build(), - trailing_metadata: MetadataBuilder::new().build(), + initial_metadata: UnownedMetadata::empty(), + trailing_metadata: UnownedMetadata::empty(), } } @@ -378,12 +380,14 @@ impl ClientCStreamReceiver { /// Get the initial metadata. pub async fn headers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.initial_metadata) + // We still have a reference in share call. + Ok(unsafe { self.initial_metadata.assume_valid() }) } pub async fn trailers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.trailing_metadata) + // We still have a reference in share call. + Ok(unsafe { self.trailing_metadata.assume_valid() }) } } @@ -550,7 +554,7 @@ struct ResponseStreamImpl { read_done: bool, finished: bool, resp_de: DeserializeFn, - headers_f: FutureOrValue, + headers_f: FutureOrValue, } impl ResponseStreamImpl { @@ -623,7 +627,8 @@ impl ResponseStreamImpl { self.headers_f = FutureOrValue::Value(Pin::new(f).await?.initial_metadata); } match &self.headers_f { - FutureOrValue::Value(v) => Ok(v), + // We still have reference to call. + FutureOrValue::Value(v) => Ok(unsafe { v.assume_valid() }), _ => unreachable!(), } } diff --git a/src/call/mod.rs b/src/call/mod.rs index 7aff2766..8a4f7e61 100644 --- a/src/call/mod.rs +++ b/src/call/mod.rs @@ -11,6 +11,7 @@ use std::task::{Context, Poll}; use std::{ptr, slice}; use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context}; +use crate::metadata::UnownedMetadata; use crate::{cq::CompletionQueue, Metadata, MetadataBuilder}; use futures_util::ready; use libc::c_void; @@ -291,8 +292,8 @@ impl BatchContext { /// /// If initial metadata is not fetched or the method has been called, empty metadata will be /// returned. - pub fn take_initial_metadata(&mut self) -> Metadata { - let mut res = MetadataBuilder::with_capacity(0).build(); + pub fn take_initial_metadata(&mut self) -> UnownedMetadata { + let mut res = UnownedMetadata::empty(); unsafe { grpcio_sys::grpcwrap_batch_context_take_recv_initial_metadata( self.ctx, @@ -306,8 +307,8 @@ impl BatchContext { /// /// If trailing metadata is not fetched or the method has been called, empty metadata will be /// returned. - pub fn take_trailing_metadata(&mut self) -> Metadata { - let mut res = MetadataBuilder::with_capacity(0).build(); + pub fn take_trailing_metadata(&mut self) -> UnownedMetadata { + let mut res = UnownedMetadata::empty(); unsafe { grpc_sys::grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata( self.ctx, diff --git a/src/metadata.rs b/src/metadata.rs index a30143b3..296c6bc2 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -147,7 +147,7 @@ impl MetadataBuilder { /// /// Metadata value can be ascii string or bytes. They are distinguish by the /// key suffix, key of bytes value should have suffix '-bin'. -#[repr(C)] +#[repr(transparent)] pub struct Metadata(grpc_metadata_array); impl Metadata { @@ -234,10 +234,6 @@ impl Metadata { } &[] } - - pub(crate) fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array { - &mut self.0 as _ - } } impl fmt::Debug for Metadata { @@ -273,6 +269,38 @@ impl Drop for Metadata { unsafe impl Send for Metadata {} unsafe impl Sync for Metadata {} +/// A special metadata that only for receiving metadata from remote. +/// +/// gRPC C Core manages metadata internally, it's unsafe to read them unless +/// call is not destroyed. +#[repr(transparent)] +pub struct UnownedMetadata(grpc_metadata_array); + +impl UnownedMetadata { + #[inline] + pub fn empty() -> UnownedMetadata { + unsafe { mem::transmute(Metadata::with_capacity(0)) } + } + #[inline] + pub unsafe fn assume_valid(&self) -> &Metadata { + mem::transmute(self) + } + + pub fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array { + &mut self.0 as _ + } +} + +impl Drop for UnownedMetadata { + #[inline] + fn drop(&mut self) { + unsafe { grpcio_sys::grpcwrap_metadata_array_destroy_metadata_only(&mut self.0) } + } +} + +unsafe impl Send for UnownedMetadata {} +unsafe impl Sync for UnownedMetadata {} + /// Immutable metadata iterator /// /// This struct is created by the iter method on `Metadata`. diff --git a/src/task/mod.rs b/src/task/mod.rs index 3c456db2..4f5654cd 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -22,7 +22,7 @@ use crate::error::{Error, Result}; use crate::server::RequestCallContext; pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork}; -pub use self::promise::BatchResult; +pub(crate) use self::promise::BatchResult; pub use self::promise::BatchType; /// A handle that is used to notify future that the task finishes. diff --git a/src/task/promise.rs b/src/task/promise.rs index 7d25c0c3..e9b36468 100644 --- a/src/task/promise.rs +++ b/src/task/promise.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use super::Inner; use crate::call::{BatchContext, MessageReader, RpcStatusCode}; use crate::error::Error; -use crate::{Metadata, MetadataBuilder}; +use crate::metadata::UnownedMetadata; /// Batch job type. #[derive(PartialEq, Debug)] @@ -22,25 +22,25 @@ pub enum BatchType { /// A promise result which stores a message reader with bundled metadata. pub struct BatchResult { pub message_reader: Option, - pub initial_metadata: Metadata, - pub trailing_metadata: Metadata, + pub initial_metadata: UnownedMetadata, + pub trailing_metadata: UnownedMetadata, } impl BatchResult { pub fn new( message_reader: Option, - initial_metadata: Option, - trailing_metadata: Option, + initial_metadata: Option, + trailing_metadata: Option, ) -> BatchResult { let initial_metadata = if let Some(m) = initial_metadata { m } else { - MetadataBuilder::new().build() + UnownedMetadata::empty() }; let trailing_metadata = if let Some(m) = trailing_metadata { m } else { - MetadataBuilder::new().build() + UnownedMetadata::empty() }; BatchResult { message_reader,