Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 14, 2024
1 parent 44374bb commit 6459cdc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 27 deletions.
46 changes: 29 additions & 17 deletions src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,27 +289,34 @@ void ChaoticGoodServerListener::ActiveConnection::HandshakingState::Start(
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self) {
return TrySeq(
self->connection_->endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
self->connection_->endpoint_.ReadSlice(TcpFrameHeader::kFrameHeaderSize),
[self](Slice slice) {
// Parse frame header
auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(slice.c_slice())));
if (frame_header.ok() && frame_header->type != FrameType::kSettings) {
frame_header = absl::InternalError("Not a settings frame");
auto frame_header =
TcpFrameHeader::Parse(reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(slice.c_slice())));
if (frame_header.ok()) {
if (frame_header->header.type != FrameType::kSettings) {
frame_header = absl::InternalError("Not a settings frame");
} else if (frame_header->payload_connection_id != 0) {
frame_header = absl::InternalError("Unexpected connection id");
} else if (frame_header->header.stream_id != 0) {
frame_header = absl::InternalError("Unexpected stream id");
}
}
return If(
frame_header.ok(),
[self, &frame_header]() {
return TrySeq(
self->connection_->endpoint_.Read(
frame_header->payload_length),
frame_header->header.payload_length),
[frame_header = *frame_header,
self](SliceBuffer buffer) -> absl::StatusOr<bool> {
// Read Setting frame.
SettingsFrame frame;
// Deserialize frame from read buffer.
auto status =
frame.Deserialize(frame_header, std::move(buffer));
auto status = frame.Deserialize(frame_header.header,
std::move(buffer));
if (!status.ok()) return status;
if (frame.body.data_channel()) {
if (frame.body.connection_id().empty()) {
Expand Down Expand Up @@ -360,17 +367,22 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
absl::get<ControlConnection>(self->data_)
.config.PrepareServerOutgoingSettings(frame.body);
SliceBuffer write_buffer;
frame.MakeHeader().Serialize(
write_buffer.AddTiny(FrameHeader::kFrameHeaderSize));
TcpFrameHeader{frame.MakeHeader(), 0}.Serialize(
write_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize));
frame.SerializePayload(write_buffer);
return TrySeq(
self->connection_->endpoint_.Write(std::move(write_buffer)), [self]() {
auto config =
std::move(absl::get<ControlConnection>(self->data_).config);
auto frame_transport = MakeRefCounted<TcpFrameTransport>(
config.MakeTransportOptions(),
std::move(self->connection_->endpoint_),
config.TakePendingDataEndpoints(),
self->connection_->args().GetObjectRef<EventEngine>());
return self->connection_->listener_->server_->SetupTransport(
new ChaoticGoodServerTransport(
self->connection_->args(),
std::move(self->connection_->endpoint_),
std::move(absl::get<ControlConnection>(self->data_).config),
self->connection_->listener_->data_connection_listener_),
new ChaoticGoodServerTransport(self->connection_->args(),
*frame_transport,
config.MakeMessageChunker()),
nullptr, self->connection_->args(), nullptr);
});
}
Expand All @@ -381,8 +393,8 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
SettingsFrame frame;
frame.body.set_data_channel(true);
SliceBuffer write_buffer;
frame.MakeHeader().Serialize(
write_buffer.AddTiny(FrameHeader::kFrameHeaderSize));
TcpFrameHeader{frame.MakeHeader(), 0}.Serialize(
write_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize));
frame.SerializePayload(write_buffer);
// ignore encoding errors: they will be logged separately already
return TrySeq(self->connection_->endpoint_.Write(std::move(write_buffer)),
Expand Down
20 changes: 10 additions & 10 deletions src/core/ext/transport/chaotic_good_legacy/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
}

namespace {
auto BooleanSuccessToTransportErrorCapturingInitiator(CallInitiator initiator) {
return [initiator = std::move(initiator)](bool success) {
return success ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
auto StatusFlagToTransportErrorCapturingInitiator(CallInitiator initiator) {
return [initiator = std::move(initiator)](StatusFlag status) {
return status.ok() ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
};
}
} // namespace
Expand All @@ -141,9 +141,9 @@ auto ChaoticGoodServerTransport::SendFragment(
<< "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.Send(std::move(frame)),
BooleanSuccessToTransportErrorCapturingInitiator(
std::move(call_initiator)));
return Map(
outgoing_frames.Send(std::move(frame)),
StatusFlagToTransportErrorCapturingInitiator(std::move(call_initiator)));
}

auto ChaoticGoodServerTransport::SendFragmentAcked(
Expand All @@ -153,9 +153,9 @@ auto ChaoticGoodServerTransport::SendFragmentAcked(
<< "CHAOTIC_GOOD: SendFragmentAcked: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.SendAcked(std::move(frame)),
BooleanSuccessToTransportErrorCapturingInitiator(
std::move(call_initiator)));
return Map(
outgoing_frames.SendAcked(std::move(frame)),
StatusFlagToTransportErrorCapturingInitiator(std::move(call_initiator)));
}

auto ChaoticGoodServerTransport::SendCallBody(
Expand Down

0 comments on commit 6459cdc

Please sign in to comment.