diff --git a/src/core/BUILD b/src/core/BUILD index 506cd7cfde07e..85f82c3e65fb6 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -8274,6 +8274,7 @@ grpc_cc_library( "match_promise", "mpsc", "pipe", + "inter_activity_pipe", "//:promise", ], ) diff --git a/src/core/ext/transport/chaotic_good/frame_transport.h b/src/core/ext/transport/chaotic_good/frame_transport.h index c3dc36b869462..ba96c3dadf5ed 100644 --- a/src/core/ext/transport/chaotic_good/frame_transport.h +++ b/src/core/ext/transport/chaotic_good/frame_transport.h @@ -66,12 +66,13 @@ class IncomingFrame { class FrameTransport : public RefCounted { public: + using ReadFramePipe = InterActivityPipe; + // Spawn a read loop onto party - read frames from the wire, push them onto // frames. // TODO(ctiller): can likely use a buffered intra-party SpscSender here once // we write one. - virtual void StartReading(Party* party, - InterActivityPipe::Sender frames, + virtual void StartReading(Party* party, ReadFramePipe::Sender frames, absl::AnyInvocable on_done) = 0; // Spawn a write loop onto party - write frames from frames to the wire. virtual void StartWriting(Party* party, MpscReceiver frames, diff --git a/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc b/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc index cc318f990d2df..8f559094efa38 100644 --- a/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc +++ b/src/core/ext/transport/chaotic_good/tcp_frame_transport.cc @@ -227,7 +227,7 @@ auto TcpFrameTransport::ReadFrameBytes() { } void TcpFrameTransport::StartReading( - Party* party, MpscSender frames, + Party* party, ReadFramePipe::Sender frames, absl::AnyInvocable on_done) { Crash("Not implemented"); } diff --git a/src/core/ext/transport/chaotic_good/tcp_frame_transport.h b/src/core/ext/transport/chaotic_good/tcp_frame_transport.h index f0afeaf638329..f092e7e6c750d 100644 --- a/src/core/ext/transport/chaotic_good/tcp_frame_transport.h +++ b/src/core/ext/transport/chaotic_good/tcp_frame_transport.h @@ -71,8 +71,7 @@ class TcpFrameTransport final : public FrameTransport { std::shared_ptr event_engine); - void StartReading(Party* party, - InterActivityPipe::Sender frames, + void StartReading(Party* party, ReadFramePipe::Sender frames, absl::AnyInvocable on_done) override; void StartWriting(Party* party, MpscReceiver frames, absl::AnyInvocable on_done) override; diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 5d158fb815456..b6f86616afdbc 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -83,6 +83,7 @@ grpc_cc_test( grpc_cc_library( name = "mock_frame_transport", + testonly=1, srcs = [ "mock_frame_transport.cc", ], @@ -98,6 +99,7 @@ grpc_cc_library( "//src/core:inter_activity_latch", "//src/core:loop", "//src/core:try_seq", + "//test/core/promise:poll_matcher", ], ) diff --git a/test/core/transport/chaotic_good/mock_frame_transport.cc b/test/core/transport/chaotic_good/mock_frame_transport.cc index b76251d9efea4..7af5b985f04d9 100644 --- a/test/core/transport/chaotic_good/mock_frame_transport.cc +++ b/test/core/transport/chaotic_good/mock_frame_transport.cc @@ -18,6 +18,8 @@ #include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/race.h" #include "src/core/lib/promise/try_seq.h" +#include "gmock/gmock.h" +#include "test/core/promise/poll_matcher.h" namespace grpc_core { namespace chaotic_good { @@ -37,7 +39,7 @@ MockFrameTransport::~MockFrameTransport() { } void MockFrameTransport::StartReading( - Party*, InterActivityPipe::Sender frames, + Party*, ReadFramePipe::Sender frames, absl::AnyInvocable on_done) { reader_ = std::move(frames); on_read_done_ = std::move(on_done); @@ -79,7 +81,7 @@ void MockFrameTransport::Read(Frame frame) { LOG(INFO) << "Read " << frame_interface.ToString(); auto header = frame_interface.MakeHeader(); frame_interface.SerializePayload(buffer); - reader_.UnbufferedImmediateSend(IncomingFrame(header, std::move(buffer))); + EXPECT_THAT(reader_.Push(IncomingFrame(header, std::move(buffer)))(), IsReady()); } } // namespace testing diff --git a/test/core/transport/chaotic_good/mock_frame_transport.h b/test/core/transport/chaotic_good/mock_frame_transport.h index 348c0d966a629..f063cca1b47c0 100644 --- a/test/core/transport/chaotic_good/mock_frame_transport.h +++ b/test/core/transport/chaotic_good/mock_frame_transport.h @@ -30,8 +30,7 @@ class MockFrameTransport final : public FrameTransport { public: ~MockFrameTransport() override; - void StartReading(Party* party, - InterActivityPipe::Sender frames, + void StartReading(Party* party, ReadFramePipe::Sender frames, absl::AnyInvocable on_done) final; void StartWriting(Party* party, MpscReceiver frames, absl::AnyInvocable on_done) final; @@ -52,7 +51,7 @@ class MockFrameTransport final : public FrameTransport { std::queue expected_writes_; std::shared_ptr> end_writes_ = std::make_shared>(); - MpscSender reader_; +ReadFramePipe::Sender reader_; absl::AnyInvocable on_read_done_; };