Skip to content

Commit

Permalink
Merge branch 'rubble' of ssh://github.com/ctiller/grpc into rubble
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 18, 2024
2 parents 9e9bd64 + 8d9823f commit ea966c3
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 151 deletions.
95 changes: 95 additions & 0 deletions CMakeLists.txt

Large diffs are not rendered by default.

381 changes: 286 additions & 95 deletions build_autogenerated.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,27 @@ class SettingsHandshake : public RefCounted<SettingsHandshake> {
SettingsFrame frame;
frame.body = client_settings;
SliceBuffer send_buffer;
frame.MakeHeader().Serialize(
send_buffer.AddTiny(FrameHeader::kFrameHeaderSize));
TcpFrameHeader{frame.MakeHeader(), 0}.Serialize(
send_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize));
frame.SerializePayload(send_buffer);
return TrySeq(
connect_result_.endpoint.Write(std::move(send_buffer)),
[this]() {
return connect_result_.endpoint.ReadSlice(
FrameHeader::kFrameHeaderSize);
TcpFrameHeader::kFrameHeaderSize);
},
[](Slice frame_header) {
return FrameHeader::Parse(frame_header.data());
return TcpFrameHeader::Parse(frame_header.data());
},
[this](FrameHeader frame_header) {
server_header_ = frame_header;
return connect_result_.endpoint.Read(frame_header.payload_length);
[this](TcpFrameHeader frame_header) {
if (frame_header.payload_connection_id != 0) {
return absl::InternalError("Unexpected connection id in frame");
}
server_header_ = frame_header.header;
return absl::OkStatus();
},
[this]() {
return connect_result_.endpoint.Read(server_header_.payload_length);
},
[this](SliceBuffer payload) {
return server_frame_.Deserialize(server_header_, std::move(payload));
Expand Down Expand Up @@ -245,10 +251,15 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result,
if (!parse_status.ok()) {
return parse_status;
}
auto transport = MakeOrphanable<ChaoticGoodClientTransport>(
result.connect_result.channel_args,
auto frame_transport = MakeRefCounted<TcpFrameTransport>(
result_notifier_ptr->config.MakeTransportOptions(),
std::move(result.connect_result.endpoint),
std::move(result_notifier_ptr->config), std::move(connector));
result_notifier_ptr->config.TakePendingDataEndpoints(),
result_notifier_ptr->args.channel_args
.GetObjectRef<EventEngine>());
auto transport = MakeOrphanable<ChaoticGoodClientTransport>(
result_notifier_ptr->args.channel_args, *frame_transport,
result_notifier_ptr->config.MakeMessageChunker());
result_notifier_ptr->result->transport = transport.release();
result_notifier_ptr->result->channel_args =
result.connect_result.channel_args;
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/transport/chaotic_good/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
self = std::move(self)]() {
return Map(
self->CallOutboundLoop(stream_id, call_handler),
[self, stream_id](StatusFlag result) mutable {
[self, stream_id](StatusFlag result) -> StatusFlag {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id
<< " finished with " << result.ToString();
Expand All @@ -326,7 +326,7 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
return result;
});
},
[]() { return absl::OkStatus(); });
[]() -> Poll<StatusFlag> { return Success{}; });
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/client_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "frame_transport.h"
#include "src/core/ext/transport/chaotic_good/config.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chaotic_good/frame_transport.h"
#include "src/core/ext/transport/chaotic_good/message_reassembly.h"
#include "src/core/ext/transport/chaotic_good/pending_connection.h"
#include "src/core/lib/promise/activity.h"
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/frame_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ class FrameTransport : public RefCounted<FrameTransport> {
} // namespace chaotic_good
} // namespace grpc_core

