From 6459cdca02bd926b07d4afe1073c22ca54da018a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 13 Dec 2024 20:05:25 -0800 Subject: [PATCH] x --- .../server/chaotic_good_server.cc | 46 ++++++++++++------- .../chaotic_good_legacy/server_transport.cc | 20 ++++---- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc index 4c2a14cf75144..734090eb8c05d 100644 --- a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc +++ b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc @@ -289,27 +289,34 @@ void ChaoticGoodServerListener::ActiveConnection::HandshakingState::Start( auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: EndpointReadSettingsFrame(RefCountedPtr self) { return TrySeq( - self->connection_->endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize), + self->connection_->endpoint_.ReadSlice(TcpFrameHeader::kFrameHeaderSize), [self](Slice slice) { // Parse frame header - auto frame_header = FrameHeader::Parse(reinterpret_cast( - GRPC_SLICE_START_PTR(slice.c_slice()))); - if (frame_header.ok() && frame_header->type != FrameType::kSettings) { - frame_header = absl::InternalError("Not a settings frame"); + auto frame_header = + TcpFrameHeader::Parse(reinterpret_cast( + GRPC_SLICE_START_PTR(slice.c_slice()))); + if (frame_header.ok()) { + if (frame_header->header.type != FrameType::kSettings) { + frame_header = absl::InternalError("Not a settings frame"); + } else if (frame_header->payload_connection_id != 0) { + frame_header = absl::InternalError("Unexpected connection id"); + } else if (frame_header->header.stream_id != 0) { + frame_header = absl::InternalError("Unexpected stream id"); + } } return If( frame_header.ok(), [self, &frame_header]() { return TrySeq( self->connection_->endpoint_.Read( - frame_header->payload_length), + frame_header->header.payload_length), [frame_header = *frame_header, self](SliceBuffer buffer) -> absl::StatusOr { // Read Setting frame. SettingsFrame frame; // Deserialize frame from read buffer. - auto status = - frame.Deserialize(frame_header, std::move(buffer)); + auto status = frame.Deserialize(frame_header.header, + std::move(buffer)); if (!status.ok()) return status; if (frame.body.data_channel()) { if (frame.body.connection_id().empty()) { @@ -360,17 +367,22 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: absl::get(self->data_) .config.PrepareServerOutgoingSettings(frame.body); SliceBuffer write_buffer; - frame.MakeHeader().Serialize( - write_buffer.AddTiny(FrameHeader::kFrameHeaderSize)); + TcpFrameHeader{frame.MakeHeader(), 0}.Serialize( + write_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize)); frame.SerializePayload(write_buffer); return TrySeq( self->connection_->endpoint_.Write(std::move(write_buffer)), [self]() { + auto config = + std::move(absl::get(self->data_).config); + auto frame_transport = MakeRefCounted( + config.MakeTransportOptions(), + std::move(self->connection_->endpoint_), + config.TakePendingDataEndpoints(), + self->connection_->args().GetObjectRef()); return self->connection_->listener_->server_->SetupTransport( - new ChaoticGoodServerTransport( - self->connection_->args(), - std::move(self->connection_->endpoint_), - std::move(absl::get(self->data_).config), - self->connection_->listener_->data_connection_listener_), + new ChaoticGoodServerTransport(self->connection_->args(), + *frame_transport, + config.MakeMessageChunker()), nullptr, self->connection_->args(), nullptr); }); } @@ -381,8 +393,8 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: SettingsFrame frame; frame.body.set_data_channel(true); SliceBuffer write_buffer; - frame.MakeHeader().Serialize( - write_buffer.AddTiny(FrameHeader::kFrameHeaderSize)); + TcpFrameHeader{frame.MakeHeader(), 0}.Serialize( + write_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize)); frame.SerializePayload(write_buffer); // ignore encoding errors: they will be logged separately already return TrySeq(self->connection_->endpoint_.Write(std::move(write_buffer)), diff --git a/src/core/ext/transport/chaotic_good_legacy/server_transport.cc b/src/core/ext/transport/chaotic_good_legacy/server_transport.cc index d94fb589b62a0..78d4d9b5ace38 100644 --- a/src/core/ext/transport/chaotic_good_legacy/server_transport.cc +++ b/src/core/ext/transport/chaotic_good_legacy/server_transport.cc @@ -126,10 +126,10 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( } namespace { -auto BooleanSuccessToTransportErrorCapturingInitiator(CallInitiator initiator) { - return [initiator = std::move(initiator)](bool success) { - return success ? absl::OkStatus() - : absl::UnavailableError("Transport closed."); +auto StatusFlagToTransportErrorCapturingInitiator(CallInitiator initiator) { + return [initiator = std::move(initiator)](StatusFlag status) { + return status.ok() ? absl::OkStatus() + : absl::UnavailableError("Transport closed."); }; } } // namespace @@ -141,9 +141,9 @@ auto ChaoticGoodServerTransport::SendFragment( << "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString(); // Capture the call_initiator to ensure the underlying call spine is alive // until the outgoing_frames.Send promise completes. - return Map(outgoing_frames.Send(std::move(frame)), - BooleanSuccessToTransportErrorCapturingInitiator( - std::move(call_initiator))); + return Map( + outgoing_frames.Send(std::move(frame)), + StatusFlagToTransportErrorCapturingInitiator(std::move(call_initiator))); } auto ChaoticGoodServerTransport::SendFragmentAcked( @@ -153,9 +153,9 @@ auto ChaoticGoodServerTransport::SendFragmentAcked( << "CHAOTIC_GOOD: SendFragmentAcked: frame=" << frame.ToString(); // Capture the call_initiator to ensure the underlying call spine is alive // until the outgoing_frames.Send promise completes. - return Map(outgoing_frames.SendAcked(std::move(frame)), - BooleanSuccessToTransportErrorCapturingInitiator( - std::move(call_initiator))); + return Map( + outgoing_frames.SendAcked(std::move(frame)), + StatusFlagToTransportErrorCapturingInitiator(std::move(call_initiator))); } auto ChaoticGoodServerTransport::SendCallBody(