Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 19, 2024
1 parent 4d159a8 commit 5a8a1e7
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 79 deletions.
19 changes: 19 additions & 0 deletions src/core/ext/transport/chaotic_good/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ struct ProtoTransportFrame final : public FrameInterface {
return absl::StrCat(FrameTypeString(frame_type), "{",
body.ShortDebugString(), "}");
}
bool operator==(const ProtoTransportFrame& other) const {
return body.ShortDebugString() == other.body.ShortDebugString();
}

Body body;
};
Expand All @@ -118,6 +121,10 @@ struct ProtoStreamFrame final : public FrameInterface {
return absl::StrCat(FrameTypeString(frame_type), "{@", stream_id, "; ",
body.ShortDebugString(), "}");
}
bool operator==(const ProtoStreamFrame& other) const {
return body.ShortDebugString() == other.body.ShortDebugString() &&
stream_id == other.stream_id;
}

Body body;
uint32_t stream_id;
Expand All @@ -137,6 +144,9 @@ struct EmptyStreamFrame final : public FrameInterface {
}
void SerializePayload(SliceBuffer&) const override {}
std::string ToString() const override { return FrameTypeString(frame_type); }
bool operator==(const EmptyStreamFrame& other) const {
return stream_id == other.stream_id;
}

uint32_t stream_id;
};
Expand All @@ -163,6 +173,11 @@ struct MessageFrame final : public FrameInterface {
FrameHeader MakeHeader() const override;
void SerializePayload(SliceBuffer& payload) const override;
std::string ToString() const override;
bool operator==(const MessageFrame& other) const {
return stream_id == other.stream_id &&
message->payload()->JoinIntoString() ==
other.message->payload()->JoinIntoString();
}

uint32_t stream_id;
MessageHandle message;
Expand All @@ -174,6 +189,10 @@ struct MessageChunkFrame final : public FrameInterface {
FrameHeader MakeHeader() const override;
void SerializePayload(SliceBuffer& payload) const override;
std::string ToString() const override;
bool operator==(const MessageChunkFrame& other) const {
return stream_id == other.stream_id &&
payload.JoinIntoString() == other.payload.JoinIntoString();
}

uint32_t stream_id;
SliceBuffer payload;
Expand Down
21 changes: 19 additions & 2 deletions test/core/transport/chaotic_good/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ grpc_cc_test(
],
)

grpc_cc_library(
name = "mock_frame_transport",
srcs = [
"mock_frame_transport.cc",
],
hdrs = [
"mock_frame_transport.h",
],
external_deps = [
"gtest",
"protobuf_headers",
],
deps = ["//src/core:chaotic_good_frame_transport"],
)

grpc_proto_fuzzer(
name = "tcp_frame_fuzzer",
srcs = ["tcp_frame_fuzzer.cc"],
Expand All @@ -105,12 +120,12 @@ grpc_proto_fuzzer(
"//src/core:arena",
"//src/core:chaotic_good_frame",
"//src/core:chaotic_good_frame_header",
"//src/core:chaotic_good_tcp_frame_transport",
"//src/core:event_engine_memory_allocator",
"//src/core:memory_quota",
"//src/core:resource_quota",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:chaotic_good_tcp_frame_transport",
"//test/core/promise:test_context",
],
)
Expand Down Expand Up @@ -195,21 +210,23 @@ grpc_cc_test(
uses_event_engine = False,
uses_polling = False,
deps = [
"mock_frame_transport",
"transport_test",
"//:grpc",
"//:grpc_public_hdrs",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:arena",
"//src/core:chaotic_good_frame_cc_proto",
"//src/core:chaotic_good_server_transport",
"//src/core:memory_quota",
"//src/core:metadata_batch",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_cc_proto",
"//test/core/transport/util:mock_promise_endpoint",
],
)

Expand Down
83 changes: 83 additions & 0 deletions test/core/transport/chaotic_good/mock_frame_transport.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "test/core/transport/chaotic_good/mock_frame_transport.h"

#include "gtest/gtest.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/try_seq.h"

namespace grpc_core {
namespace chaotic_good {
namespace testing {

MockFrameTransport::~MockFrameTransport() {
while (!expected_writes_.empty()) {
auto& w = expected_writes_.front();
ADD_FAILURE_AT(w.whence.file(), w.whence.line())
<< "Expected write of "
<< absl::ConvertVariantTo<FrameInterface&>(w.frame).ToString();
expected_writes_.pop();
}
if (on_read_done_ != nullptr) {
on_read_done_(absl::OkStatus());
}
}

void MockFrameTransport::StartReading(
Party*, MpscSender<IncomingFrame> frames,
absl::AnyInvocable<void(absl::Status)> on_done) {
reader_ = std::move(frames);
on_read_done_ = std::move(on_done);
}

void MockFrameTransport::StartWriting(
Party* party, MpscReceiver<Frame> frames,
absl::AnyInvocable<void(absl::Status)> on_done) {
party->Spawn(
"MockFrameTransport_Writer",
[this, frames = std::move(frames)]() mutable {
return Loop([this, frames = std::move(frames)]() mutable {
return TrySeq(
frames.Next(), [this](Frame frame) -> LoopCtl<absl::Status> {
if (expected_writes_.empty()) {
ADD_FAILURE()
<< "Unexpected write of "
<< absl::ConvertVariantTo<FrameInterface&>(frame)
.ToString();
return Continue{};
}
auto expected = std::move(expected_writes_.front());
expected_writes_.pop();
EXPECT_EQ(expected.frame, frame)
<< " from " << expected.whence.file() << ":"
<< expected.whence.line();
return Continue{};
});
});
},
std::move(on_done));
}

void MockFrameTransport::Read(Frame frame) {
SliceBuffer buffer;
auto& frame_interface = absl::ConvertVariantTo<FrameInterface&>(frame);
frame_interface.SerializePayload(buffer);
reader_.UnbufferedImmediateSend(
IncomingFrame(frame_interface.MakeHeader(), std::move(buffer)));
}

} // namespace testing
} // namespace chaotic_good
} // namespace grpc_core
81 changes: 81 additions & 0 deletions test/core/transport/chaotic_good/mock_frame_transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_FRAME_TRANSPORT_H
#define GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_FRAME_TRANSPORT_H

#include <google/protobuf/text_format.h>

#include <queue>

#include "src/core/ext/transport/chaotic_good/frame_transport.h"

namespace grpc_core {
namespace chaotic_good {
namespace testing {

class MockFrameTransport final : public FrameTransport {
public:
~MockFrameTransport() override;

void StartReading(Party* party, MpscSender<IncomingFrame> frames,
absl::AnyInvocable<void(absl::Status)> on_done) final;
void StartWriting(Party* party, MpscReceiver<Frame> frames,
absl::AnyInvocable<void(absl::Status)> on_done) final;

void ExpectWrite(Frame frame, SourceLocation whence = {}) {
expected_writes_.emplace(std::move(frame), whence);
}
void Read(Frame frame);

private:
struct ExpectedWrite {
ExpectedWrite(Frame frame, SourceLocation whence)
: frame(std::move(frame)), whence(whence) {}
Frame frame;
SourceLocation whence;
};
std::queue<ExpectedWrite> expected_writes_;
MpscSender<IncomingFrame> reader_;
absl::AnyInvocable<void(absl::Status)> on_read_done_;
};

template <typename T>
Frame MakeProtoFrame(std::string body) {
T frame;
CHECK(google::protobuf::TextFormat::ParseFromString(body, &frame.body));
return frame;
}

template <typename T>
Frame MakeProtoFrame(uint32_t stream_id, std::string body) {
T frame;
CHECK(google::protobuf::TextFormat::ParseFromString(body, &frame.body));
frame.stream_id = stream_id;
return frame;
}

inline Frame MakeMessageFrame(uint32_t stream_id, absl::string_view payload) {
MessageFrame frame;
frame.message = Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(payload)), 0);
frame.stream_id = stream_id;
return frame;
}

} // namespace testing
} // namespace chaotic_good
} // namespace grpc_core

