Skip to content

Commit

Permalink
metadata: avoid change metadata ref (#566)
Browse files Browse the repository at this point in the history
After metadata is received, every entry should be owned by the grpc c
core, application should clone the byte slice if necessary.

In the past, we get around the problem by increasing the reference
count of the slice. However, it's unsafe to do so as the slice is not
guaranteed to be accessible in the first place.

This PR fixes the problem by introducing an unowned metadata type. It
works just like Metadata, but accessing its content requires manual
check for the lifetime of associated call.

Signed-off-by: Jay Lee <[email protected]>
  • Loading branch information
BusyJay authored Mar 28, 2022
1 parent ccd0fde commit 7767d95
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 54 deletions.
20 changes: 0 additions & 20 deletions grpc-sys/grpc_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,33 +283,13 @@ grpcwrap_request_call_context_destroy(grpcwrap_request_call_context* ctx) {
GPR_EXPORT void GPR_CALLTYPE grpcwrap_batch_context_take_recv_initial_metadata(
grpcwrap_batch_context* ctx, grpc_metadata_array* res) {
grpcwrap_metadata_array_move(res, &(ctx->recv_initial_metadata));

/* According to the documentation for struct grpc_op in grpc_types.h,
* ownership of keys and values for
* metadata stays with the call object. This means we have ref each of the
* keys and values here. */
size_t i;
for (i = 0; i < res->count; i++) {
grpc_slice_ref(res->metadata[i].key);
grpc_slice_ref(res->metadata[i].value);
}
}

GPR_EXPORT void GPR_CALLTYPE
grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata(
grpcwrap_batch_context* ctx, grpc_metadata_array* res) {
grpcwrap_metadata_array_move(res,
&(ctx->recv_status_on_client.trailing_metadata));

/* According to the documentation for struct grpc_op in grpc_types.h,
* ownership of keys and values for
* metadata stays with the call object. This means we have ref each of the
* keys and values here. */
size_t i;
for (i = 0; i < res->count; i++) {
grpc_slice_ref(res->metadata[i].key);
grpc_slice_ref(res->metadata[i].value);
}
}

GPR_EXPORT const char* GPR_CALLTYPE
Expand Down
2 changes: 1 addition & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ protobuf = "2"
lazy_static = { version = "1.3", optional = true }

[build-dependencies]
protobuf-build = { version = ">=0.12", default-features = false }
protobuf-build = { version = ">=0.13", default-features = false }
walkdir = "2.2"
2 changes: 1 addition & 1 deletion src/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut
/// A convenient rust wrapper for the type `grpc_slice`.
///
/// It's expected that the slice should be initialized.
#[repr(C)]
#[repr(transparent)]
pub struct GrpcSlice(grpc_slice);

impl GrpcSlice {
Expand Down
35 changes: 20 additions & 15 deletions src/call/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::call::{check_run, Call, MessageReader, Method};
use crate::channel::Channel;
use crate::codec::{DeserializeFn, SerializeFn};
use crate::error::{Error, Result};
use crate::metadata::{Metadata, MetadataBuilder};
use crate::metadata::{Metadata, UnownedMetadata};
use crate::task::{BatchFuture, BatchType};

/// Update the flag bit in res.
Expand Down Expand Up @@ -225,8 +225,8 @@ pub struct ClientUnaryReceiver<T> {
resp_de: DeserializeFn<T>,
finished: bool,
message: Option<T>,
initial_metadata: Metadata,
trailing_metadata: Metadata,
initial_metadata: UnownedMetadata,
trailing_metadata: UnownedMetadata,
}

impl<T> ClientUnaryReceiver<T> {
Expand All @@ -237,8 +237,8 @@ impl<T> ClientUnaryReceiver<T> {
resp_de,
finished: false,
message: None,
initial_metadata: MetadataBuilder::new().build(),
trailing_metadata: MetadataBuilder::new().build(),
initial_metadata: UnownedMetadata::empty(),
trailing_metadata: UnownedMetadata::empty(),
}
}

Expand Down Expand Up @@ -274,12 +274,14 @@ impl<T> ClientUnaryReceiver<T> {
/// Get the initial metadata.
pub async fn headers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.initial_metadata)
// Because we have a reference to call, so it's safe to read.
Ok(unsafe { self.initial_metadata.assume_valid() })
}

pub async fn trailers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.trailing_metadata)
// Because we have a reference to call, so it's safe to read.
Ok(unsafe { self.trailing_metadata.assume_valid() })
}

pub fn receive_sync(&mut self) -> Result<(Metadata, T, Metadata)> {
Expand Down Expand Up @@ -325,8 +327,8 @@ pub struct ClientCStreamReceiver<T> {
resp_de: DeserializeFn<T>,
finished: bool,
message: Option<T>,
initial_metadata: Metadata,
trailing_metadata: Metadata,
initial_metadata: UnownedMetadata,
trailing_metadata: UnownedMetadata,
}

impl<T> ClientCStreamReceiver<T> {
Expand All @@ -337,8 +339,8 @@ impl<T> ClientCStreamReceiver<T> {
resp_de,
finished: false,
message: None,
initial_metadata: MetadataBuilder::new().build(),
trailing_metadata: MetadataBuilder::new().build(),
initial_metadata: UnownedMetadata::empty(),
trailing_metadata: UnownedMetadata::empty(),
}
}

Expand Down Expand Up @@ -378,12 +380,14 @@ impl<T> ClientCStreamReceiver<T> {
/// Get the initial metadata.
pub async fn headers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.initial_metadata)
// We still have a reference in share call.
Ok(unsafe { self.initial_metadata.assume_valid() })
}

