Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 18, 2024
1 parent 1a96288 commit bbb7cdb
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 126 deletions.
5 changes: 3 additions & 2 deletions src/core/ext/transport/chaotic_good/message_reassembly.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ class MessageReassembly {
return If(
done,
[&]() {
auto message = Arena::MakePooled<Message>(
std::move(chunk_receiver_->incoming), 0);
auto message =
Arena::MakePooled<Message>(std::move(chunk_receiver_->incoming),
GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH);
chunk_receiver_.reset();
return sink.PushMessage(std::move(message));
},
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/promise/party.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ void Party::RunPartyAndUnref(uint64_t prev_state) {
while (wakeup_mask_ != 0) {
auto wakeup_mask = std::exchange(wakeup_mask_, 0);
while (wakeup_mask != 0) {
GRPC_LATENT_SEE_INNER_SCOPE("run_one_promise");
const uint64_t t = LowestOneBit(wakeup_mask);
const int i = absl::countr_zero(t);
wakeup_mask ^= t;
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/surface/client_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ void ClientCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
send.c_slice_buffer());
auto msg = arena()->MakePooled<Message>(
std::move(send),
op.flags | (is_last ? GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE : 0));
op.flags | (is_last ? GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH : 0));
return [this, msg = std::move(msg)]() mutable {
return started_call_initiator_.PushMessage(std::move(msg));
};
Expand Down
251 changes: 135 additions & 116 deletions src/core/lib/transport/call_filters.h
Original file line number Diff line number Diff line change
Expand Up @@ -1616,6 +1616,7 @@ class CallFilters {
private:
Poll<ValueOrFailure<Output>> FinishStep(
Poll<filters_detail::ResultOr<Input>> p) {
GRPC_LATENT_SEE_INNER_SCOPE("MetadataExecutor::FinishStep");
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
if (r->ok != nullptr) {
Expand Down Expand Up @@ -1672,6 +1673,7 @@ class CallFilters {

private:
Poll<NextMsg> FinishStep(Poll<filters_detail::ResultOr<MessageHandle>> p) {
GRPC_LATENT_SEE_INNER_SCOPE("MessageExecutor::FinishStep");
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
if (r->ok != nullptr) {
Expand Down Expand Up @@ -1699,12 +1701,14 @@ class CallFilters {
// Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() {
call_state_.BeginPullClientInitialMetadata();
return MetadataExecutor<ClientMetadataHandle, ClientMetadataHandle,
&CallFilters::push_client_initial_metadata_,
&filters_detail::StackData::client_initial_metadata,
&CallState::FinishPullClientInitialMetadata,
StacksVector::const_iterator>(
this, stacks_.cbegin(), stacks_.cend());
return GRPC_LATENT_SEE_PROMISE(
"PullClientInitialMetadata",
(MetadataExecutor<ClientMetadataHandle, ClientMetadataHandle,
&CallFilters::push_client_initial_metadata_,
&filters_detail::StackData::client_initial_metadata,
&CallState::FinishPullClientInitialMetadata,
StacksVector::const_iterator>(this, stacks_.cbegin(),
stacks_.cend())));
}
// Server: Push server initial metadata
// Returns a promise that resolves to a StatusFlag indicating success
Expand All @@ -1715,122 +1719,133 @@ class CallFilters {
// Client: Fetch server initial metadata
// Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle>
GRPC_MUST_USE_RESULT auto PullServerInitialMetadata() {
return Seq(
[this]() {
return call_state_.PollPullServerInitialMetadataAvailable();
},
[this](bool has_server_initial_metadata) {
return If(
has_server_initial_metadata,
[this]() {
return Map(
MetadataExecutor<
absl::optional<ServerMetadataHandle>,
ServerMetadataHandle,
&CallFilters::push_server_initial_metadata_,
&filters_detail::StackData::server_initial_metadata,
&CallState::FinishPullServerInitialMetadata,
StacksVector::const_reverse_iterator>(
this, stacks_.crbegin(), stacks_.crend()),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> r) {
if (r.ok()) return std::move(*r);
return absl::optional<ServerMetadataHandle>{};
});
},
[]() {
return Immediate(absl::optional<ServerMetadataHandle>{});
});
});
return GRPC_LATENT_SEE_PROMISE(
"PullServerInitialMetadata",
Seq(
[this]() {
return call_state_.PollPullServerInitialMetadataAvailable();
},
[this](bool has_server_initial_metadata) {
return If(
has_server_initial_metadata,
[this]() {
return Map(
MetadataExecutor<
absl::optional<ServerMetadataHandle>,
ServerMetadataHandle,
&CallFilters::push_server_initial_metadata_,
&filters_detail::StackData::server_initial_metadata,
&CallState::FinishPullServerInitialMetadata,
StacksVector::const_reverse_iterator>(
this, stacks_.crbegin(), stacks_.crend()),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>>
r) {
if (r.ok()) return std::move(*r);
return absl::optional<ServerMetadataHandle>{};
});
},
[]() {
return Immediate(absl::optional<ServerMetadataHandle>{});
});
}));
}
// Client: Push client to server message
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message) {
DCHECK_NE(message.get(), nullptr);
return Seq(
[this]() {
return call_state_.PollReadyForPushClientToServerMessage();
},
[this, message = std::move(message)]() mutable {
call_state_.BeginPushClientToServerMessage();
DCHECK_EQ(push_client_to_server_message_.get(), nullptr);
bool is_last =
(message->flags() & GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE) != 0;
push_client_to_server_message_ = std::move(message);
return If(
is_last, []() -> Poll<StatusFlag> { return Success{}; },
[this]() {
return call_state_.PollReadyForPushClientToServerMessage();
});
});
return GRPC_LATENT_SEE_PROMISE(
"PushClientToServerMessage",
Seq(
[this]() {
return call_state_.PollReadyForPushClientToServerMessage();
},
[this, message = std::move(message)]() mutable {
call_state_.BeginPushClientToServerMessage();
DCHECK_EQ(push_client_to_server_message_.get(), nullptr);
bool is_last =
(message->flags() & GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH) != 0;
push_client_to_server_message_ = std::move(message);
return If(
is_last, []() -> Poll<StatusFlag> { return Success{}; },
[this]() {
return call_state_.PollReadyForPushClientToServerMessage();
});
}));
}
// Client: Indicate that no more messages will be sent
void FinishClientToServerSends() { call_state_.ClientToServerHalfClose(); }
// Server: Fetch client to server message
// Returns a promise that resolves to ClientToServerNextMessage
GRPC_MUST_USE_RESULT auto PullClientToServerMessage() {
return TrySeq(
[this]() {
return call_state_.PollPullClientToServerMessageAvailable();
},
[this](bool message_available) {
return If(
message_available,
[this]() {
return MessageExecutor<
&CallFilters::push_client_to_server_message_,
&filters_detail::StackData::client_to_server_messages,
&CallState::FinishPullClientToServerMessage,
StacksVector::const_iterator>(this, stacks_.cbegin(),
stacks_.cend());
},
[]() -> ClientToServerNextMessage {
return ClientToServerNextMessage();
});
});
return GRPC_LATENT_SEE_PROMISE(
"PullClientToServerMessage",
TrySeq(
[this]() {
return call_state_.PollPullClientToServerMessageAvailable();
},
[this](bool message_available) {
return If(
message_available,
[this]() {
return MessageExecutor<
&CallFilters::push_client_to_server_message_,
&filters_detail::StackData::client_to_server_messages,
&CallState::FinishPullClientToServerMessage,
StacksVector::const_iterator>(this, stacks_.cbegin(),
stacks_.cend());
},
[]() -> ClientToServerNextMessage {
return ClientToServerNextMessage();
});
}));
}
// Server: Push server to client message
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message) {
DCHECK_NE(message.get(), nullptr);
return Seq(
[this]() {
return call_state_.PollReadyForPushServerToClientMessage();
},
[this, message = std::move(message)]() mutable {
call_state_.BeginPushServerToClientMessage();
DCHECK_EQ(push_server_to_client_message_.get(), nullptr);
bool is_last =
(message->flags() & GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE) != 0;
push_server_to_client_message_ = std::move(message);
return If(
is_last, []() -> Poll<StatusFlag> { return Success{}; },
[this]() {
return call_state_.PollReadyForPushServerToClientMessage();
});
});
return GRPC_LATENT_SEE_PROMISE(
"PushServerToClientMessage",
Seq(
[this]() {
return call_state_.PollReadyForPushServerToClientMessage();
},
[this, message = std::move(message)]() mutable {
call_state_.BeginPushServerToClientMessage();
DCHECK_EQ(push_server_to_client_message_.get(), nullptr);
bool is_last =
(message->flags() & GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH) != 0;
push_server_to_client_message_ = std::move(message);
return If(
is_last, []() -> Poll<StatusFlag> { return Success{}; },
[this]() {
return call_state_.PollReadyForPushServerToClientMessage();
});
}));
}
// Server: Fetch server to client message
// Returns a promise that resolves to ServerToClientNextMessage
GRPC_MUST_USE_RESULT auto PullServerToClientMessage() {
return TrySeq(
[this]() {
return call_state_.PollPullServerToClientMessageAvailable();
},
[this](bool message_available) {
return If(
message_available,
[this]() {
return MessageExecutor<
&CallFilters::push_server_to_client_message_,
&filters_detail::StackData::server_to_client_messages,
&CallState::FinishPullServerToClientMessage,
StacksVector::const_reverse_iterator>(
this, stacks_.crbegin(), stacks_.crend());
},
[]() -> ServerToClientNextMessage {
return ServerToClientNextMessage();
});
});
return GRPC_LATENT_SEE_PROMISE(
"PullServerToClientMessage",
TrySeq(
[this]() {
return call_state_.PollPullServerToClientMessageAvailable();
},
[this](bool message_available) {
return If(
message_available,
[this]() {
return MessageExecutor<
&CallFilters::push_server_to_client_message_,
&filters_detail::StackData::server_to_client_messages,
&CallState::FinishPullServerToClientMessage,
StacksVector::const_reverse_iterator>(
this, stacks_.crbegin(), stacks_.crend());
},
[]() -> ServerToClientNextMessage {
return ServerToClientNextMessage();
});
}));
}
// Server: Indicate end of response
// Closes the request entirely - no messages can be sent/received
Expand All @@ -1842,20 +1857,24 @@ class CallFilters {
// Client: Fetch server trailing metadata
// Returns a promise that resolves to ServerMetadataHandle
GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata() {
return Map(
[this]() { return call_state_.PollServerTrailingMetadataAvailable(); },
[this](Empty) {
auto value = std::move(push_server_trailing_metadata_);
if (call_data_ != nullptr) {
for (auto it = stacks_.crbegin(); it != stacks_.crend(); ++it) {
value = filters_detail::RunServerTrailingMetadata(
it->stack->data_.server_trailing_metadata,
filters_detail::Offset(call_data_, it->call_data_offset),
std::move(value));
}
}
return value;
});
return GRPC_LATENT_SEE_PROMISE(
"PullServerTrailingMetadata",
Map(
[this]() {
return call_state_.PollServerTrailingMetadataAvailable();
},
[this](Empty) {
auto value = std::move(push_server_trailing_metadata_);
if (call_data_ != nullptr) {
for (auto it = stacks_.crbegin(); it != stacks_.crend(); ++it) {
value = filters_detail::RunServerTrailingMetadata(
it->stack->data_.server_trailing_metadata,
filters_detail::Offset(call_data_, it->call_data_offset),
std::move(value));
}
}
return value;
}));
}
// Server: Wait for server trailing metadata to have been sent
// Returns a promise that resolves to a StatusFlag indicating whether the
Expand Down
12 changes: 8 additions & 4 deletions src/core/lib/transport/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@
/// to be decompressed by the message_decompress filter. (Does not apply for
/// stream compression.)
#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u)
/// Internal bit flag for when we know a particular message is the last one to
/// be sent: this allows various flow control optimizations.
#define GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE (0x20000000u)
/// Usually when we `Push` to a `CallInitiator` or `CallHandler` they block
/// until the message has been pulled. This flag signals that instead the push
/// should finish once the data is available to be pulled (this may still block
/// if a previous message is not yet claimed).
/// Setting this is appropriate for messages sent from the API that are known
/// to be the last message to be sent, OR for messages from the transport.
#define GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH (0x20000000u)
/// Mask of all valid internal flags.
#define GRPC_WRITE_INTERNAL_USED_MASK \
(GRPC_WRITE_INTERNAL_COMPRESS | \
GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED | \
GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE)
GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH)

namespace grpc_core {

Expand Down
6 changes: 3 additions & 3 deletions test/core/transport/call_spine_benchmarks.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) {
}),
Map(handler.PullMessage(),
[](ClientToServerNextMessage msg) { return msg.status(); }),
handler.PushMessage(fixture.MakePayload(
GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE))),
handler.PushMessage(
fixture.MakePayload(GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH))),
[&handler_done, &fixture, handler](StatusFlag status) mutable {
CHECK(status.ok());
handler.PushServerTrailingMetadata(
Expand All @@ -71,7 +71,7 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) {
return Map(
AllOk<StatusFlag>(
Map(initiator.PushMessage(fixture.MakePayload(
GRPC_WRITE_INTERNAL_KNOWN_LAST_MESSAGE)),
GRPC_WRITE_INTERNAL_IMMEDIATE_PUSH)),
[](StatusFlag) { return Success{}; }),
Map(initiator.PullServerInitialMetadata(),
[](absl::optional<ServerMetadataHandle> md) {
Expand Down

0 comments on commit bbb7cdb

Please sign in to comment.