From bbb7cdbcc7ed29a0950c1e378391282080cea7e4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 18 Dec 2024 09:56:02 -0800 Subject: [PATCH] x --- .../chaotic_good/message_reassembly.h | 5 +- src/core/lib/promise/party.cc | 1 + src/core/lib/surface/client_call.cc | 2 +- src/core/lib/transport/call_filters.h | 251 ++++++++++-------- src/core/lib/transport/message.h | 12 +- test/core/transport/call_spine_benchmarks.h | 6 +- 6 files changed, 151 insertions(+), 126 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/message_reassembly.h b/src/core/ext/transport/chaotic_good/message_reassembly.h index 796c90ea116ee..1a564bab407a3 100644 --- a/src/core/ext/transport/chaotic_good/message_reassembly.h +++ b/src/core/ext/transport/chaotic_good/message_reassembly.h @@ -94,8 +94,9 @@ class MessageReassembly { return If( done, [&]() { - auto message = Arena::MakePooled( - std::move(chunk_receiver_->incoming), 0); + auto message = + Arena::MakePooled(std::move(chunk_receiver_->incoming), + GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH); chunk_receiver_.reset(); return sink.PushMessage(std::move(message)); }, diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index b7c38baaf671d..f4f7638301ee3 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -292,6 +292,7 @@ void Party::RunPartyAndUnref(uint64_t prev_state) { while (wakeup_mask_ != 0) { auto wakeup_mask = std::exchange(wakeup_mask_, 0); while (wakeup_mask != 0) { + GRPC_LATENT_SEE_INNER_SCOPE("run_one_promise"); const uint64_t t = LowestOneBit(wakeup_mask); const int i = absl::countr_zero(t); wakeup_mask ^= t; diff --git a/src/core/lib/surface/client_call.cc b/src/core/lib/surface/client_call.cc index 4bc4dbe7a89a4..f97cdc0647b1f 100644 --- a/src/core/lib/surface/client_call.cc +++ b/src/core/lib/surface/client_call.cc @@ -296,7 +296,7 @@ void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, send.c_slice_buffer()); auto msg = arena()->MakePooled( std::move(send), - op.flags | (is_last ? GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE : 0)); + op.flags | (is_last ? GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH : 0)); return [this, msg = std::move(msg)]() mutable { return started_call_initiator_.PushMessage(std::move(msg)); }; diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h index ba6df65f276ac..c0434742262e8 100644 --- a/src/core/lib/transport/call_filters.h +++ b/src/core/lib/transport/call_filters.h @@ -1616,6 +1616,7 @@ class CallFilters { private: Poll> FinishStep( Poll> p) { + GRPC_LATENT_SEE_INNER_SCOPE("MetadataExecutor::FinishStep"); auto* r = p.value_if_ready(); if (r == nullptr) return Pending{}; if (r->ok != nullptr) { @@ -1672,6 +1673,7 @@ class CallFilters { private: Poll FinishStep(Poll> p) { + GRPC_LATENT_SEE_INNER_SCOPE("MessageExecutor::FinishStep"); auto* r = p.value_if_ready(); if (r == nullptr) return Pending{}; if (r->ok != nullptr) { @@ -1699,12 +1701,14 @@ class CallFilters { // Returns a promise that resolves to ValueOrFailure GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { call_state_.BeginPullClientInitialMetadata(); - return MetadataExecutor( - this, stacks_.cbegin(), stacks_.cend()); + return GRPC_LATENT_SEE_PROMISE( + "PullClientInitialMetadata", + (MetadataExecutor(this, stacks_.cbegin(), + stacks_.cend()))); } // Server: Push server initial metadata // Returns a promise that resolves to a StatusFlag indicating success @@ -1715,122 +1719,133 @@ class CallFilters { // Client: Fetch server initial metadata // Returns a promise that resolves to ValueOrFailure GRPC_MUST_USE_RESULT auto PullServerInitialMetadata() { - return Seq( - [this]() { - return call_state_.PollPullServerInitialMetadataAvailable(); - }, - [this](bool has_server_initial_metadata) { - return If( - has_server_initial_metadata, - [this]() { - return Map( - MetadataExecutor< - absl::optional, - ServerMetadataHandle, - &CallFilters::push_server_initial_metadata_, - &filters_detail::StackData::server_initial_metadata, - &CallState::FinishPullServerInitialMetadata, - StacksVector::const_reverse_iterator>( - this, stacks_.crbegin(), stacks_.crend()), - [](ValueOrFailure> r) { - if (r.ok()) return std::move(*r); - return absl::optional{}; - }); - }, - []() { - return Immediate(absl::optional{}); - }); - }); + return GRPC_LATENT_SEE_PROMISE( + "PullServerInitialMetadata", + Seq( + [this]() { + return call_state_.PollPullServerInitialMetadataAvailable(); + }, + [this](bool has_server_initial_metadata) { + return If( + has_server_initial_metadata, + [this]() { + return Map( + MetadataExecutor< + absl::optional, + ServerMetadataHandle, + &CallFilters::push_server_initial_metadata_, + &filters_detail::StackData::server_initial_metadata, + &CallState::FinishPullServerInitialMetadata, + StacksVector::const_reverse_iterator>( + this, stacks_.crbegin(), stacks_.crend()), + [](ValueOrFailure> + r) { + if (r.ok()) return std::move(*r); + return absl::optional{}; + }); + }, + []() { + return Immediate(absl::optional{}); + }); + })); } // Client: Push client to server message // Returns a promise that resolves to a StatusFlag indicating success GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message) { DCHECK_NE(message.get(), nullptr); - return Seq( - [this]() { - return call_state_.PollReadyForPushClientToServerMessage(); - }, - [this, message = std::move(message)]() mutable { - call_state_.BeginPushClientToServerMessage(); - DCHECK_EQ(push_client_to_server_message_.get(), nullptr); - bool is_last = - (message->flags() & GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE) != 0; - push_client_to_server_message_ = std::move(message); - return If( - is_last, []() -> Poll { return Success{}; }, - [this]() { - return call_state_.PollReadyForPushClientToServerMessage(); - }); - }); + return GRPC_LATENT_SEE_PROMISE( + "PushClientToServerMessage", + Seq( + [this]() { + return call_state_.PollReadyForPushClientToServerMessage(); + }, + [this, message = std::move(message)]() mutable { + call_state_.BeginPushClientToServerMessage(); + DCHECK_EQ(push_client_to_server_message_.get(), nullptr); + bool is_last = + (message->flags() & GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH) != 0; + push_client_to_server_message_ = std::move(message); + return If( + is_last, []() -> Poll { return Success{}; }, + [this]() { + return call_state_.PollReadyForPushClientToServerMessage(); + }); + })); } // Client: Indicate that no more messages will be sent void FinishClientToServerSends() { call_state_.ClientToServerHalfClose(); } // Server: Fetch client to server message // Returns a promise that resolves to ClientToServerNextMessage GRPC_MUST_USE_RESULT auto PullClientToServerMessage() { - return TrySeq( - [this]() { - return call_state_.PollPullClientToServerMessageAvailable(); - }, - [this](bool message_available) { - return If( - message_available, - [this]() { - return MessageExecutor< - &CallFilters::push_client_to_server_message_, - &filters_detail::StackData::client_to_server_messages, - &CallState::FinishPullClientToServerMessage, - StacksVector::const_iterator>(this, stacks_.cbegin(), - stacks_.cend()); - }, - []() -> ClientToServerNextMessage { - return ClientToServerNextMessage(); - }); - }); + return GRPC_LATENT_SEE_PROMISE( + "PullClientToServerMessage", + TrySeq( + [this]() { + return call_state_.PollPullClientToServerMessageAvailable(); + }, + [this](bool message_available) { + return If( + message_available, + [this]() { + return MessageExecutor< + &CallFilters::push_client_to_server_message_, + &filters_detail::StackData::client_to_server_messages, + &CallState::FinishPullClientToServerMessage, + StacksVector::const_iterator>(this, stacks_.cbegin(), + stacks_.cend()); + }, + []() -> ClientToServerNextMessage { + return ClientToServerNextMessage(); + }); + })); } // Server: Push server to client message // Returns a promise that resolves to a StatusFlag indicating success GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message) { DCHECK_NE(message.get(), nullptr); - return Seq( - [this]() { - return call_state_.PollReadyForPushServerToClientMessage(); - }, - [this, message = std::move(message)]() mutable { - call_state_.BeginPushServerToClientMessage(); - DCHECK_EQ(push_server_to_client_message_.get(), nullptr); - bool is_last = - (message->flags() & GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE) != 0; - push_server_to_client_message_ = std::move(message); - return If( - is_last, []() -> Poll { return Success{}; }, - [this]() { - return call_state_.PollReadyForPushServerToClientMessage(); - }); - }); + return GRPC_LATENT_SEE_PROMISE( + "PushServerToClientMessage", + Seq( + [this]() { + return call_state_.PollReadyForPushServerToClientMessage(); + }, + [this, message = std::move(message)]() mutable { + call_state_.BeginPushServerToClientMessage(); + DCHECK_EQ(push_server_to_client_message_.get(), nullptr); + bool is_last = + (message->flags() & GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH) != 0; + push_server_to_client_message_ = std::move(message); + return If( + is_last, []() -> Poll { return Success{}; }, + [this]() { + return call_state_.PollReadyForPushServerToClientMessage(); + }); + })); } // Server: Fetch server to client message // Returns a promise that resolves to ServerToClientNextMessage GRPC_MUST_USE_RESULT auto PullServerToClientMessage() { - return TrySeq( - [this]() { - return call_state_.PollPullServerToClientMessageAvailable(); - }, - [this](bool message_available) { - return If( - message_available, - [this]() { - return MessageExecutor< - &CallFilters::push_server_to_client_message_, - &filters_detail::StackData::server_to_client_messages, - &CallState::FinishPullServerToClientMessage, - StacksVector::const_reverse_iterator>( - this, stacks_.crbegin(), stacks_.crend()); - }, - []() -> ServerToClientNextMessage { - return ServerToClientNextMessage(); - }); - }); + return GRPC_LATENT_SEE_PROMISE( + "PullServerToClientMessage", + TrySeq( + [this]() { + return call_state_.PollPullServerToClientMessageAvailable(); + }, + [this](bool message_available) { + return If( + message_available, + [this]() { + return MessageExecutor< + &CallFilters::push_server_to_client_message_, + &filters_detail::StackData::server_to_client_messages, + &CallState::FinishPullServerToClientMessage, + StacksVector::const_reverse_iterator>( + this, stacks_.crbegin(), stacks_.crend()); + }, + []() -> ServerToClientNextMessage { + return ServerToClientNextMessage(); + }); + })); } // Server: Indicate end of response // Closes the request entirely - no messages can be sent/received @@ -1842,20 +1857,24 @@ class CallFilters { // Client: Fetch server trailing metadata // Returns a promise that resolves to ServerMetadataHandle GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata() { - return Map( - [this]() { return call_state_.PollServerTrailingMetadataAvailable(); }, - [this](Empty) { - auto value = std::move(push_server_trailing_metadata_); - if (call_data_ != nullptr) { - for (auto it = stacks_.crbegin(); it != stacks_.crend(); ++it) { - value = filters_detail::RunServerTrailingMetadata( - it->stack->data_.server_trailing_metadata, - filters_detail::Offset(call_data_, it->call_data_offset), - std::move(value)); - } - } - return value; - }); + return GRPC_LATENT_SEE_PROMISE( + "PullServerTrailingMetadata", + Map( + [this]() { + return call_state_.PollServerTrailingMetadataAvailable(); + }, + [this](Empty) { + auto value = std::move(push_server_trailing_metadata_); + if (call_data_ != nullptr) { + for (auto it = stacks_.crbegin(); it != stacks_.crend(); ++it) { + value = filters_detail::RunServerTrailingMetadata( + it->stack->data_.server_trailing_metadata, + filters_detail::Offset(call_data_, it->call_data_offset), + std::move(value)); + } + } + return value; + })); } // Server: Wait for server trailing metadata to have been sent // Returns a promise that resolves to a StatusFlag indicating whether the diff --git a/src/core/lib/transport/message.h b/src/core/lib/transport/message.h index d9b0b20aaeab5..30516539530d3 100644 --- a/src/core/lib/transport/message.h +++ b/src/core/lib/transport/message.h @@ -27,14 +27,18 @@ /// to be decompressed by the message_decompress filter. (Does not apply for /// stream compression.) #define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u) -/// Internal bit flag for when we know a particular message is the last one to -/// be sent: this allows various flow control optimizations. -#define GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE (0x20000000u) +/// Usually when we `Push` to a `CallInitiator` or `CallHandler` they block +/// until the message has been pulled. This flag signals that instead the push +/// should finish once the data is available to be pulled (this may still block +/// if a previous message is not yet claimed). +/// Setting this is appropriate for messages sent from the API that are known +/// to be the last message to be sent, OR for messages from the transport. +#define GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH (0x20000000u) /// Mask of all valid internal flags. #define GRPC_WRITE_INTERNAL_USED_MASK \ (GRPC_WRITE_INTERNAL_COMPRESS | \ GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED | \ - GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE) + GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH) namespace grpc_core { diff --git a/test/core/transport/call_spine_benchmarks.h b/test/core/transport/call_spine_benchmarks.h index 5315c32dd5895..9d08fd4fef45a 100644 --- a/test/core/transport/call_spine_benchmarks.h +++ b/test/core/transport/call_spine_benchmarks.h @@ -56,8 +56,8 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { }), Map(handler.PullMessage(), [](ClientToServerNextMessage msg) { return msg.status(); }), - handler.PushMessage(fixture.MakePayload( - GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE))), + handler.PushMessage( + fixture.MakePayload(GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH))), [&handler_done, &fixture, handler](StatusFlag status) mutable { CHECK(status.ok()); handler.PushServerTrailingMetadata( @@ -71,7 +71,7 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { return Map( AllOk( Map(initiator.PushMessage(fixture.MakePayload( - GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE)), + GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH)), [](StatusFlag) { return Success{}; }), Map(initiator.PullServerInitialMetadata(), [](absl::optional md) {