Skip to content

Commit

Permalink
Use grpc arena
Browse files Browse the repository at this point in the history
This changes allow the usage of the c++ grpc arena feature to allocate
the messages from the same location, the initial motivation of this was
save all the message strings in the same memory portion but seems that
the open source version of the grpc don't allow it and like google has
this internal feature isn't possible create a PR. Please google, if you
dont mind, could you spend a little of your engineering team time to open
source it?

protocolbuffers/protobuf#1896
  • Loading branch information
Gonlo2 committed May 26, 2020
1 parent 90b389e commit e9ee3d6
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 96 deletions.
42 changes: 23 additions & 19 deletions src/mhconfig/api/request/get_request_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace request


GetRequestImpl::GetRequestImpl()
: Request(),
responder_(&ctx_)
: responder_(&ctx_)
{
request_ = google::protobuf::Arena::CreateMessage<mhconfig::proto::GetRequest>(&arena_);
response_ = google::protobuf::Arena::CreateMessage<mhconfig::proto::GetResponse>(&arena_);
}

GetRequestImpl::~GetRequestImpl() {
Expand All @@ -22,19 +23,19 @@ const std::string GetRequestImpl::name() const {
}

const std::string& GetRequestImpl::root_path() const {
return request_.root_path();
return request_->root_path();
}

const uint32_t GetRequestImpl::version() const {
return request_.version();
return request_->version();
}

const std::vector<std::string>& GetRequestImpl::overrides() const {
return overrides_;
}

const std::string& GetRequestImpl::document() const {
return request_.document();
return request_->document();
}

const std::vector<std::string>& GetRequestImpl::key() const {
Expand All @@ -44,38 +45,38 @@ const std::vector<std::string>& GetRequestImpl::key() const {
void GetRequestImpl::set_status(get_request::Status status) {
switch (status) {
case get_request::Status::OK:
response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_OK);
response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_OK);
break;
case get_request::Status::ERROR:
response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_ERROR);
response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_ERROR);
break;
case get_request::Status::INVALID_VERSION:
response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_INVALID_VERSION);
response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_INVALID_VERSION);
break;
case get_request::Status::REF_GRAPH_IS_NOT_DAG:
response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_REF_GRAPH_IS_NOT_DAG);
response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_REF_GRAPH_IS_NOT_DAG);
break;
}
}

void GetRequestImpl::set_namespace_id(uint64_t namespace_id) {
response_.set_namespace_id(namespace_id);
response_->set_namespace_id(namespace_id);
}

void GetRequestImpl::set_version(uint32_t version) {
response_.set_version(version);
response_->set_version(version);
}

void GetRequestImpl::set_element(mhconfig::ElementRef element) {
elements_data_.clear();
response_.clear_elements();
mhconfig::api::config::fill_elements(element, &response_, response_.add_elements());
response_->clear_elements();
mhconfig::api::config::fill_elements(element, response_, response_->add_elements());
}

void GetRequestImpl::set_element_bytes(const char* data, size_t len) {
elements_data_.clear();
elements_data_.write(data, len);
response_.clear_elements();
response_->clear_elements();
}

