From 406018be45d3705eb2130db8a4b0de5cf5521ee3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 12 Dec 2024 19:27:12 -0800 Subject: [PATCH] rubbly --- .../chaotic_good/chaotic_good_transport.h | 253 ------------------ src/core/ext/transport/chaotic_good/frame.cc | 30 +++ src/core/ext/transport/chaotic_good/frame.h | 3 + .../transport/chaotic_good/frame_header.cc | 42 +-- .../ext/transport/chaotic_good/frame_header.h | 18 -- .../transport/chaotic_good/frame_transport.h | 55 +++- .../chaotic_good/server_transport.cc | 98 ++----- .../transport/chaotic_good/server_transport.h | 24 +- .../chaotic_good/tcp_frame_transport.cc | 216 +++++++++++++++ .../chaotic_good/tcp_frame_transport.h | 64 ++++- 10 files changed, 393 insertions(+), 410 deletions(-) delete mode 100644 src/core/ext/transport/chaotic_good/chaotic_good_transport.h create mode 100644 src/core/ext/transport/chaotic_good/tcp_frame_transport.cc diff --git a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h b/src/core/ext/transport/chaotic_good/chaotic_good_transport.h deleted file mode 100644 index 4c83a57bd1867..0000000000000 --- a/src/core/ext/transport/chaotic_good/chaotic_good_transport.h +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H -#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H - -#include - -#include -#include -#include -#include - -#include "absl/strings/escaping.h" -#include "src/core/ext/transport/chaotic_good/control_endpoint.h" -#include "src/core/ext/transport/chaotic_good/data_endpoints.h" -#include "src/core/ext/transport/chaotic_good/frame.h" -#include "src/core/ext/transport/chaotic_good/frame_header.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/event_engine/event_engine_context.h" -#include "src/core/lib/event_engine/tcp_socket_utils.h" -#include "src/core/lib/promise/loop.h" -#include "src/core/lib/promise/match_promise.h" -#include "src/core/lib/promise/mpsc.h" -#include "src/core/lib/promise/seq.h" -#include "src/core/lib/promise/try_join.h" -#include "src/core/lib/promise/try_seq.h" -#include "src/core/lib/transport/call_spine.h" -#include "src/core/lib/transport/promise_endpoint.h" - -namespace grpc_core { -namespace chaotic_good { - -inline std::vector OneDataEndpoint(PromiseEndpoint endpoint) { - std::vector ep; - ep.emplace_back(std::move(endpoint)); - return ep; -} - -// One received frame: the header, and the serialized bytes of the payload. -// The payload may not yet be received into memory, so the accessor for that -// returns a promise that will need to be resolved prior to inspecting the -// bytes. -// In this way we can pull bytes from various different data connections and -// read them in any order, but still have a trivial reassembly in the receiving -// call promise. -class IncomingFrame { - public: - template - IncomingFrame(FrameHeader header, T payload, size_t remove_padding) - : header_(header), - payload_(std::move(payload)), - remove_padding_(remove_padding) {} - - const FrameHeader& header() { return header_; } - - auto Payload() { - return Map( - MatchPromise( - std::move(payload_), - [](absl::StatusOr status) { return status; }, - [](DataEndpoints::ReadTicket ticket) { return ticket.Await(); }), - [remove_padding = - remove_padding_](absl::StatusOr payload) { - if (payload.ok()) payload->RemoveLastNBytesNoInline(remove_padding); - return payload; - }); - } - - private: - FrameHeader header_; - absl::variant, DataEndpoints::ReadTicket> - payload_; - size_t remove_padding_; -}; - -class ChaoticGoodTransport : public RefCounted { - public: - struct Options { - uint32_t encode_alignment = 64; - uint32_t decode_alignment = 64; - uint32_t inlined_payload_size_threshold = 8 * 1024; - }; - - ChaoticGoodTransport( - PromiseEndpoint control_endpoint, - std::vector pending_data_endpoints, - std::shared_ptr - event_engine, - Options options, bool enable_tracing) - : event_engine_(std::move(event_engine)), - control_endpoint_(std::move(control_endpoint), event_engine_.get()), - data_endpoints_(std::move(pending_data_endpoints), event_engine_.get(), - enable_tracing), - options_(options) {} - - auto WriteFrame(const FrameInterface& frame) { - FrameHeader header = frame.MakeHeader(); - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: WriteFrame to:" - << ResolvedAddressToString(control_endpoint_.GetPeerAddress()) - .value_or("<>") - << " " << frame.ToString(); - return If( - // If we have no data endpoints, OR this is a small payload - data_endpoints_.empty() || - header.payload_length <= options_.inlined_payload_size_threshold, - // ... then write it to the control endpoint - [this, &header, &frame]() { - SliceBuffer output; - header.Serialize(output.AddTiny(FrameHeader::kFrameHeaderSize)); - frame.SerializePayload(output); - return control_endpoint_.Write(std::move(output)); - }, - // ... otherwise write it to a data connection - [this, header, &frame]() mutable { - SliceBuffer payload; - // Temporarily give a bogus connection id to get padding right - header.payload_connection_id = 1; - const size_t padding = header.Padding(options_.encode_alignment); - frame.SerializePayload(payload); - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: Send " << payload.Length() - << "b payload on data channel; add " << padding << " bytes for " - << options_.encode_alignment << " alignment"; - if (padding != 0) { - auto slice = MutableSlice::CreateUninitialized(padding); - memset(slice.data(), 0, padding); - payload.AppendIndexed(Slice(std::move(slice))); - } - return Seq(data_endpoints_.Write(std::move(payload)), - [this, header](uint32_t connection_id) mutable { - header.payload_connection_id = connection_id + 1; - SliceBuffer header_frame; - header.Serialize( - header_frame.AddTiny(FrameHeader::kFrameHeaderSize)); - return control_endpoint_.Write(std::move(header_frame)); - }); - }); - } - - // Common outbound loop for both client and server (these vary only over the - // frame type). - template - auto TransportWriteLoop(MpscReceiver& outgoing_frames) { - return Loop([self = Ref(), &outgoing_frames] { - return TrySeq( - // Get next outgoing frame. - outgoing_frames.Next(), - // Serialize and write it out. - [self = self.get()](Frame client_frame) { - return self->WriteFrame( - absl::ConvertVariantTo(client_frame)); - }, - []() -> LoopCtl { - // The write failures will be caught in TrySeq and exit loop. - // Therefore, only need to return Continue() in the last lambda - // function. - return Continue(); - }); - }); - } - - // Read frame header and payloads for control and data portions of one frame. - // Resolves to StatusOr. - auto ReadFrameBytes() { - return TrySeq( - control_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize), - [this](Slice read_buffer) { - auto frame_header = - FrameHeader::Parse(reinterpret_cast( - GRPC_SLICE_START_PTR(read_buffer.c_slice()))); - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: ReadHeader from:" - << ResolvedAddressToString(control_endpoint_.GetPeerAddress()) - .value_or("<>") - << " " - << (frame_header.ok() ? frame_header->ToString() - : frame_header.status().ToString()); - return frame_header; - }, - [this](FrameHeader frame_header) { - return If( - // If the payload is on the connection frame - frame_header.payload_connection_id == 0, - // ... then read the data immediately and return an IncomingFrame - // that contains the payload. - // We need to do this here so that we do not create head of line - // blocking issues reading later control frames (but waiting for a - // call to get scheduled time to read the payload). - [this, frame_header]() { - return Map(control_endpoint_.Read(frame_header.payload_length), - [frame_header](absl::StatusOr payload) - -> absl::StatusOr { - if (!payload.ok()) return payload.status(); - return IncomingFrame(frame_header, - std::move(payload), 0); - }); - }, - // ... otherwise issue a read to the appropriate data endpoint, - // which will return a read ticket - which can be used later - // in the call promise to asynchronously wait for those bytes - // to be available. - [this, frame_header]() -> absl::StatusOr { - const auto padding = - frame_header.Padding(options_.decode_alignment); - return IncomingFrame( - frame_header, - data_endpoints_.Read(frame_header.payload_connection_id - 1, - frame_header.payload_length + padding), - padding); - }); - }); - } - - template - absl::StatusOr DeserializeFrame(const FrameHeader& header, - SliceBuffer payload) { - T frame; - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: Deserialize " << header << " with payload " - << absl::CEscape(payload.JoinIntoString()); - CHECK_EQ(header.payload_length, payload.Length()); - auto s = frame.Deserialize(header, std::move(payload)); - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: DeserializeFrame " - << (s.ok() ? frame.ToString() : s.ToString()); - if (s.ok()) return std::move(frame); - return std::move(s); - } - - private: - std::shared_ptr event_engine_; - ControlEndpoint control_endpoint_; - DataEndpoints data_endpoints_; - const Options options_; -}; - -} // namespace chaotic_good -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index 8504cd7f2afcb..cb77f5527e8ad 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -29,6 +29,7 @@ #include "src/core/ext/transport/chaotic_good/chaotic_good_frame.pb.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/lib/promise/context.h" +#include "src/core/lib/promise/switch.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -286,5 +287,34 @@ std::string MessageChunkFrame::ToString() const { ", payload=", payload.Length(), "b}"); } +namespace { +template +absl::StatusOr DeserializeFrame(const FrameHeader& header, + SliceBuffer payload) { + T frame; + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: Deserialize " << header << " with payload " + << absl::CEscape(payload.JoinIntoString()); + CHECK_EQ(header.payload_length, payload.Length()); + auto s = frame.Deserialize(header, std::move(payload)); + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: DeserializeFrame " + << (s.ok() ? frame.ToString() : s.ToString()); + if (s.ok()) return std::move(frame); + return std::move(s); +} +} // namespace + +absl::StatusOr ParseFrame(const FrameHeader& header, + SliceBuffer payload) { + return Switch(header.type, Case([&]() { + return DeserializeFrame(header, + std::move(payload)); + }), + Default([]() -> absl::StatusOr { + return absl::InternalError("bah"); + })); +} + } // namespace chaotic_good } // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 6105c215db5d2..548c9abd1fd1a 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -184,6 +184,9 @@ using Frame = ServerTrailingMetadataFrame, MessageFrame, BeginMessageFrame, MessageChunkFrame, ClientEndOfStream, CancelFrame>; +absl::StatusOr ParseFrame(const FrameHeader& header, + SliceBuffer payload); + } // namespace chaotic_good } // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/frame_header.cc b/src/core/ext/transport/chaotic_good/frame_header.cc index 5833897987083..1957d93fbbc57 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.cc +++ b/src/core/ext/transport/chaotic_good/frame_header.cc @@ -24,48 +24,8 @@ namespace grpc_core { namespace chaotic_good { -namespace { -void WriteLittleEndianUint32(uint32_t value, uint8_t* data) { - data[0] = static_cast(value); - data[1] = static_cast(value >> 8); - data[2] = static_cast(value >> 16); - data[3] = static_cast(value >> 24); -} - -uint32_t ReadLittleEndianUint32(const uint8_t* data) { - return static_cast(data[0]) | - (static_cast(data[1]) << 8) | - (static_cast(data[2]) << 16) | - (static_cast(data[3]) << 24); -} -} // namespace - -// Serializes a frame header into a buffer of 24 bytes. -void FrameHeader::Serialize(uint8_t* data) const { - WriteLittleEndianUint32((static_cast(type) << 16) | - static_cast(payload_connection_id), - data); - WriteLittleEndianUint32(stream_id, data + 4); - WriteLittleEndianUint32(payload_length, data + 8); -} - -// Parses a frame header from a buffer of 24 bytes. All 24 bytes are consumed. -absl::StatusOr FrameHeader::Parse(const uint8_t* data) { - FrameHeader header; - const uint32_t type_and_conn_id = ReadLittleEndianUint32(data); - if (type_and_conn_id & 0xff000000u) { - return absl::InternalError("Non-zero reserved byte received"); - } - header.type = static_cast(type_and_conn_id >> 16); - header.payload_connection_id = type_and_conn_id & 0xffff; - header.stream_id = ReadLittleEndianUint32(data + 4); - header.payload_length = ReadLittleEndianUint32(data + 8); - return header; -} - std::string FrameHeader::ToString() const { - return absl::StrCat("[type:", type, " conn:", payload_connection_id, - " stream_id:", stream_id, + return absl::StrCat("[type:", type, " stream_id:", stream_id, " payload_length:", payload_length, "]"); } diff --git a/src/core/ext/transport/chaotic_good/frame_header.h b/src/core/ext/transport/chaotic_good/frame_header.h index c88a49807f267..199440458974b 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.h +++ b/src/core/ext/transport/chaotic_good/frame_header.h @@ -52,34 +52,16 @@ void AbslStringify(Sink& sink, FrameType type) { struct FrameHeader { FrameType type = FrameType::kCancel; - uint16_t payload_connection_id = 0; uint32_t stream_id = 0; uint32_t payload_length = 0; - // Parses a frame header from a buffer of 12 bytes. All 12 bytes are consumed. - static absl::StatusOr Parse(const uint8_t* data); - // Serializes a frame header into a buffer of 12 bytes. - void Serialize(uint8_t* data) const; // Report contents as a string std::string ToString() const; - // Required padding to maintain alignment. - uint32_t Padding(uint32_t alignment) const { - if (payload_connection_id == 0) { - return 0; - } - if (payload_length % alignment == 0) { - return 0; - } - return alignment - (payload_length % alignment); - } bool operator==(const FrameHeader& h) const { return type == h.type && stream_id == h.stream_id && - payload_connection_id == h.payload_connection_id && payload_length == h.payload_length; } - // Frame header size is fixed to 12 bytes. - enum { kFrameHeaderSize = 12 }; }; inline std::ostream& operator<<(std::ostream& out, const FrameHeader& h) { diff --git a/src/core/ext/transport/chaotic_good/frame_transport.h b/src/core/ext/transport/chaotic_good/frame_transport.h index df369ea9d3d8f..350ba3c28655b 100644 --- a/src/core/ext/transport/chaotic_good/frame_transport.h +++ b/src/core/ext/transport/chaotic_good/frame_transport.h @@ -16,16 +16,63 @@ #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_FRAME_TRANSPORT_H #include "src/core/ext/transport/chaotic_good/frame.h" +#include "src/core/lib/promise/map.h" +#include "src/core/lib/promise/match_promise.h" #include "src/core/lib/promise/mpsc.h" #include "src/core/lib/promise/party.h" - +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/promise.h" namespace grpc_core { namespace chaotic_good { -class FrameTransport { +// One received frame: the header, and the serialized bytes of the payload. +// The payload may not yet be received into memory, so the accessor for that +// returns a promise that will need to be resolved prior to inspecting the +// bytes. +// In this way we can pull bytes from various different data connections and +// read them in any order, but still have a trivial reassembly in the receiving +// call promise. +class IncomingFrame { + public: + IncomingFrame(FrameHeader header, absl::StatusOr payload) + : header_(header), payload_(std::move(payload)) {} + IncomingFrame(FrameHeader header, + Promise> payload) + : header_(header), payload_(std::move(payload)) {} + + const FrameHeader& header() { return header_; } + + auto Payload() { + return Map( + MatchPromise( + std::move(payload_), + [](absl::StatusOr status) { return status; }, + [](Promise> promise) { + return promise; + }), + [this](absl::StatusOr payload) -> absl::StatusOr { + if (!payload.ok()) return payload.status(); + return ParseFrame(header_, std::move(*payload)); + }); + } + + private: + FrameHeader header_; + absl::variant, + Promise>> + payload_; +}; + +class FrameTransport : public RefCounted { public: - virtual void StartReading(Party* party, MpscSender frames) = 0; - virtual void StartWriting(Party* party, MpscReceiver frames) = 0; + // Spawn a read loop onto party - read frames from the wire, push them onto + // frames. + // TODO(ctiller): this use case probably warrants a buffered pipe type. + virtual void StartReading(Party* party, PipeSender frames, + absl::AnyInvocable on_done) = 0; + // Spawn a write loop onto party - write frames from frames to the wire. + virtual void StartWriting(Party* party, MpscReceiver frames, + absl::AnyInvocable on_done) = 0; }; } // namespace chaotic_good diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index fbef5d8b745ac..4a0e01d93a0ea 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -90,99 +90,60 @@ auto ChaoticGoodServerTransport::PushFrameIntoCall(RefCountedPtr stream, } template -auto ChaoticGoodServerTransport::DispatchFrame( - RefCountedPtr transport, IncomingFrame frame) { +auto ChaoticGoodServerTransport::DispatchFrame(IncomingFrame frame) { auto stream = LookupStream(frame.header().stream_id); return If( stream != nullptr, - [this, &stream, &frame, &transport]() { + [this, &stream, &frame]() { // TODO(ctiller): instead of SpawnWaitable here we probably want a // small queue to push into, so that the call can proceed // asynchronously to other calls regardless of frame ordering. return stream->call.SpawnWaitable( - "push-frame", [this, stream, frame = std::move(frame), - transport = std::move(transport)]() mutable { + "push-frame", [this, stream, frame = std::move(frame)]() mutable { return TrySeq( frame.Payload(), - [transport = std::move(transport), - header = frame.header()](SliceBuffer payload) { - return transport->DeserializeFrame(header, - std::move(payload)); - }, - [stream = std::move(stream), this](T frame) mutable { + [stream = std::move(stream), this](Frame frame) mutable { auto& call = stream->call; - return Map(call.CancelIfFails(PushFrameIntoCall( - std::move(stream), std::move(frame))), - [](auto) { return absl::OkStatus(); }); + return Map( + call.CancelIfFails(PushFrameIntoCall( + std::move(stream), std::move(absl::get(frame)))), + [](auto) { return absl::OkStatus(); }); }); }); }, []() { return absl::OkStatus(); }); } -namespace { -auto BooleanSuccessToTransportErrorCapturingInitiator(CallInitiator initiator) { - return [initiator = std::move(initiator)](bool success) { - return success ? absl::OkStatus() - : absl::UnavailableError("Transport closed."); - }; -} -} // namespace - -auto ChaoticGoodServerTransport::SendFrame(Frame frame, - MpscSender outgoing_frames, - CallInitiator call_initiator) { - // Capture the call_initiator to ensure the underlying call spine is alive - // until the outgoing_frames.Send promise completes. - return Map(outgoing_frames.Send(std::move(frame)), - BooleanSuccessToTransportErrorCapturingInitiator( - std::move(call_initiator))); -} - -auto ChaoticGoodServerTransport::SendFrameAcked( - Frame frame, MpscSender outgoing_frames, - CallInitiator call_initiator) { - // Capture the call_initiator to ensure the underlying call spine is alive - // until the outgoing_frames.Send promise completes. - return Map(outgoing_frames.SendAcked(std::move(frame)), - BooleanSuccessToTransportErrorCapturingInitiator( - std::move(call_initiator))); -} - auto ChaoticGoodServerTransport::SendCallBody(uint32_t stream_id, - MpscSender outgoing_frames, CallInitiator call_initiator) { // Continuously send client frame with client to server messages. return ForEach(MessagesFrom(call_initiator), - [this, stream_id, outgoing_frames = std::move(outgoing_frames), - call_initiator](MessageHandle message) mutable { - return Map(message_chunker_.Send(std::move(message), - stream_id, outgoing_frames), - BooleanSuccessToTransportErrorCapturingInitiator( - std::move(call_initiator))); + [this, stream_id](MessageHandle message) mutable { + return Map( + message_chunker_.Send(std::move(message), stream_id, + outgoing_frames_), + [](bool r) { return StatusFlag(r); }); }); } auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( - uint32_t stream_id, MpscSender outgoing_frames, - CallInitiator call_initiator) { + uint32_t stream_id, CallInitiator call_initiator) { return TrySeq( // Wait for initial metadata then send it out. call_initiator.PullServerInitialMetadata(), - [stream_id, outgoing_frames, call_initiator, + [stream_id, call_initiator, this](absl::optional md) mutable { GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=" << (md.has_value() ? (*md)->DebugString() : "null"); return If( md.has_value(), - [&md, stream_id, &outgoing_frames, &call_initiator, this]() { + [&md, stream_id, &call_initiator, this]() { ServerInitialMetadataFrame frame; frame.body = ServerMetadataProtoFromGrpc(**md); frame.stream_id = stream_id; - return TrySeq( - SendFrame(std::move(frame), outgoing_frames, call_initiator), - SendCallBody(stream_id, outgoing_frames, call_initiator)); + return TrySeq(outgoing_frames_.Send(std::move(frame)), + SendCallBody(stream_id, call_initiator)); }, []() { return absl::OkStatus(); }); }); @@ -190,11 +151,9 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( auto ChaoticGoodServerTransport::CallOutboundLoop( uint32_t stream_id, CallInitiator call_initiator) { - auto outgoing_frames = outgoing_frames_.MakeSender(); return GRPC_LATENT_SEE_PROMISE( "CallOutboundLoop", - Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames, - call_initiator), + Seq(Map(SendCallInitialMetadataAndBody(stream_id, call_initiator), [stream_id](absl::Status main_body_result) { GRPC_TRACE_VLOG(chaotic_good, 2) << "CHAOTIC_GOOD: CallOutboundLoop: stream_id=" << stream_id @@ -204,26 +163,18 @@ auto ChaoticGoodServerTransport::CallOutboundLoop( call_initiator.PullServerTrailingMetadata(), // Capture the call_initiator to ensure the underlying call_spine // is alive until the SendFragment promise completes. - [this, stream_id, outgoing_frames, - call_initiator](ServerMetadataHandle md) mutable { + [this, stream_id](ServerMetadataHandle md) mutable { ServerTrailingMetadataFrame frame; frame.body = ServerMetadataProtoFromGrpc(*md); frame.stream_id = stream_id; - return SendFrame(std::move(frame), outgoing_frames, call_initiator); + return outgoing_frames_.Send(std::move(frame)); })); } absl::Status ChaoticGoodServerTransport::NewStream( - ChaoticGoodTransport& transport, const FrameHeader& header, - SliceBuffer payload) { - CHECK_EQ(header.payload_length, payload.Length()); - auto client_initial_metadata_frame = - transport.DeserializeFrame( - header, std::move(payload)); - if (!client_initial_metadata_frame.ok()) { - return client_initial_metadata_frame.status(); - } - auto md = ClientMetadataGrpcFromProto(client_initial_metadata_frame->body); + uint32_t stream_id, + ClientInitialMetadataFrame client_initial_metadata_frame) { + auto md = ClientMetadataGrpcFromProto(client_initial_metadata_frame.body); if (!md.ok()) { return md.status(); } @@ -233,7 +184,6 @@ absl::Status ChaoticGoodServerTransport::NewStream( absl::optional call_initiator; auto call = MakeCallPair(std::move(*md), std::move(arena)); call_initiator.emplace(std::move(call.initiator)); - const auto stream_id = client_initial_metadata_frame->stream_id; auto add_result = NewStream(stream_id, *call_initiator); if (!add_result.ok()) { call_initiator.reset(); diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 6cea394a83ea2..6779c4e048430 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -42,6 +42,7 @@ #include "absl/status/statusor.h" #include "absl/types/optional.h" #include "absl/types/variant.h" +#include "frame_transport.h" #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" #include "src/core/ext/transport/chaotic_good/config.h" #include "src/core/ext/transport/chaotic_good/frame.h" @@ -79,8 +80,7 @@ namespace chaotic_good { class ChaoticGoodServerTransport final : public ServerTransport { public: ChaoticGoodServerTransport(const ChannelArgs& args, - PromiseEndpoint control_endpoint, Config config, - RefCountedPtr); + FrameTransport& frame_transport); FilterStackTransport* filter_stack_transport() override { return nullptr; } ClientTransport* client_transport() override { return nullptr; } @@ -107,10 +107,8 @@ class ChaoticGoodServerTransport final : public ServerTransport { RefCountedPtr LookupStream(uint32_t stream_id); RefCountedPtr ExtractStream(uint32_t stream_id); auto SendCallInitialMetadataAndBody(uint32_t stream_id, - MpscSender outgoing_frames, CallInitiator call_initiator); - auto SendCallBody(uint32_t stream_id, MpscSender outgoing_frames, - CallInitiator call_initiator); + auto SendCallBody(uint32_t stream_id, CallInitiator call_initiator); auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator); auto OnTransportActivityDone(absl::string_view activity); auto TransportReadLoop(RefCountedPtr transport); @@ -120,27 +118,23 @@ class ChaoticGoodServerTransport final : public ServerTransport { // Resolves to a StatusOr> auto ReadFrameBody(Slice read_buffer); void SendCancel(uint32_t stream_id, absl::Status why); - absl::Status NewStream(ChaoticGoodTransport& transport, - const FrameHeader& header, - SliceBuffer initial_metadata_payload); + absl::Status NewStream( + uint32_t stream_id, + ClientInitialMetadataFrame client_initial_metadata_frame); template - auto DispatchFrame(RefCountedPtr transport, - IncomingFrame frame); + auto DispatchFrame(IncomingFrame frame); auto PushFrameIntoCall(RefCountedPtr stream, MessageFrame frame); auto PushFrameIntoCall(RefCountedPtr stream, ClientEndOfStream frame); auto PushFrameIntoCall(RefCountedPtr stream, BeginMessageFrame frame); auto PushFrameIntoCall(RefCountedPtr stream, MessageChunkFrame frame); - auto SendFrame(Frame frame, MpscSender outgoing_frames, - CallInitiator call_initiator); - auto SendFrameAcked(Frame frame, MpscSender outgoing_frames, - CallInitiator call_initiator); RefCountedPtr call_destination_; const RefCountedPtr call_arena_allocator_; const std::shared_ptr event_engine_; InterActivityLatch got_acceptor_; - MpscReceiver outgoing_frames_; + MpscSender outgoing_frames_; + PipeReceiver incoming_frames_; Mutex mu_; // Map of stream incoming server frames, key is stream_id. StreamMap stream_map_ ABSL_GUARDED_BY(mu_); diff --git a/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc b/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc new file mode 100644 index 0000000000000..331448078bc10 --- /dev/null +++ b/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc @@ -0,0 +1,216 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/ext/transport/chaotic_good/tcp_frame_transport.h" + +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/promise/if.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/try_seq.h" + +namespace grpc_core { +namespace chaotic_good { + +/////////////////////////////////////////////////////////////////////////////// +// TcpFrameHeader + +namespace { +void WriteLittleEndianUint32(uint32_t value, uint8_t* data) { + data[0] = static_cast(value); + data[1] = static_cast(value >> 8); + data[2] = static_cast(value >> 16); + data[3] = static_cast(value >> 24); +} + +uint32_t ReadLittleEndianUint32(const uint8_t* data) { + return static_cast(data[0]) | + (static_cast(data[1]) << 8) | + (static_cast(data[2]) << 16) | + (static_cast(data[3]) << 24); +} + +uint32_t DataConnectionPadding(uint32_t payload_length, uint32_t alignment) { + if (payload_length % alignment == 0) return 0; + return alignment - (payload_length % alignment); +} +} // namespace + +// Serializes a frame header into a buffer of 24 bytes. +void TcpFrameHeader::Serialize(uint8_t* data) const { + WriteLittleEndianUint32((static_cast(header.type) << 16) | + static_cast(payload_connection_id), + data); + WriteLittleEndianUint32(header.stream_id, data + 4); + WriteLittleEndianUint32(header.payload_length, data + 8); +} + +// Parses a frame header from a buffer of 24 bytes. All 24 bytes are consumed. +absl::StatusOr TcpFrameHeader::Parse(const uint8_t* data) { + TcpFrameHeader tcp_header; + const uint32_t type_and_conn_id = ReadLittleEndianUint32(data); + if (type_and_conn_id & 0xff000000u) { + return absl::InternalError("Non-zero reserved byte received"); + } + tcp_header.header.type = static_cast(type_and_conn_id >> 16); + tcp_header.payload_connection_id = type_and_conn_id & 0xffff; + tcp_header.header.stream_id = ReadLittleEndianUint32(data + 4); + tcp_header.header.payload_length = ReadLittleEndianUint32(data + 8); + return tcp_header; +} + +uint32_t TcpFrameHeader::Padding(uint32_t alignment) const { + if (payload_connection_id == 0) return 0; + return DataConnectionPadding(header.payload_length, alignment); +} + +/////////////////////////////////////////////////////////////////////////////// +// TcpFrameTransport + +auto TcpFrameTransport::WriteFrame(const FrameInterface& frame) { + FrameHeader header = frame.MakeHeader(); + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: WriteFrame to:" + << ResolvedAddressToString(control_endpoint_.GetPeerAddress()) + .value_or("<>") + << " " << frame.ToString(); + return If( + // If we have no data endpoints, OR this is a small payload + data_endpoints_.empty() || + header.payload_length <= options_.inlined_payload_size_threshold, + // ... then write it to the control endpoint + [this, &header, &frame]() { + SliceBuffer output; + TcpFrameHeader{header, 0}.Serialize( + output.AddTiny(TcpFrameHeader::kFrameHeaderSize)); + frame.SerializePayload(output); + return control_endpoint_.Write(std::move(output)); + }, + // ... otherwise write it to a data connection + [this, header, &frame]() mutable { + SliceBuffer payload; + const size_t padding = DataConnectionPadding(header.payload_length, + options_.encode_alignment); + frame.SerializePayload(payload); + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: Send " << payload.Length() + << "b payload on data channel; add " << padding << " bytes for " + << options_.encode_alignment << " alignment"; + if (padding != 0) { + auto slice = MutableSlice::CreateUninitialized(padding); + memset(slice.data(), 0, padding); + payload.AppendIndexed(Slice(std::move(slice))); + } + return Seq( + data_endpoints_.Write(std::move(payload)), + [this, header](uint32_t connection_id) mutable { + SliceBuffer header_frame; + TcpFrameHeader{header, connection_id + 1}.Serialize( + header_frame.AddTiny(TcpFrameHeader::kFrameHeaderSize)); + return control_endpoint_.Write(std::move(header_frame)); + }); + }); +} + +auto TcpFrameTransport::WriteLoop(MpscReceiver frames) { + return Loop([self = RefAsSubclass(), + frames = std::move(frames)]() mutable { + return TrySeq( + // Get next outgoing frame. + frames.Next(), + // Serialize and write it out. + [self = self.get()](Frame client_frame) { + return self->WriteFrame( + absl::ConvertVariantTo(client_frame)); + }, + []() -> LoopCtl { + // The write failures will be caught in TrySeq and exit + // loop. Therefore, only need to return Continue() in the + // last lambda function. + return Continue(); + }); + }); +} + +void TcpFrameTransport::StartWriting( + Party* party, MpscReceiver frames, + absl::AnyInvocable on_done) { + party->Spawn( + "tcp-write", + [self = RefAsSubclass(), + frames = std::move(frames)]() mutable { + return self->WriteLoop(std::move(frames)); + }, + std::move(on_done)); +} + +auto TcpFrameTransport::ReadFrameBytes() { + return TrySeq( + control_endpoint_.ReadSlice(TcpFrameHeader::kFrameHeaderSize), + [this](Slice read_buffer) { + auto frame_header = + TcpFrameHeader::Parse(reinterpret_cast( + GRPC_SLICE_START_PTR(read_buffer.c_slice()))); + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: ReadHeader from:" + << ResolvedAddressToString(control_endpoint_.GetPeerAddress()) + .value_or("<>") + << " " + << (frame_header.ok() ? frame_header->ToString() + : frame_header.status().ToString()); + return frame_header; + }, + [this](TcpFrameHeader frame_header) { + return If( + // If the payload is on the connection frame + frame_header.payload_connection_id == 0, + // ... then read the data immediately and return an IncomingFrame + // that contains the payload. + // We need to do this here so that we do not create head of line + // blocking issues reading later control frames (but waiting for a + // call to get scheduled time to read the payload). + [this, frame_header]() { + return Map( + control_endpoint_.Read(frame_header.header.payload_length), + [frame_header](absl::StatusOr payload) + -> absl::StatusOr { + if (!payload.ok()) return payload.status(); + return IncomingFrame(frame_header.header, + std::move(payload)); + }); + }, + // ... otherwise issue a read to the appropriate data endpoint, + // which will return a read ticket - which can be used later + // in the call promise to asynchronously wait for those bytes + // to be available. + [this, frame_header]() -> absl::StatusOr { + const auto padding = + frame_header.Padding(options_.decode_alignment); + return IncomingFrame( + frame_header.header, + Map(data_endpoints_ + .Read(frame_header.payload_connection_id - 1, + frame_header.header.payload_length + padding) + .Await(), + [padding](absl::StatusOr payload) + -> absl::StatusOr { + if (payload.ok()) payload->RemoveLastNBytes(padding); + return payload; + })); + }); + }); +} + +} // namespace chaotic_good +} // namespace grpc_core \ No newline at end of file diff --git a/src/core/ext/transport/chaotic_good/tcp_frame_transport.h b/src/core/ext/transport/chaotic_good/tcp_frame_transport.h index e5b8a1d94f541..74ae352d971df 100644 --- a/src/core/ext/transport/chaotic_good/tcp_frame_transport.h +++ b/src/core/ext/transport/chaotic_good/tcp_frame_transport.h @@ -17,20 +17,74 @@ #include -#include "pending_connection.h" +#include "frame_header.h" +#include "src/core/ext/transport/chaotic_good/control_endpoint.h" +#include "src/core/ext/transport/chaotic_good/data_endpoints.h" #include "src/core/ext/transport/chaotic_good/frame_transport.h" +#include "src/core/ext/transport/chaotic_good/pending_connection.h" namespace grpc_core { namespace chaotic_good { +struct TcpFrameHeader { + // Frame header size is fixed to 12 bytes. + enum { kFrameHeaderSize = 12 }; + + FrameHeader header; + uint32_t payload_connection_id = 0; + + // Parses a frame header from a buffer of 12 bytes. All 12 bytes are consumed. + static absl::StatusOr Parse(const uint8_t* data); + // Serializes a frame header into a buffer of 12 bytes. + void Serialize(uint8_t* data) const; + + // Report contents as a string + std::string ToString() const; + + bool operator==(const TcpFrameHeader& h) const { + return header == h.header && + payload_connection_id == h.payload_connection_id; + } + + // Required padding to maintain alignment. + uint32_t Padding(uint32_t alignment) const; +}; + +inline std::vector OneDataEndpoint(PromiseEndpoint endpoint) { + std::vector ep; + ep.emplace_back(std::move(endpoint)); + return ep; +} + class TcpFrameTransport final : public FrameTransport { public: - TcpFrameTransport(PromiseEndpoint control_endpoint, - std::vector pending_data_endpoints); + struct Options { + uint32_t encode_alignment = 64; + uint32_t decode_alignment = 64; + uint32_t inlined_payload_size_threshold = 8 * 1024; + }; + + TcpFrameTransport( + Options options, PromiseEndpoint control_endpoint, + std::vector pending_data_endpoints, + std::shared_ptr + event_engine); + + void StartReading(Party* party, PipeSender frames, + absl::AnyInvocable on_done) override; + void StartWriting(Party* party, MpscReceiver frames, + absl::AnyInvocable on_done) override; private: - Poll PollWrite(Frame& frame) override; - Poll> PollRead() override; + auto WriteFrame(const FrameInterface& frame); + auto WriteLoop(MpscReceiver frames); + // Read frame header and payloads for control and data portions of one frame. + // Resolves to StatusOr. + auto ReadFrameBytes(); + + ControlEndpoint control_endpoint_; + DataEndpoints data_endpoints_; + Options options_; }; } // namespace chaotic_good