#endif
62 changes: 12 additions & 50 deletions test/core/transport/chaotic_good/server_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
#include "src/core/util/ref_counted_ptr.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/transport/chaotic_good/mock_frame_transport.h"
#include "test/core/transport/chaotic_good/transport_test.h"
#include "test/core/transport/util/mock_promise_endpoint.h"

using testing::_;
using testing::MockFunction;
Expand Down Expand Up @@ -109,45 +109,13 @@ class MockServerConnectionFactory : public ServerConnectionFactory {
};

TEST_F(TransportTest, ReadAndWriteOneMessage) {
MockPromiseEndpoint control_endpoint(1);
MockPromiseEndpoint data_endpoint(2);
auto server_connection_factory =
MakeRefCounted<StrictMock<MockServerConnectionFactory>>();
MockFrameTransport frame_transport;
auto call_destination = MakeRefCounted<StrictMock<MockCallDestination>>();
EXPECT_CALL(*call_destination, Orphaned()).Times(1);
auto channel_args = MakeChannelArgs(event_engine());
auto transport = MakeOrphanable<ChaoticGoodServerTransport>(
channel_args, std::move(control_endpoint.promise_endpoint),
MakeConfig(channel_args, std::move(data_endpoint.promise_endpoint)),
server_connection_factory);
const auto server_initial_metadata =
EncodeProto<chaotic_good_frame::ServerMetadata>("message: 'hello'");
const auto server_trailing_metadata =
EncodeProto<chaotic_good_frame::ServerMetadata>("status: 0");
const auto client_initial_metadata =
EncodeProto<chaotic_good_frame::ClientMetadata>(
"path: '/demo.Service/Step'");
// Once we set the acceptor, expect to read some frames.
// We'll return a new request with a payload of "12345678".
control_endpoint.ExpectRead(
{SerializedFrameHeader(FrameType::kClientInitialMetadata, 0, 1,
client_initial_metadata.length()),
client_initial_metadata.Copy(),
SerializedFrameHeader(FrameType::kMessage, 0, 1, 8),
EventEngineSlice::FromCopiedString("12345678"),
SerializedFrameHeader(FrameType::kClientEndOfStream, 0, 1, 0)},
event_engine().get());
// Once that's read we'll create a new call
channel_args, frame_transport, MessageChunker(0, 1));
StrictMock<MockFunction<void()>> on_done;
auto control_address =
grpc_event_engine::experimental::URIToResolvedAddress("ipv4:1.2.3.4:5678")
.value();
EXPECT_CALL(*control_endpoint.endpoint, GetPeerAddress)
.WillRepeatedly(
[&control_address]() -> const grpc_event_engine::experimental::
EventEngine::ResolvedAddress& {
return control_address;
});
EXPECT_CALL(*call_destination, StartCall(_))
.WillOnce(WithArgs<0>([&on_done](
UnstartedCallHandler unstarted_call_handler) {
Expand Down Expand Up @@ -191,24 +159,18 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
});
}));
transport->SetCallDestination(call_destination);
frame_transport.ExpectWrite(
MakeProtoFrame<ServerInitialMetadataFrame>(1, ""));
frame_transport.ExpectWrite(MakeMessageFrame(1, "87654321"));
frame_transport.ExpectWrite(
MakeProtoFrame<ServerTrailingMetadataFrame>(1, ""));
frame_transport.Read(MakeProtoFrame<ClientInitialMetadataFrame>(
1, "path: '/demo.Service/Step'"));
frame_transport.Read(MakeMessageFrame(1, "12345678"));
frame_transport.Read(ClientEndOfStream());
EXPECT_CALL(on_done, Call());
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)
.WillOnce(Return(false));
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kServerInitialMetadata, 0, 1,
server_initial_metadata.length()),
server_initial_metadata.Copy(),
SerializedFrameHeader(FrameType::kMessage, 0, 1, 8),
EventEngineSlice::FromCopiedString("87654321"),
SerializedFrameHeader(FrameType::kServerTrailingMetadata, 0, 1,
server_trailing_metadata.length()),
server_trailing_metadata.Copy()},
nullptr);
// Wait until ClientTransport's internal activities to finish.
event_engine()->TickUntilIdle();
::testing::Mock::VerifyAndClearExpectations(control_endpoint.endpoint);
::testing::Mock::VerifyAndClearExpectations(data_endpoint.endpoint);
event_engine()->UnsetGlobalHooks();
}

Expand Down
Loading

0 comments on commit 5a8a1e7

Please sign in to comment.