Skip to content

Commit

Permalink
initial pass on commands
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Dec 2, 2024
1 parent 44ac773 commit a3206a4
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/core/ext/transport/chaotic_good/chaotic_good_frame.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,55 @@ message ServerMetadata {
message BeginMessage {
uint64 length = 1;
}

message CommandRequest {
string command = 1;
uint64 request_id = 2;
bytes payload = 3;
}

message CommandResponse {
uint64 request_id = 1;
bytes payload = 2;
}

message TDigest {
// TODO(ctiller): t-digest here
repeated uint64 latencies = 1;
}

message MeasureLatencyRequest {
}

message MeasureLatencyResponse {
map<uint32, TDigest> latencies = 1;
}

message OpenConnectionRequest {
uint32 connection_index = 1;
bytes connection_id = 2;
}

message OpenConnectionResponse {
}

message ParkConnectionRequest {
uint32 connection_index = 1;
}

message ParkConnectionResponse {
}

message UnparkConnectionRequest {
uint32 connection_index = 1;
}

message UnparkConnectionResponse {
}

message CloseConnectionRequest {
uint32 connection_index = 1;
}

message CloseConnectionResponse {
}
118 changes: 118 additions & 0 deletions src/core/ext/transport/chaotic_good/command.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_COMMAND_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_COMMAND_H

#include "absl/container/flat_hash_map.h"
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/util/sync.h"

namespace grpc_core {
namespace chaotic_good {

class CommandStub {
public:
template <typename Response>
struct PendingRequest {
CommandRequestFrame request_frame;
Promise<absl::StatusOr<Response>> promise;
};

PendingRequest<chaotic_good_frame::MeasureLatencyResponse> MeasureLatency(
chaotic_good_frame::MeasureLatencyRequest request) {
return MakePendingRequest("MeasureLatency", std::move(request));
}

PendingRequest<chaotic_good_frame::OpenConnectionResponse> OpenConnection(
chaotic_good_frame::OpenConnectionRequest request) {
return MakePendingRequest("OpenConnection", std::move(request));
}

PendingRequest<chaotic_good_frame::ParkConnectionResponse> ParkConnection(
chaotic_good_frame::ParkConnectionRequest request) {
return MakePendingRequest("ParkConnection", std::move(request));
}

PendingRequest<chaotic_good_frame::UnparkConnectionResponse> UnparkConnection(
chaotic_good_frame::UnparkConnectionRequest request) {
return MakePendingRequest("UnparkConnection", std::move(request));
}

PendingRequest<chaotic_good_frame::CloseConnectionResponse> CloseConnection(
chaotic_good_frame::CloseConnectionRequest request) {
return MakePendingRequest("CloseConnection", std::move(request));
}

absl::Status DispatchResponse(CommandResponseFrame response) {
MutexLock lock(&mu_);
auto ex = pending_requests_.extract(response.request_id());
if (ex.empty()) {
return absl::InternalError(absl::StrCat(
"No pending command request with id ", response.request_id()));
}
ex.mapped()->Set(std::move(response));
return absl::OkStatus();
}

private:
template <typename Response, typename Request>
PendingRequest<Response> MakePendingRequest(absl::string_view command,
Request request) {
CommandRequestFrame frame;
frame.set_command(command);
frame.set_payload(request.SerializeAsString());
auto latch = AddRequest(frame);
return PendingRequest<Response> {
std::move(frame),
Map(latch->Wait(), [latch](CommandResponseFrame response_frame) {
Response response;
bool ok = response.ParseFromArray(response_frame.payload().data(),
response_frame.payload().size());
if (!ok) {
return absl::InternalError(
"Failed to parse server command response");
}
return response;
});
}
}

std::shared_ptr<InterActivityLatch<CommandResponseFrame>> AddRequest(
CommandRequestFrame& frame) {
MutexLock lock(&mu_);
auto request_id = next_request_id_;
frame.set_request_id(request_id);
++next_request_id_;
auto latch = std::make_shared<InterActivityLatch<CommandResponseFrame>>();
pending_requests_.emplace(request_id, latch);
return latch;
}

Mutex mu_;
uint64_t next_request_id_ ABSL_GUARDED_BY(mu_);
absl::flat_hash_map<uint64_t,
std::shared_ptr<InterActivityLatch<CommandResponseFrame>>>
pending_requests_ ABSL_GUARDED_BY(mu_);
};

class CommandService {
public:
};

} // namespace chaotic_good
} // namespace grpc_core

#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_COMMAND_H
6 changes: 6 additions & 0 deletions src/core/ext/transport/chaotic_good/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ using ServerTrailingMetadataFrame =
ProtoStreamFrame<FrameType::kServerTrailingMetadata,
chaotic_good_frame::ServerMetadata>;
using CancelFrame = EmptyStreamFrame<FrameType::kCancel>;
using CommandRequestFrame =
ProtoTransportFrame<FrameType::kCommandRequest,
chaotic_good_frame::CommandRequest>;
using CommandResponseFrame =
ProtoTransportFrame<FrameType::kCommandResponse,
chaotic_good_frame::CommandResponse>;

struct MessageFrame final : public FrameInterface {
absl::Status Deserialize(const FrameHeader& header,
Expand Down
4 changes: 4 additions & 0 deletions src/core/ext/transport/chaotic_good/frame_header.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ std::string FrameTypeString(FrameType type) {
switch (type) {
case FrameType::kSettings:
return "Settings";
case FrameType::kCommandRequest:
return "CommandRequest";
case FrameType::kCommandResponse:
return "CommandResponse";
case FrameType::kClientInitialMetadata:
return "ClientInitialMetadata";
case FrameType::kClientEndOfStream:
Expand Down
2 changes: 2 additions & 0 deletions src/core/ext/transport/chaotic_good/frame_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ namespace chaotic_good {
// Remember to add new frame types to frame_fuzzer.cc
enum class FrameType : uint8_t {
kSettings = 0x00,
kCommandRequest = 0x01,
kCommandResponse = 0x02,
kClientInitialMetadata = 0x80,
kClientEndOfStream = 0x81,
kServerInitialMetadata = 0x91,
Expand Down

0 comments on commit a3206a4

Please sign in to comment.