Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 21, 2024
1 parent 8554dc0 commit bcf787a
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 535 deletions.
86 changes: 42 additions & 44 deletions src/core/ext/transport/chaotic_good/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "frame_transport.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/lib/event_engine/event_engine_context.h"
Expand Down Expand Up @@ -150,40 +151,37 @@ auto ChaoticGoodClientTransport::DispatchFrame(IncomingFrame incoming_frame) {
[]() { return absl::OkStatus(); }));
}

auto ChaoticGoodClientTransport::TransportReadLoop() {
return Loop([this] {
return TrySeq(
incoming_frames_.Next(),
[this](IncomingFrame incoming_frame) mutable {
return Switch(
incoming_frame.header().type,
Case<FrameType, FrameType::kServerInitialMetadata>([&, this]() {
return DispatchFrame<ServerInitialMetadataFrame>(
std::move(incoming_frame));
}),
Case<FrameType, FrameType::kServerTrailingMetadata>([&, this]() {
return DispatchFrame<ServerTrailingMetadataFrame>(
std::move(incoming_frame));
}),
Case<FrameType, FrameType::kMessage>([&, this]() {
return DispatchFrame<MessageFrame>(std::move(incoming_frame));
}),
Case<FrameType, FrameType::kBeginMessage>([&, this]() {
return DispatchFrame<BeginMessageFrame>(
std::move(incoming_frame));
}),
Case<FrameType, FrameType::kMessageChunk>([&, this]() {
return DispatchFrame<MessageChunkFrame>(
std::move(incoming_frame));
}),
Default([&]() {
LOG_EVERY_N_SEC(INFO, 10)
<< "Bad frame type: " << incoming_frame.header().ToString();
return absl::OkStatus();
}));
},
[]() -> LoopCtl<absl::Status> { return Continue{}; });
});
auto ChaoticGoodClientTransport::TransportReadLoop(
FrameTransport::ReadFramePipe::Receiver incoming_frames) {
return ForEach(
std::move(incoming_frames), [this](IncomingFrame incoming_frame) mutable {
return Switch(
incoming_frame.header().type,
Case<FrameType, FrameType::kServerInitialMetadata>([&, this]() {
return DispatchFrame<ServerInitialMetadataFrame>(
std::move(incoming_frame));
}),
Case<FrameType, FrameType::kServerTrailingMetadata>([&, this]() {
return DispatchFrame<ServerTrailingMetadataFrame>(
std::move(incoming_frame));
}),
Case<FrameType, FrameType::kMessage>([&, this]() {
return DispatchFrame<MessageFrame>(std::move(incoming_frame));
}),
Case<FrameType, FrameType::kBeginMessage>([&, this]() {
return DispatchFrame<BeginMessageFrame>(
std::move(incoming_frame));
}),
Case<FrameType, FrameType::kMessageChunk>([&, this]() {
return DispatchFrame<MessageChunkFrame>(
std::move(incoming_frame));
}),
Default([&]() {
LOG_EVERY_N_SEC(INFO, 10)
<< "Bad frame type: " << incoming_frame.header().ToString();
return absl::OkStatus();
}));
});
}

auto ChaoticGoodClientTransport::OnTransportActivityDone(
Expand All @@ -200,34 +198,34 @@ auto ChaoticGoodClientTransport::OnTransportActivityDone(
ChaoticGoodClientTransport::ChaoticGoodClientTransport(
const ChannelArgs& args, FrameTransport& frame_transport,
MessageChunker message_chunker)
: event_engine_( args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
allocator_(args.GetObject<ResourceQuota>()
: event_engine_(
args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
allocator_(args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryAllocator("chaotic-good")),
incoming_frames_(8),
message_chunker_(message_chunker) {
FrameTransport::ReadFramePipe incoming_frames;
auto party_arena = SimpleArenaAllocator(0)->MakeArena();
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine_.get());
party_ = Party::Make(std::move(party_arena));
MpscReceiver<Frame> outgoing_frames{8};
outgoing_frames_ = outgoing_frames.MakeSender();
frame_transport.StartReading(party_.get(), incoming_frames_.MakeSender(),
frame_transport.StartReading(party_.get(), std::move(incoming_frames.sender),
OnTransportActivityDone("frame_read_loop"));
frame_transport.StartWriting(party_.get(), std::move(outgoing_frames),
OnTransportActivityDone("frame_write_loop"));
party_->Spawn(
"client-chaotic-reader",
GRPC_LATENT_SEE_PROMISE("ClientTransportReadLoop", TransportReadLoop()),
OnTransportActivityDone("read_loop"));
party_->Spawn("client-chaotic-reader",
GRPC_LATENT_SEE_PROMISE(
"ClientTransportReadLoop",
TransportReadLoop(std::move(incoming_frames.receiver))),
OnTransportActivityDone("read_loop"));
}

ChaoticGoodClientTransport::~ChaoticGoodClientTransport() { party_.reset(); }

void ChaoticGoodClientTransport::AbortWithError() {
// Mark transport as unavailable when the endpoint write/read failed.
// Close all the available pipes.
incoming_frames_.MarkClosed();
ReleasableMutexLock lock(&mu_);
StreamMap stream_map = std::move(stream_map_);
stream_map_.clear();
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/transport/chaotic_good/client_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class ChaoticGoodClientTransport final : public ClientTransport {
auto OnTransportActivityDone(absl::string_view what);
template <typename T>
auto DispatchFrame(IncomingFrame incoming_frame);
auto TransportReadLoop();
auto TransportReadLoop(
FrameTransport::ReadFramePipe::Receiver incoming_frames);
// Push one frame into a call
auto PushFrameIntoCall(ServerInitialMetadataFrame frame,
RefCountedPtr<Stream> stream);
Expand All @@ -110,7 +111,6 @@ class ChaoticGoodClientTransport final : public ClientTransport {
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
grpc_event_engine::experimental::MemoryAllocator allocator_;
MpscSender<Frame> outgoing_frames_;
MpscReceiver<IncomingFrame> incoming_frames_{8};
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of stream incoming server frames, key is stream_id.
Expand Down
18 changes: 15 additions & 3 deletions src/core/lib/promise/inter_activity_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "absl/types/optional.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/util/manual_constructor.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"
Expand Down Expand Up @@ -54,14 +55,22 @@ class InterActivityPipe {
private:
class Center : public RefCounted<Center, NonPolymorphicRefCount> {
public:
~Center() {
while (count_ > 0) {
queue_[first_].Destroy();
first_ = (first_ + 1) % kQueueSize;
--count_;
}
}

Poll<bool> Push(T& value) {
ReleasableMutexLock lock(&mu_);
if (closed_) return false;
if (count_ == kQueueSize) {
on_available_ = GetContext<Activity>()->MakeNonOwningWaker();
return Pending{};
}
queue_[(first_ + count_) % kQueueSize] = std::move(value);
queue_[(first_ + count_) % kQueueSize].Init(std::move(value));
++count_;
if (count_ == 1) {
auto on_occupied = std::move(on_occupied_);
Expand All @@ -78,7 +87,8 @@ class InterActivityPipe {
on_occupied_ = GetContext<Activity>()->MakeNonOwningWaker();
return Pending{};
}
auto value = std::move(queue_[first_]);
auto value = std::move(*queue_[first_]);
queue_[first_].Destroy();
first_ = (first_ + 1) % kQueueSize;
--count_;
if (count_ == kQueueSize - 1) {
Expand Down Expand Up @@ -106,7 +116,7 @@ class InterActivityPipe {

private:
Mutex mu_;
std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
std::array<ManualConstructor<T>, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
bool closed_ ABSL_GUARDED_BY(mu_) = false;
uint8_t first_ ABSL_GUARDED_BY(mu_) = 0;
uint8_t count_ ABSL_GUARDED_BY(mu_) = 0;
Expand All @@ -120,6 +130,7 @@ class InterActivityPipe {
public:
explicit Sender(RefCountedPtr<Center> center)
: center_(std::move(center)) {}
Sender() = default;
Sender(const Sender&) = delete;
Sender& operator=(const Sender&) = delete;
Sender(Sender&&) noexcept = default;
Expand Down Expand Up @@ -149,6 +160,7 @@ class InterActivityPipe {
public:
explicit Receiver(RefCountedPtr<Center> center)
: center_(std::move(center)) {}
Receiver() = default;
Receiver(const Receiver&) = delete;
Receiver& operator=(const Receiver&) = delete;
Receiver(Receiver&&) noexcept = default;
Expand Down
39 changes: 0 additions & 39 deletions test/core/transport/chaotic_good/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -165,45 +165,6 @@ grpc_cc_test(
],
)

grpc_cc_test(
name = "client_transport_error_test",
srcs = ["client_transport_error_test.cc"],
external_deps = [
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings:str_format",
"absl/types:optional",
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"mock_frame_transport",
"//:grpc_public_hdrs",
"//:grpc_unsecure",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:activity",
"//src/core:arena",
"//src/core:chaotic_good_client_transport",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:grpc_promise_endpoint",
"//src/core:if",
"//src/core:join",
"//src/core:loop",
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_cc_proto",
],
)

grpc_cc_test(
name = "server_transport_test",
srcs = ["server_transport_test.cc"],
Expand Down
Loading

0 comments on commit bcf787a

Please sign in to comment.