diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 9276de08b554f..241b2627e07fc 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -94,6 +94,9 @@ struct ProtoTransportFrame final : public FrameInterface { return absl::StrCat(FrameTypeString(frame_type), "{", body.ShortDebugString(), "}"); } + bool operator==(const ProtoTransportFrame& other) const { + return body.ShortDebugString() == other.body.ShortDebugString(); + } Body body; }; @@ -118,6 +121,10 @@ struct ProtoStreamFrame final : public FrameInterface { return absl::StrCat(FrameTypeString(frame_type), "{@", stream_id, "; ", body.ShortDebugString(), "}"); } + bool operator==(const ProtoStreamFrame& other) const { + return body.ShortDebugString() == other.body.ShortDebugString() && + stream_id == other.stream_id; + } Body body; uint32_t stream_id; @@ -137,6 +144,9 @@ struct EmptyStreamFrame final : public FrameInterface { } void SerializePayload(SliceBuffer&) const override {} std::string ToString() const override { return FrameTypeString(frame_type); } + bool operator==(const EmptyStreamFrame& other) const { + return stream_id == other.stream_id; + } uint32_t stream_id; }; @@ -163,6 +173,11 @@ struct MessageFrame final : public FrameInterface { FrameHeader MakeHeader() const override; void SerializePayload(SliceBuffer& payload) const override; std::string ToString() const override; + bool operator==(const MessageFrame& other) const { + return stream_id == other.stream_id && + message->payload()->JoinIntoString() == + other.message->payload()->JoinIntoString(); + } uint32_t stream_id; MessageHandle message; @@ -174,6 +189,10 @@ struct MessageChunkFrame final : public FrameInterface { FrameHeader MakeHeader() const override; void SerializePayload(SliceBuffer& payload) const override; std::string ToString() const override; + bool operator==(const MessageChunkFrame& other) const { + return stream_id == other.stream_id && + payload.JoinIntoString() == other.payload.JoinIntoString(); + } uint32_t stream_id; SliceBuffer payload; diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 1d2d4bdcfa515..f5671eaf252e4 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -81,6 +81,21 @@ grpc_cc_test( ], ) +grpc_cc_library( + name = "mock_frame_transport", + srcs = [ + "mock_frame_transport.cc", + ], + hdrs = [ + "mock_frame_transport.h", + ], + external_deps = [ + "gtest", + "protobuf_headers", + ], + deps = ["//src/core:chaotic_good_frame_transport"], +) + grpc_proto_fuzzer( name = "tcp_frame_fuzzer", srcs = ["tcp_frame_fuzzer.cc"], @@ -105,12 +120,12 @@ grpc_proto_fuzzer( "//src/core:arena", "//src/core:chaotic_good_frame", "//src/core:chaotic_good_frame_header", + "//src/core:chaotic_good_tcp_frame_transport", "//src/core:event_engine_memory_allocator", "//src/core:memory_quota", "//src/core:resource_quota", "//src/core:slice", "//src/core:slice_buffer", - "//src/core:chaotic_good_tcp_frame_transport", "//test/core/promise:test_context", ], ) @@ -195,21 +210,23 @@ grpc_cc_test( uses_event_engine = False, uses_polling = False, deps = [ + "mock_frame_transport", "transport_test", "//:grpc", "//:grpc_public_hdrs", "//:iomgr_timer", "//:ref_counted_ptr", "//src/core:arena", + "//src/core:chaotic_good_frame_cc_proto", "//src/core:chaotic_good_server_transport", "//src/core:memory_quota", + "//src/core:metadata_batch", "//src/core:resource_quota", "//src/core:seq", "//src/core:slice", "//src/core:slice_buffer", "//test/core/event_engine/fuzzing_event_engine", "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_cc_proto", - "//test/core/transport/util:mock_promise_endpoint", ], ) diff --git a/test/core/transport/chaotic_good/mock_frame_transport.cc b/test/core/transport/chaotic_good/mock_frame_transport.cc new file mode 100644 index 0000000000000..e70b16285f9f7 --- /dev/null +++ b/test/core/transport/chaotic_good/mock_frame_transport.cc @@ -0,0 +1,83 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/chaotic_good/mock_frame_transport.h" + +#include "gtest/gtest.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/try_seq.h" + +namespace grpc_core { +namespace chaotic_good { +namespace testing { + +MockFrameTransport::~MockFrameTransport() { + while (!expected_writes_.empty()) { + auto& w = expected_writes_.front(); + ADD_FAILURE_AT(w.whence.file(), w.whence.line()) + << "Expected write of " + << absl::ConvertVariantTo(w.frame).ToString(); + expected_writes_.pop(); + } + if (on_read_done_ != nullptr) { + on_read_done_(absl::OkStatus()); + } +} + +void MockFrameTransport::StartReading( + Party*, MpscSender frames, + absl::AnyInvocable on_done) { + reader_ = std::move(frames); + on_read_done_ = std::move(on_done); +} + +void MockFrameTransport::StartWriting( + Party* party, MpscReceiver frames, + absl::AnyInvocable on_done) { + party->Spawn( + "MockFrameTransport_Writer", + [this, frames = std::move(frames)]() mutable { + return Loop([this, frames = std::move(frames)]() mutable { + return TrySeq( + frames.Next(), [this](Frame frame) -> LoopCtl { + if (expected_writes_.empty()) { + ADD_FAILURE() + << "Unexpected write of " + << absl::ConvertVariantTo(frame) + .ToString(); + return Continue{}; + } + auto expected = std::move(expected_writes_.front()); + expected_writes_.pop(); + EXPECT_EQ(expected.frame, frame) + << " from " << expected.whence.file() << ":" + << expected.whence.line(); + return Continue{}; + }); + }); + }, + std::move(on_done)); +} + +void MockFrameTransport::Read(Frame frame) { + SliceBuffer buffer; + auto& frame_interface = absl::ConvertVariantTo(frame); + frame_interface.SerializePayload(buffer); + reader_.UnbufferedImmediateSend( + IncomingFrame(frame_interface.MakeHeader(), std::move(buffer))); +} + +} // namespace testing +} // namespace chaotic_good +} // namespace grpc_core diff --git a/test/core/transport/chaotic_good/mock_frame_transport.h b/test/core/transport/chaotic_good/mock_frame_transport.h new file mode 100644 index 0000000000000..51ea398adf25e --- /dev/null +++ b/test/core/transport/chaotic_good/mock_frame_transport.h @@ -0,0 +1,81 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_FRAME_TRANSPORT_H +#define GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_FRAME_TRANSPORT_H + +#include + +#include + +#include "src/core/ext/transport/chaotic_good/frame_transport.h" + +namespace grpc_core { +namespace chaotic_good { +namespace testing { + +class MockFrameTransport final : public FrameTransport { + public: + ~MockFrameTransport() override; + + void StartReading(Party* party, MpscSender frames, + absl::AnyInvocable on_done) final; + void StartWriting(Party* party, MpscReceiver frames, + absl::AnyInvocable on_done) final; + + void ExpectWrite(Frame frame, SourceLocation whence = {}) { + expected_writes_.emplace(std::move(frame), whence); + } + void Read(Frame frame); + + private: + struct ExpectedWrite { + ExpectedWrite(Frame frame, SourceLocation whence) + : frame(std::move(frame)), whence(whence) {} + Frame frame; + SourceLocation whence; + }; + std::queue expected_writes_; + MpscSender reader_; + absl::AnyInvocable on_read_done_; +}; + +template +Frame MakeProtoFrame(std::string body) { + T frame; + CHECK(google::protobuf::TextFormat::ParseFromString(body, &frame.body)); + return frame; +} + +template +Frame MakeProtoFrame(uint32_t stream_id, std::string body) { + T frame; + CHECK(google::protobuf::TextFormat::ParseFromString(body, &frame.body)); + frame.stream_id = stream_id; + return frame; +} + +inline Frame MakeMessageFrame(uint32_t stream_id, absl::string_view payload) { + MessageFrame frame; + frame.message = Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString(payload)), 0); + frame.stream_id = stream_id; + return frame; +} + +} // namespace testing +} // namespace chaotic_good +} // namespace grpc_core + +#endif diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc index 77c1cff09e8d2..fdb7379efce0c 100644 --- a/test/core/transport/chaotic_good/server_transport_test.cc +++ b/test/core/transport/chaotic_good/server_transport_test.cc @@ -45,8 +45,8 @@ #include "src/core/util/ref_counted_ptr.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" +#include "test/core/transport/chaotic_good/mock_frame_transport.h" #include "test/core/transport/chaotic_good/transport_test.h" -#include "test/core/transport/util/mock_promise_endpoint.h" using testing::_; using testing::MockFunction; @@ -109,45 +109,13 @@ class MockServerConnectionFactory : public ServerConnectionFactory { }; TEST_F(TransportTest, ReadAndWriteOneMessage) { - MockPromiseEndpoint control_endpoint(1); - MockPromiseEndpoint data_endpoint(2); - auto server_connection_factory = - MakeRefCounted>(); + MockFrameTransport frame_transport; auto call_destination = MakeRefCounted>(); EXPECT_CALL(*call_destination, Orphaned()).Times(1); auto channel_args = MakeChannelArgs(event_engine()); auto transport = MakeOrphanable( - channel_args, std::move(control_endpoint.promise_endpoint), - MakeConfig(channel_args, std::move(data_endpoint.promise_endpoint)), - server_connection_factory); - const auto server_initial_metadata = - EncodeProto("message: 'hello'"); - const auto server_trailing_metadata = - EncodeProto("status: 0"); - const auto client_initial_metadata = - EncodeProto( - "path: '/demo.Service/Step'"); - // Once we set the acceptor, expect to read some frames. - // We'll return a new request with a payload of "12345678". - control_endpoint.ExpectRead( - {SerializedFrameHeader(FrameType::kClientInitialMetadata, 0, 1, - client_initial_metadata.length()), - client_initial_metadata.Copy(), - SerializedFrameHeader(FrameType::kMessage, 0, 1, 8), - EventEngineSlice::FromCopiedString("12345678"), - SerializedFrameHeader(FrameType::kClientEndOfStream, 0, 1, 0)}, - event_engine().get()); - // Once that's read we'll create a new call + channel_args, frame_transport, MessageChunker(0, 1)); StrictMock> on_done; - auto control_address = - grpc_event_engine::experimental::URIToResolvedAddress("ipv4:1.2.3.4:5678") - .value(); - EXPECT_CALL(*control_endpoint.endpoint, GetPeerAddress) - .WillRepeatedly( - [&control_address]() -> const grpc_event_engine::experimental:: - EventEngine::ResolvedAddress& { - return control_address; - }); EXPECT_CALL(*call_destination, StartCall(_)) .WillOnce(WithArgs<0>([&on_done]( UnstartedCallHandler unstarted_call_handler) { @@ -191,24 +159,18 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) { }); })); transport->SetCallDestination(call_destination); + frame_transport.ExpectWrite( + MakeProtoFrame(1, "")); + frame_transport.ExpectWrite(MakeMessageFrame(1, "87654321")); + frame_transport.ExpectWrite( + MakeProtoFrame(1, "")); + frame_transport.Read(MakeProtoFrame( + 1, "path: '/demo.Service/Step'")); + frame_transport.Read(MakeMessageFrame(1, "12345678")); + frame_transport.Read(ClientEndOfStream()); EXPECT_CALL(on_done, Call()); - EXPECT_CALL(*control_endpoint.endpoint, Read) - .InSequence(control_endpoint.read_sequence) - .WillOnce(Return(false)); - control_endpoint.ExpectWrite( - {SerializedFrameHeader(FrameType::kServerInitialMetadata, 0, 1, - server_initial_metadata.length()), - server_initial_metadata.Copy(), - SerializedFrameHeader(FrameType::kMessage, 0, 1, 8), - EventEngineSlice::FromCopiedString("87654321"), - SerializedFrameHeader(FrameType::kServerTrailingMetadata, 0, 1, - server_trailing_metadata.length()), - server_trailing_metadata.Copy()}, - nullptr); // Wait until ClientTransport's internal activities to finish. event_engine()->TickUntilIdle(); - ::testing::Mock::VerifyAndClearExpectations(control_endpoint.endpoint); - ::testing::Mock::VerifyAndClearExpectations(data_endpoint.endpoint); event_engine()->UnsetGlobalHooks(); } diff --git a/test/core/transport/chaotic_good/tcp_frame_transport_test.cc b/test/core/transport/chaotic_good/tcp_frame_transport_test.cc new file mode 100644 index 0000000000000..bd95da09fe1d5 --- /dev/null +++ b/test/core/transport/chaotic_good/tcp_frame_transport_test.cc @@ -0,0 +1,50 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/chaotic_good/transport_test.h" + +namespace grpc_core { +namespace chaotic_good { +namespace testing { + +grpc_event_engine::experimental::Slice SerializedFrameHeader( + FrameType type, uint16_t payload_connection_id, uint32_t stream_id, + uint32_t payload_length) { + uint8_t buffer[FrameHeader::kFrameHeaderSize] = { + static_cast(payload_connection_id), + static_cast(payload_connection_id >> 16), + static_cast(type), + 0, + static_cast(stream_id), + static_cast(stream_id >> 8), + static_cast(stream_id >> 16), + static_cast(stream_id >> 24), + static_cast(payload_length), + static_cast(payload_length >> 8), + static_cast(payload_length >> 16), + static_cast(payload_length >> 24), + }; + return grpc_event_engine::experimental::Slice::FromCopiedBuffer( + buffer, sizeof(buffer)); +} + +grpc_event_engine::experimental::Slice Zeros(uint32_t length) { + std::string zeros(length, 0); + return grpc_event_engine::experimental::Slice::FromCopiedBuffer(zeros.data(), + length); +} + +} // namespace testing +} // namespace chaotic_good +} // namespace grpc_core diff --git a/test/core/transport/chaotic_good/transport_test.cc b/test/core/transport/chaotic_good/transport_test.cc index 2a5522b2cc963..a7d1caf7a19ba 100644 --- a/test/core/transport/chaotic_good/transport_test.cc +++ b/test/core/transport/chaotic_good/transport_test.cc @@ -18,33 +18,6 @@ namespace grpc_core { namespace chaotic_good { namespace testing { -grpc_event_engine::experimental::Slice SerializedFrameHeader( - FrameType type, uint16_t payload_connection_id, uint32_t stream_id, - uint32_t payload_length) { - uint8_t buffer[FrameHeader::kFrameHeaderSize] = { - static_cast(payload_connection_id), - static_cast(payload_connection_id >> 16), - static_cast(type), - 0, - static_cast(stream_id), - static_cast(stream_id >> 8), - static_cast(stream_id >> 16), - static_cast(stream_id >> 24), - static_cast(payload_length), - static_cast(payload_length >> 8), - static_cast(payload_length >> 16), - static_cast(payload_length >> 24), - }; - return grpc_event_engine::experimental::Slice::FromCopiedBuffer( - buffer, sizeof(buffer)); -} - -grpc_event_engine::experimental::Slice Zeros(uint32_t length) { - std::string zeros(length, 0); - return grpc_event_engine::experimental::Slice::FromCopiedBuffer(zeros.data(), - length); -} - } // namespace testing } // namespace chaotic_good } // namespace grpc_core