bool GetRequestImpl::commit() {
Expand All @@ -99,10 +100,13 @@ void GetRequestImpl::subscribe(
void GetRequestImpl::request(
SchedulerQueue::Sender* scheduler_sender
) {
bool ok = parse_from_byte_buffer(raw_request_, request_);
if (ok) {
overrides_ = to_vector(request_.overrides());
key_ = to_vector(request_.key());
auto status = grpc::SerializationTraits<mhconfig::proto::GetRequest>::Deserialize(
&raw_request_,
request_
);
if (status.ok()) {
overrides_ = to_vector(request_->overrides());
key_ = to_vector(request_->key());

scheduler_sender->push(
std::make_unique<scheduler::command::ApiGetCommand>(
Expand All @@ -116,7 +120,7 @@ void GetRequestImpl::request(
}

void GetRequestImpl::finish() {
bool ok = response_.SerializeToOstream(&elements_data_);
bool ok = response_->SerializeToOstream(&elements_data_);
if (ok) {
grpc::Slice slice(elements_data_.str());
grpc::ByteBuffer raw_response(&slice, 1);
Expand Down
7 changes: 5 additions & 2 deletions src/mhconfig/api/request/get_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "mhconfig/scheduler/command/command.h"
#include "mhconfig/scheduler/command/api_get_command.h"

#include <grpcpp/impl/codegen/serialization_traits.h>

namespace mhconfig
{
namespace api
Expand Down Expand Up @@ -46,12 +48,13 @@ class GetRequestImpl : public Request, public GetRequest, public std::enable_sha
bool commit() override;

protected:
google::protobuf::Arena arena_;
grpc::ServerAsyncResponseWriter<grpc::ByteBuffer> responder_;

grpc::ByteBuffer raw_request_;

mhconfig::proto::GetRequest request_;
mhconfig::proto::GetResponse response_;
mhconfig::proto::GetRequest* request_;
mhconfig::proto::GetResponse* response_;

std::stringstream elements_data_;

Expand Down
13 changes: 7 additions & 6 deletions src/mhconfig/api/request/run_gc_request_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ namespace request
{

RunGCRequestImpl::RunGCRequestImpl()
: Request(),
responder_(&ctx_)
: responder_(&ctx_)
{
request_ = google::protobuf::Arena::CreateMessage<mhconfig::proto::RunGCRequest>(&arena_);
response_ = google::protobuf::Arena::CreateMessage<mhconfig::proto::RunGCResponse>(&arena_);
}

RunGCRequestImpl::~RunGCRequestImpl() {
Expand All @@ -31,7 +32,7 @@ void RunGCRequestImpl::subscribe(
CustomService* service,
grpc::ServerCompletionQueue* cq
) {
service->RequestRunGC(&ctx_, &request_, &responder_, cq, cq, tag());
service->RequestRunGC(&ctx_, request_, &responder_, cq, cq, tag());
}

bool RunGCRequestImpl::commit() {
Expand All @@ -52,7 +53,7 @@ void RunGCRequestImpl::request(
}

run_gc::Type RunGCRequestImpl::type() {
switch (request_.type()) {
switch (request_->type()) {
case mhconfig::proto::RunGCRequest::Type::RunGCRequest_Type_CACHE_GENERATION_0:
return scheduler::command::run_gc::Type::CACHE_GENERATION_0;
case mhconfig::proto::RunGCRequest::Type::RunGCRequest_Type_CACHE_GENERATION_1:
Expand All @@ -69,11 +70,11 @@ run_gc::Type RunGCRequestImpl::type() {
}

uint32_t RunGCRequestImpl::max_live_in_seconds() {
return request_.max_live_in_seconds();
return request_->max_live_in_seconds();
}

void RunGCRequestImpl::finish() {
responder_.Finish(response_, grpc::Status::OK, tag());
responder_.Finish(*response_, grpc::Status::OK, tag());
}


Expand Down
5 changes: 3 additions & 2 deletions src/mhconfig/api/request/run_gc_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class RunGCRequestImpl : public Request, public RunGCRequest, public std::enable
bool commit() override;

protected:
google::protobuf::Arena arena_;
grpc::ServerAsyncResponseWriter<mhconfig::proto::RunGCResponse> responder_;

mhconfig::proto::RunGCRequest request_;
mhconfig::proto::RunGCResponse response_;
mhconfig::proto::RunGCRequest* request_;
mhconfig::proto::RunGCResponse* response_;

void request(
SchedulerQueue::Sender* scheduler_sender
Expand Down
21 changes: 11 additions & 10 deletions src/mhconfig/api/request/update_request_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace request


UpdateRequestImpl::UpdateRequestImpl()
: Request(),
responder_(&ctx_)
: responder_(&ctx_)
{
request_ = google::protobuf::Arena::CreateMessage<mhconfig::proto::UpdateRequest>(&arena_);
response_ = google::protobuf::Arena::CreateMessage<mhconfig::proto::UpdateResponse>(&arena_);
}

UpdateRequestImpl::~UpdateRequestImpl() {
Expand All @@ -22,7 +23,7 @@ const std::string UpdateRequestImpl::name() const {
}

const std::string& UpdateRequestImpl::root_path() const {
return request_.root_path();
return request_->root_path();
}

const std::vector<std::string>& UpdateRequestImpl::relative_paths() const {
Expand All @@ -31,22 +32,22 @@ const std::vector<std::string>& UpdateRequestImpl::relative_paths() const {


void UpdateRequestImpl::set_namespace_id(uint64_t namespace_id) {
response_.set_namespace_id(namespace_id);
response_->set_namespace_id(namespace_id);
}

void UpdateRequestImpl::set_status(update_request::Status status) {
switch (status) {
case update_request::Status::OK:
response_.set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_OK);
response_->set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_OK);
break;
case update_request::Status::ERROR:
response_.set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_ERROR);
response_->set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_ERROR);
break;
}
}

void UpdateRequestImpl::set_version(uint32_t version) {
response_.set_version(version);
response_->set_version(version);
}

bool UpdateRequestImpl::commit() {
Expand All @@ -64,13 +65,13 @@ void UpdateRequestImpl::subscribe(
CustomService* service,
grpc::ServerCompletionQueue* cq
) {
service->RequestUpdate(&ctx_, &request_, &responder_, cq, cq, tag());
service->RequestUpdate(&ctx_, request_, &responder_, cq, cq, tag());
}

void UpdateRequestImpl::request(
SchedulerQueue::Sender* scheduler_sender
) {
relative_paths_ = to_vector(request_.relative_paths());
relative_paths_ = to_vector(request_->relative_paths());

scheduler_sender->push(
std::make_unique<scheduler::command::ApiUpdateCommand>(
Expand All @@ -80,7 +81,7 @@ void UpdateRequestImpl::request(
}

void UpdateRequestImpl::finish() {
responder_.Finish(response_, grpc::Status::OK, tag());
responder_.Finish(*response_, grpc::Status::OK, tag());
}


Expand Down
5 changes: 3 additions & 2 deletions src/mhconfig/api/request/update_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ class UpdateRequestImpl : public Request, public UpdateRequest, public std::enab
bool commit() override;

protected:
google::protobuf::Arena arena_;
grpc::ServerAsyncResponseWriter<mhconfig::proto::UpdateResponse> responder_;

mhconfig::proto::UpdateRequest request_;
mhconfig::proto::UpdateResponse response_;
mhconfig::proto::UpdateRequest* request_;
mhconfig::proto::UpdateResponse* response_;

std::vector<std::string> relative_paths_;

Expand Down
2 changes: 2 additions & 0 deletions src/mhconfig/api/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/security/server_credentials.h>
#include <google/protobuf/arena.h>

#include "mhconfig/common.h"
#include "mhconfig/scheduler/command/command.h"
Expand Down Expand Up @@ -56,6 +57,7 @@ class Service final
CustomService service_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> cqs_;


void subscribe_requests(
grpc::ServerCompletionQueue* cq
);
Expand Down
22 changes: 0 additions & 22 deletions src/mhconfig/api/session.cpp
Original file line number Diff line number Diff line change
@@ -1,23 +1 @@
#include "mhconfig/api/session.h"

namespace mhconfig
{
namespace api
{

bool parse_from_byte_buffer(
const grpc::ByteBuffer& buffer,
grpc::protobuf::Message& message
) {
std::vector<grpc::Slice> slices;
buffer.Dump(&slices);
grpc::string buf;
buf.reserve(buffer.Length());
for (auto s = slices.begin(); s != slices.end(); ++s) {
buf.append(reinterpret_cast<const char*>(s->begin()), s->size());
}
return message.ParseFromString(buf);
}

} /* api */
} /* mhconfig */
5 changes: 0 additions & 5 deletions src/mhconfig/api/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ std::vector<T> to_vector(const ::google::protobuf::RepeatedPtrField<T>& proto_re
return result;
}

bool parse_from_byte_buffer(
const grpc::ByteBuffer& buffer,
grpc::protobuf::Message& message
);

template<typename T, typename... Args>
inline std::shared_ptr<T> make_session(Args&&... args)
{
Expand Down
12 changes: 4 additions & 8 deletions src/mhconfig/api/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Stream : public Session
//TODO add request metrics
prepare_next_request();
} else if (status_ == Status::PROCESS) {
request(std::move(next_req_), scheduler_sender);
request(scheduler_sender);
prepare_next_request();
send_message_if_neccesary();
} else if (status_ == Status::WRITING) {
Expand All @@ -86,9 +86,11 @@ class Stream : public Session
protected:
grpc::ServerAsyncReaderWriter<Resp, Req> stream_;

//TODO Use this with CRTP
virtual void prepare_next_request() = 0;

//TODO Use this with CRTP
virtual void request(
std::unique_ptr<Req>&& req,
SchedulerQueue::Sender* scheduler_sender
) = 0;

Expand Down Expand Up @@ -127,14 +129,8 @@ class Stream : public Session
Status status_{Status::CREATE};

std::queue<std::shared_ptr<OutMsg>> messages_to_send_;
std::unique_ptr<Req> next_req_;
bool going_to_finish_{false};

void prepare_next_request() {
next_req_ = std::make_unique<Req>();
stream_.Read(next_req_.get(), tag());
}

void send_message_if_neccesary() {
if (!messages_to_send_.empty()) {
auto message = messages_to_send_.front();
Expand Down
Loading

0 comments on commit e9ee3d6

Please sign in to comment.