#endif
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_FRAME_TRANSPORT_H
46 changes: 29 additions & 17 deletions src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,27 +289,34 @@ void ChaoticGoodServerListener::ActiveConnection::HandshakingState::Start(
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self) {
return TrySeq(
self->connection_->endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
self->connection_->endpoint_.ReadSlice(TcpFrameHeader::kFrameHeaderSize),
[self](Slice slice) {
// Parse frame header
auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(slice.c_slice())));
if (frame_header.ok() && frame_header->type != FrameType::kSettings) {
frame_header = absl::InternalError("Not a settings frame");
auto frame_header =
TcpFrameHeader::Parse(reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(slice.c_slice())));
if (frame_header.ok()) {
if (frame_header->header.type != FrameType::kSettings) {
frame_header = absl::InternalError("Not a settings frame");
} else if (frame_header->payload_connection_id != 0) {
frame_header = absl::InternalError("Unexpected connection id");
} else if (frame_header->header.stream_id != 0) {
frame_header = absl::InternalError("Unexpected stream id");
}
}
return If(
frame_header.ok(),
[self, &frame_header]() {
return TrySeq(
self->connection_->endpoint_.Read(
frame_header->payload_length),
frame_header->header.payload_length),
[frame_header = *frame_header,
self](SliceBuffer buffer) -> absl::StatusOr<bool> {
// Read Setting frame.
SettingsFrame frame;
// Deserialize frame from read buffer.
auto status =
frame.Deserialize(frame_header, std::move(buffer));
auto status = frame.Deserialize(frame_header.header,
std::move(buffer));
if (!status.ok()) return status;
if (frame.body.data_channel()) {
if (frame.body.connection_id().empty()) {
Expand Down Expand Up @@ -360,17 +367,22 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
absl::get<ControlConnection>(self->data_)
.config.PrepareServerOutgoingSettings(frame.body);
SliceBuffer write_buffer;
frame.MakeHeader().Serialize(
write_buffer.AddTiny(FrameHeader::kFrameHeaderSize));
TcpFrameHeader{frame.MakeHeader(), 0}.Serialize(
write_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize));
frame.SerializePayload(write_buffer);
return TrySeq(
self->connection_->endpoint_.Write(std::move(write_buffer)), [self]() {
auto config =
std::move(absl::get<ControlConnection>(self->data_).config);
auto frame_transport = MakeRefCounted<TcpFrameTransport>(
config.MakeTransportOptions(),
std::move(self->connection_->endpoint_),
config.TakePendingDataEndpoints(),
self->connection_->args().GetObjectRef<EventEngine>());
return self->connection_->listener_->server_->SetupTransport(
new ChaoticGoodServerTransport(
self->connection_->args(),
std::move(self->connection_->endpoint_),
std::move(absl::get<ControlConnection>(self->data_).config),
self->connection_->listener_->data_connection_listener_),
new ChaoticGoodServerTransport(self->connection_->args(),
*frame_transport,
config.MakeMessageChunker()),
nullptr, self->connection_->args(), nullptr);
});
}
Expand All @@ -381,8 +393,8 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
SettingsFrame frame;
frame.body.set_data_channel(true);
SliceBuffer write_buffer;
frame.MakeHeader().Serialize(
write_buffer.AddTiny(FrameHeader::kFrameHeaderSize));
TcpFrameHeader{frame.MakeHeader(), 0}.Serialize(
write_buffer.AddTiny(TcpFrameHeader::kFrameHeaderSize));
frame.SerializePayload(write_buffer);
// ignore encoding errors: they will be logged separately already
return TrySeq(self->connection_->endpoint_.Write(std::move(write_buffer)),
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/server_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "frame_transport.h"
#include "src/core/ext/transport/chaotic_good/config.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chaotic_good/frame_transport.h"
#include "src/core/ext/transport/chaotic_good/message_reassembly.h"
#include "src/core/ext/transport/chaotic_good/pending_connection.h"
#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/tcp_frame_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ class TcpFrameTransport final : public FrameTransport {
} // namespace chaotic_good
} // namespace grpc_core

#endif
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_TCP_FRAME_TRANSPORT_H
17 changes: 4 additions & 13 deletions src/core/ext/transport/chaotic_good_legacy/client_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,28 +268,19 @@ uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
return stream_id;
}

namespace {
absl::Status BooleanSuccessToTransportError(bool success) {
return success ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
}
} // namespace

auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
CallHandler call_handler) {
auto send_fragment = [stream_id,
outgoing_frames = outgoing_frames_.MakeSender()](
ClientFragmentFrame frame) mutable {
frame.stream_id = stream_id;
return Map(outgoing_frames.Send(std::move(frame)),
BooleanSuccessToTransportError);
return outgoing_frames.Send(std::move(frame));
};
auto send_fragment_acked = [stream_id,
outgoing_frames = outgoing_frames_.MakeSender()](
ClientFragmentFrame frame) mutable {
frame.stream_id = stream_id;
return Map(outgoing_frames.SendAcked(std::move(frame)),
BooleanSuccessToTransportError);
return outgoing_frames.SendAcked(std::move(frame));
};
return GRPC_LATENT_SEE_PROMISE(
"CallOutboundLoop",
Expand Down Expand Up @@ -343,7 +334,7 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
return Map(
self->CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = self->outgoing_frames_.MakeSender()](
absl::Status result) mutable {
StatusFlag result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id
<< " finished with " << result.ToString();
Expand All @@ -359,7 +350,7 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
return result;
});
},
[]() { return absl::OkStatus(); });
[]() -> Poll<StatusFlag> { return Success{}; });
});
}

Expand Down
20 changes: 10 additions & 10 deletions src/core/ext/transport/chaotic_good_legacy/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
}

namespace {
auto BooleanSuccessToTransportErrorCapturingInitiator(CallInitiator initiator) {
return [initiator = std::move(initiator)](bool success) {
return success ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
auto StatusFlagToTransportErrorCapturingInitiator(CallInitiator initiator) {
return [initiator = std::move(initiator)](StatusFlag status) {
return status.ok() ? absl::OkStatus()
: absl::UnavailableError("Transport closed.");
};
}
} // namespace
Expand All @@ -141,9 +141,9 @@ auto ChaoticGoodServerTransport::SendFragment(
<< "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.Send(std::move(frame)),
BooleanSuccessToTransportErrorCapturingInitiator(
std::move(call_initiator)));
return Map(
outgoing_frames.Send(std::move(frame)),
StatusFlagToTransportErrorCapturingInitiator(std::move(call_initiator)));
}

auto ChaoticGoodServerTransport::SendFragmentAcked(
Expand All @@ -153,9 +153,9 @@ auto ChaoticGoodServerTransport::SendFragmentAcked(
<< "CHAOTIC_GOOD: SendFragmentAcked: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.SendAcked(std::move(frame)),
BooleanSuccessToTransportErrorCapturingInitiator(
std::move(call_initiator)));
return Map(
outgoing_frames.SendAcked(std::move(frame)),
StatusFlagToTransportErrorCapturingInitiator(std::move(call_initiator)));
}

auto ChaoticGoodServerTransport::SendCallBody(
Expand Down

0 comments on commit ea966c3

Please sign in to comment.