pub async fn trailers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.trailing_metadata)
// We still have a reference in share call.
Ok(unsafe { self.trailing_metadata.assume_valid() })
}
}

Expand Down Expand Up @@ -550,7 +554,7 @@ struct ResponseStreamImpl<H, T> {
read_done: bool,
finished: bool,
resp_de: DeserializeFn<T>,
headers_f: FutureOrValue<BatchFuture, Metadata>,
headers_f: FutureOrValue<BatchFuture, UnownedMetadata>,
}

impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
Expand Down Expand Up @@ -623,7 +627,8 @@ impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
self.headers_f = FutureOrValue::Value(Pin::new(f).await?.initial_metadata);
}
match &self.headers_f {
FutureOrValue::Value(v) => Ok(v),
// We still have reference to call.
FutureOrValue::Value(v) => Ok(unsafe { v.assume_valid() }),
_ => unreachable!(),
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/call/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::{Context, Poll};
use std::{ptr, slice};

use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
use crate::metadata::UnownedMetadata;
use crate::{cq::CompletionQueue, Metadata, MetadataBuilder};
use futures_util::ready;
use libc::c_void;
Expand Down Expand Up @@ -291,8 +292,8 @@ impl BatchContext {
///
/// If initial metadata is not fetched or the method has been called, empty metadata will be
/// returned.
pub fn take_initial_metadata(&mut self) -> Metadata {
let mut res = MetadataBuilder::with_capacity(0).build();
pub fn take_initial_metadata(&mut self) -> UnownedMetadata {
let mut res = UnownedMetadata::empty();
unsafe {
grpcio_sys::grpcwrap_batch_context_take_recv_initial_metadata(
self.ctx,
Expand All @@ -306,8 +307,8 @@ impl BatchContext {
///
/// If trailing metadata is not fetched or the method has been called, empty metadata will be
/// returned.
pub fn take_trailing_metadata(&mut self) -> Metadata {
let mut res = MetadataBuilder::with_capacity(0).build();
pub fn take_trailing_metadata(&mut self) -> UnownedMetadata {
let mut res = UnownedMetadata::empty();
unsafe {
grpc_sys::grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata(
self.ctx,
Expand Down
38 changes: 33 additions & 5 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl MetadataBuilder {
///
/// Metadata value can be ascii string or bytes. They are distinguish by the
/// key suffix, key of bytes value should have suffix '-bin'.
#[repr(C)]
#[repr(transparent)]
pub struct Metadata(grpc_metadata_array);

impl Metadata {
Expand Down Expand Up @@ -234,10 +234,6 @@ impl Metadata {
}
&[]
}

pub(crate) fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array {
&mut self.0 as _
}
}

impl fmt::Debug for Metadata {
Expand Down Expand Up @@ -273,6 +269,38 @@ impl Drop for Metadata {
unsafe impl Send for Metadata {}
unsafe impl Sync for Metadata {}

/// A special metadata that only for receiving metadata from remote.
///
/// gRPC C Core manages metadata internally, it's unsafe to read them unless
/// call is not destroyed.
#[repr(transparent)]
pub struct UnownedMetadata(grpc_metadata_array);

impl UnownedMetadata {
#[inline]
pub fn empty() -> UnownedMetadata {
unsafe { mem::transmute(Metadata::with_capacity(0)) }
}
#[inline]
pub unsafe fn assume_valid(&self) -> &Metadata {
mem::transmute(self)
}

pub fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array {
&mut self.0 as _
}
}

impl Drop for UnownedMetadata {
#[inline]
fn drop(&mut self) {
unsafe { grpcio_sys::grpcwrap_metadata_array_destroy_metadata_only(&mut self.0) }
}
}

unsafe impl Send for UnownedMetadata {}
unsafe impl Sync for UnownedMetadata {}

/// Immutable metadata iterator
///
/// This struct is created by the iter method on `Metadata`.
Expand Down
2 changes: 1 addition & 1 deletion src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::error::{Error, Result};
use crate::server::RequestCallContext;

pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
pub use self::promise::BatchResult;
pub(crate) use self::promise::BatchResult;
pub use self::promise::BatchType;

/// A handle that is used to notify future that the task finishes.
Expand Down
14 changes: 7 additions & 7 deletions src/task/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use super::Inner;
use crate::call::{BatchContext, MessageReader, RpcStatusCode};
use crate::error::Error;
use crate::{Metadata, MetadataBuilder};
use crate::metadata::UnownedMetadata;

/// Batch job type.
#[derive(PartialEq, Debug)]
Expand All @@ -22,25 +22,25 @@ pub enum BatchType {
/// A promise result which stores a message reader with bundled metadata.
pub struct BatchResult {
pub message_reader: Option<MessageReader>,
pub initial_metadata: Metadata,
pub trailing_metadata: Metadata,
pub initial_metadata: UnownedMetadata,
pub trailing_metadata: UnownedMetadata,
}

impl BatchResult {
pub fn new(
message_reader: Option<MessageReader>,
initial_metadata: Option<Metadata>,
trailing_metadata: Option<Metadata>,
initial_metadata: Option<UnownedMetadata>,
trailing_metadata: Option<UnownedMetadata>,
) -> BatchResult {
let initial_metadata = if let Some(m) = initial_metadata {
m
} else {
MetadataBuilder::new().build()
UnownedMetadata::empty()
};
let trailing_metadata = if let Some(m) = trailing_metadata {
m
} else {
MetadataBuilder::new().build()
UnownedMetadata::empty()
};
BatchResult {
message_reader,
Expand Down

0 comments on commit 7767d95

Please sign in to comment.