Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 19, 2024
1 parent 1161a19 commit b7e31c2
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8274,6 +8274,7 @@ grpc_cc_library(
"match_promise",
"mpsc",
"pipe",
"inter_activity_pipe",
"//:promise",
],
)
Expand Down
5 changes: 3 additions & 2 deletions src/core/ext/transport/chaotic_good/frame_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ class IncomingFrame {

class FrameTransport : public RefCounted<FrameTransport> {
public:
using ReadFramePipe = InterActivityPipe<IncomingFrame, 8>;

// Spawn a read loop onto party - read frames from the wire, push them onto
// frames.
// TODO(ctiller): can likely use a buffered intra-party SpscSender here once
// we write one.
virtual void StartReading(Party* party,
InterActivityPipe<IncomingFrame>::Sender frames,
virtual void StartReading(Party* party, ReadFramePipe::Sender frames,
absl::AnyInvocable<void(absl::Status)> on_done) = 0;
// Spawn a write loop onto party - write frames from frames to the wire.
virtual void StartWriting(Party* party, MpscReceiver<Frame> frames,
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/tcp_frame_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ auto TcpFrameTransport::ReadFrameBytes() {
}

void TcpFrameTransport::StartReading(
Party* party, MpscSender<IncomingFrame> frames,
Party* party, ReadFramePipe::Sender frames,
absl::AnyInvocable<void(absl::Status)> on_done) {
Crash("Not implemented");
}
Expand Down
3 changes: 1 addition & 2 deletions src/core/ext/transport/chaotic_good/tcp_frame_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ class TcpFrameTransport final : public FrameTransport {
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine);

void StartReading(Party* party,
InterActivityPipe<IncomingFrame>::Sender frames,
void StartReading(Party* party, ReadFramePipe::Sender frames,
absl::AnyInvocable<void(absl::Status)> on_done) override;
void StartWriting(Party* party, MpscReceiver<Frame> frames,
absl::AnyInvocable<void(absl::Status)> on_done) override;
Expand Down
2 changes: 2 additions & 0 deletions test/core/transport/chaotic_good/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ grpc_cc_test(

grpc_cc_library(
name = "mock_frame_transport",
testonly=1,
srcs = [
"mock_frame_transport.cc",
],
Expand All @@ -98,6 +99,7 @@ grpc_cc_library(
"//src/core:inter_activity_latch",
"//src/core:loop",
"//src/core:try_seq",
"//test/core/promise:poll_matcher",
],
)

Expand Down
6 changes: 4 additions & 2 deletions test/core/transport/chaotic_good/mock_frame_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/try_seq.h"
#include "gmock/gmock.h"
#include "test/core/promise/poll_matcher.h"

namespace grpc_core {
namespace chaotic_good {
Expand All @@ -37,7 +39,7 @@ MockFrameTransport::~MockFrameTransport() {
}

void MockFrameTransport::StartReading(
Party*, InterActivityPipe<IncomingFrame>::Sender frames,
Party*, ReadFramePipe::Sender frames,
absl::AnyInvocable<void(absl::Status)> on_done) {
reader_ = std::move(frames);
on_read_done_ = std::move(on_done);
Expand Down Expand Up @@ -79,7 +81,7 @@ void MockFrameTransport::Read(Frame frame) {
LOG(INFO) << "Read " << frame_interface.ToString();
auto header = frame_interface.MakeHeader();
frame_interface.SerializePayload(buffer);
reader_.UnbufferedImmediateSend(IncomingFrame(header, std::move(buffer)));
EXPECT_THAT(reader_.Push(IncomingFrame(header, std::move(buffer)))(), IsReady());
}

} // namespace testing
Expand Down
5 changes: 2 additions & 3 deletions test/core/transport/chaotic_good/mock_frame_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class MockFrameTransport final : public FrameTransport {
public:
~MockFrameTransport() override;

void StartReading(Party* party,
InterActivityPipe<IncomingFrame>::Sender frames,
void StartReading(Party* party, ReadFramePipe::Sender frames,
absl::AnyInvocable<void(absl::Status)> on_done) final;
void StartWriting(Party* party, MpscReceiver<Frame> frames,
absl::AnyInvocable<void(absl::Status)> on_done) final;
Expand All @@ -52,7 +51,7 @@ class MockFrameTransport final : public FrameTransport {
std::queue<ExpectedWrite> expected_writes_;
std::shared_ptr<InterActivityLatch<absl::Status>> end_writes_ =
std::make_shared<InterActivityLatch<absl::Status>>();
MpscSender<IncomingFrame> reader_;
ReadFramePipe::Sender reader_;
absl::AnyInvocable<void(absl::Status)> on_read_done_;
};

Expand Down

0 comments on commit b7e31c2

Please sign in to comment.