diff --git a/src/mhconfig/api/request/get_request_impl.cpp b/src/mhconfig/api/request/get_request_impl.cpp index 924bfd7..32bfcff 100644 --- a/src/mhconfig/api/request/get_request_impl.cpp +++ b/src/mhconfig/api/request/get_request_impl.cpp @@ -9,9 +9,10 @@ namespace request GetRequestImpl::GetRequestImpl() - : Request(), - responder_(&ctx_) + : responder_(&ctx_) { + request_ = google::protobuf::Arena::CreateMessage(&arena_); + response_ = google::protobuf::Arena::CreateMessage(&arena_); } GetRequestImpl::~GetRequestImpl() { @@ -22,11 +23,11 @@ const std::string GetRequestImpl::name() const { } const std::string& GetRequestImpl::root_path() const { - return request_.root_path(); + return request_->root_path(); } const uint32_t GetRequestImpl::version() const { - return request_.version(); + return request_->version(); } const std::vector& GetRequestImpl::overrides() const { @@ -34,7 +35,7 @@ const std::vector& GetRequestImpl::overrides() const { } const std::string& GetRequestImpl::document() const { - return request_.document(); + return request_->document(); } const std::vector& GetRequestImpl::key() const { @@ -44,38 +45,38 @@ const std::vector& GetRequestImpl::key() const { void GetRequestImpl::set_status(get_request::Status status) { switch (status) { case get_request::Status::OK: - response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_OK); + response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_OK); break; case get_request::Status::ERROR: - response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_ERROR); + response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_ERROR); break; case get_request::Status::INVALID_VERSION: - response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_INVALID_VERSION); + response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_INVALID_VERSION); break; case get_request::Status::REF_GRAPH_IS_NOT_DAG: - response_.set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_REF_GRAPH_IS_NOT_DAG); + response_->set_status(::mhconfig::proto::GetResponse_Status::GetResponse_Status_REF_GRAPH_IS_NOT_DAG); break; } } void GetRequestImpl::set_namespace_id(uint64_t namespace_id) { - response_.set_namespace_id(namespace_id); + response_->set_namespace_id(namespace_id); } void GetRequestImpl::set_version(uint32_t version) { - response_.set_version(version); + response_->set_version(version); } void GetRequestImpl::set_element(mhconfig::ElementRef element) { elements_data_.clear(); - response_.clear_elements(); - mhconfig::api::config::fill_elements(element, &response_, response_.add_elements()); + response_->clear_elements(); + mhconfig::api::config::fill_elements(element, response_, response_->add_elements()); } void GetRequestImpl::set_element_bytes(const char* data, size_t len) { elements_data_.clear(); elements_data_.write(data, len); - response_.clear_elements(); + response_->clear_elements(); } bool GetRequestImpl::commit() { @@ -99,10 +100,13 @@ void GetRequestImpl::subscribe( void GetRequestImpl::request( SchedulerQueue::Sender* scheduler_sender ) { - bool ok = parse_from_byte_buffer(raw_request_, request_); - if (ok) { - overrides_ = to_vector(request_.overrides()); - key_ = to_vector(request_.key()); + auto status = grpc::SerializationTraits::Deserialize( + &raw_request_, + request_ + ); + if (status.ok()) { + overrides_ = to_vector(request_->overrides()); + key_ = to_vector(request_->key()); scheduler_sender->push( std::make_unique( @@ -116,7 +120,7 @@ void GetRequestImpl::request( } void GetRequestImpl::finish() { - bool ok = response_.SerializeToOstream(&elements_data_); + bool ok = response_->SerializeToOstream(&elements_data_); if (ok) { grpc::Slice slice(elements_data_.str()); grpc::ByteBuffer raw_response(&slice, 1); diff --git a/src/mhconfig/api/request/get_request_impl.h b/src/mhconfig/api/request/get_request_impl.h index 48c6c85..963b829 100644 --- a/src/mhconfig/api/request/get_request_impl.h +++ b/src/mhconfig/api/request/get_request_impl.h @@ -7,6 +7,8 @@ #include "mhconfig/scheduler/command/command.h" #include "mhconfig/scheduler/command/api_get_command.h" +#include + namespace mhconfig { namespace api @@ -46,12 +48,13 @@ class GetRequestImpl : public Request, public GetRequest, public std::enable_sha bool commit() override; protected: + google::protobuf::Arena arena_; grpc::ServerAsyncResponseWriter responder_; grpc::ByteBuffer raw_request_; - mhconfig::proto::GetRequest request_; - mhconfig::proto::GetResponse response_; + mhconfig::proto::GetRequest* request_; + mhconfig::proto::GetResponse* response_; std::stringstream elements_data_; diff --git a/src/mhconfig/api/request/run_gc_request_impl.cpp b/src/mhconfig/api/request/run_gc_request_impl.cpp index 0eb206c..92a4ab4 100644 --- a/src/mhconfig/api/request/run_gc_request_impl.cpp +++ b/src/mhconfig/api/request/run_gc_request_impl.cpp @@ -8,9 +8,10 @@ namespace request { RunGCRequestImpl::RunGCRequestImpl() - : Request(), - responder_(&ctx_) + : responder_(&ctx_) { + request_ = google::protobuf::Arena::CreateMessage(&arena_); + response_ = google::protobuf::Arena::CreateMessage(&arena_); } RunGCRequestImpl::~RunGCRequestImpl() { @@ -31,7 +32,7 @@ void RunGCRequestImpl::subscribe( CustomService* service, grpc::ServerCompletionQueue* cq ) { - service->RequestRunGC(&ctx_, &request_, &responder_, cq, cq, tag()); + service->RequestRunGC(&ctx_, request_, &responder_, cq, cq, tag()); } bool RunGCRequestImpl::commit() { @@ -52,7 +53,7 @@ void RunGCRequestImpl::request( } run_gc::Type RunGCRequestImpl::type() { - switch (request_.type()) { + switch (request_->type()) { case mhconfig::proto::RunGCRequest::Type::RunGCRequest_Type_CACHE_GENERATION_0: return scheduler::command::run_gc::Type::CACHE_GENERATION_0; case mhconfig::proto::RunGCRequest::Type::RunGCRequest_Type_CACHE_GENERATION_1: @@ -69,11 +70,11 @@ run_gc::Type RunGCRequestImpl::type() { } uint32_t RunGCRequestImpl::max_live_in_seconds() { - return request_.max_live_in_seconds(); + return request_->max_live_in_seconds(); } void RunGCRequestImpl::finish() { - responder_.Finish(response_, grpc::Status::OK, tag()); + responder_.Finish(*response_, grpc::Status::OK, tag()); } diff --git a/src/mhconfig/api/request/run_gc_request_impl.h b/src/mhconfig/api/request/run_gc_request_impl.h index cdf220e..62bffb2 100644 --- a/src/mhconfig/api/request/run_gc_request_impl.h +++ b/src/mhconfig/api/request/run_gc_request_impl.h @@ -36,10 +36,11 @@ class RunGCRequestImpl : public Request, public RunGCRequest, public std::enable bool commit() override; protected: + google::protobuf::Arena arena_; grpc::ServerAsyncResponseWriter responder_; - mhconfig::proto::RunGCRequest request_; - mhconfig::proto::RunGCResponse response_; + mhconfig::proto::RunGCRequest* request_; + mhconfig::proto::RunGCResponse* response_; void request( SchedulerQueue::Sender* scheduler_sender diff --git a/src/mhconfig/api/request/update_request_impl.cpp b/src/mhconfig/api/request/update_request_impl.cpp index 9c0f815..12b7ea9 100644 --- a/src/mhconfig/api/request/update_request_impl.cpp +++ b/src/mhconfig/api/request/update_request_impl.cpp @@ -9,9 +9,10 @@ namespace request UpdateRequestImpl::UpdateRequestImpl() - : Request(), - responder_(&ctx_) + : responder_(&ctx_) { + request_ = google::protobuf::Arena::CreateMessage(&arena_); + response_ = google::protobuf::Arena::CreateMessage(&arena_); } UpdateRequestImpl::~UpdateRequestImpl() { @@ -22,7 +23,7 @@ const std::string UpdateRequestImpl::name() const { } const std::string& UpdateRequestImpl::root_path() const { - return request_.root_path(); + return request_->root_path(); } const std::vector& UpdateRequestImpl::relative_paths() const { @@ -31,22 +32,22 @@ const std::vector& UpdateRequestImpl::relative_paths() const { void UpdateRequestImpl::set_namespace_id(uint64_t namespace_id) { - response_.set_namespace_id(namespace_id); + response_->set_namespace_id(namespace_id); } void UpdateRequestImpl::set_status(update_request::Status status) { switch (status) { case update_request::Status::OK: - response_.set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_OK); + response_->set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_OK); break; case update_request::Status::ERROR: - response_.set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_ERROR); + response_->set_status(::mhconfig::proto::UpdateResponse_Status::UpdateResponse_Status_ERROR); break; } } void UpdateRequestImpl::set_version(uint32_t version) { - response_.set_version(version); + response_->set_version(version); } bool UpdateRequestImpl::commit() { @@ -64,13 +65,13 @@ void UpdateRequestImpl::subscribe( CustomService* service, grpc::ServerCompletionQueue* cq ) { - service->RequestUpdate(&ctx_, &request_, &responder_, cq, cq, tag()); + service->RequestUpdate(&ctx_, request_, &responder_, cq, cq, tag()); } void UpdateRequestImpl::request( SchedulerQueue::Sender* scheduler_sender ) { - relative_paths_ = to_vector(request_.relative_paths()); + relative_paths_ = to_vector(request_->relative_paths()); scheduler_sender->push( std::make_unique( @@ -80,7 +81,7 @@ void UpdateRequestImpl::request( } void UpdateRequestImpl::finish() { - responder_.Finish(response_, grpc::Status::OK, tag()); + responder_.Finish(*response_, grpc::Status::OK, tag()); } diff --git a/src/mhconfig/api/request/update_request_impl.h b/src/mhconfig/api/request/update_request_impl.h index 5b56c7b..7529b45 100644 --- a/src/mhconfig/api/request/update_request_impl.h +++ b/src/mhconfig/api/request/update_request_impl.h @@ -41,10 +41,11 @@ class UpdateRequestImpl : public Request, public UpdateRequest, public std::enab bool commit() override; protected: + google::protobuf::Arena arena_; grpc::ServerAsyncResponseWriter responder_; - mhconfig::proto::UpdateRequest request_; - mhconfig::proto::UpdateResponse response_; + mhconfig::proto::UpdateRequest* request_; + mhconfig::proto::UpdateResponse* response_; std::vector relative_paths_; diff --git a/src/mhconfig/api/service.h b/src/mhconfig/api/service.h index 7e5777c..f0d8f9f 100644 --- a/src/mhconfig/api/service.h +++ b/src/mhconfig/api/service.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "mhconfig/common.h" #include "mhconfig/scheduler/command/command.h" @@ -56,6 +57,7 @@ class Service final CustomService service_; std::vector> cqs_; + void subscribe_requests( grpc::ServerCompletionQueue* cq ); diff --git a/src/mhconfig/api/session.cpp b/src/mhconfig/api/session.cpp index 9129153..0636b8f 100644 --- a/src/mhconfig/api/session.cpp +++ b/src/mhconfig/api/session.cpp @@ -1,23 +1 @@ #include "mhconfig/api/session.h" - -namespace mhconfig -{ -namespace api -{ - -bool parse_from_byte_buffer( - const grpc::ByteBuffer& buffer, - grpc::protobuf::Message& message -) { - std::vector slices; - buffer.Dump(&slices); - grpc::string buf; - buf.reserve(buffer.Length()); - for (auto s = slices.begin(); s != slices.end(); ++s) { - buf.append(reinterpret_cast(s->begin()), s->size()); - } - return message.ParseFromString(buf); -} - -} /* api */ -} /* mhconfig */ diff --git a/src/mhconfig/api/session.h b/src/mhconfig/api/session.h index 40f5030..548ec51 100644 --- a/src/mhconfig/api/session.h +++ b/src/mhconfig/api/session.h @@ -26,11 +26,6 @@ std::vector to_vector(const ::google::protobuf::RepeatedPtrField& proto_re return result; } -bool parse_from_byte_buffer( - const grpc::ByteBuffer& buffer, - grpc::protobuf::Message& message -); - template inline std::shared_ptr make_session(Args&&... args) { diff --git a/src/mhconfig/api/stream/stream.h b/src/mhconfig/api/stream/stream.h index 15b23a1..63a6428 100644 --- a/src/mhconfig/api/stream/stream.h +++ b/src/mhconfig/api/stream/stream.h @@ -59,7 +59,7 @@ class Stream : public Session //TODO add request metrics prepare_next_request(); } else if (status_ == Status::PROCESS) { - request(std::move(next_req_), scheduler_sender); + request(scheduler_sender); prepare_next_request(); send_message_if_neccesary(); } else if (status_ == Status::WRITING) { @@ -86,9 +86,11 @@ class Stream : public Session protected: grpc::ServerAsyncReaderWriter stream_; + //TODO Use this with CRTP + virtual void prepare_next_request() = 0; + //TODO Use this with CRTP virtual void request( - std::unique_ptr&& req, SchedulerQueue::Sender* scheduler_sender ) = 0; @@ -127,14 +129,8 @@ class Stream : public Session Status status_{Status::CREATE}; std::queue> messages_to_send_; - std::unique_ptr next_req_; bool going_to_finish_{false}; - void prepare_next_request() { - next_req_ = std::make_unique(); - stream_.Read(next_req_.get(), tag()); - } - void send_message_if_neccesary() { if (!messages_to_send_.empty()) { auto message = messages_to_send_.front(); diff --git a/src/mhconfig/api/stream/watch_stream_impl.cpp b/src/mhconfig/api/stream/watch_stream_impl.cpp index 25f4e62..9cbba3d 100644 --- a/src/mhconfig/api/stream/watch_stream_impl.cpp +++ b/src/mhconfig/api/stream/watch_stream_impl.cpp @@ -12,64 +12,69 @@ WatchOutputMessageImpl::WatchOutputMessageImpl( ) : stream_(stream) { + proto_response_ = google::protobuf::Arena::CreateMessage(&arena_); } WatchOutputMessageImpl::~WatchOutputMessageImpl() { } void WatchOutputMessageImpl::set_uid(uint32_t uid) { - proto_response_.set_uid(uid); + proto_response_->set_uid(uid); } void WatchOutputMessageImpl::set_status(watch::Status status) { switch (status) { case watch::Status::OK: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_OK); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_OK); break; case watch::Status::ERROR: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_ERROR); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_ERROR); break; case watch::Status::INVALID_VERSION: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_INVALID_VERSION); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_INVALID_VERSION); break; case watch::Status::REF_GRAPH_IS_NOT_DAG: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_REF_GRAPH_IS_NOT_DAG); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_REF_GRAPH_IS_NOT_DAG); break; case watch::Status::UID_IN_USE: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_UID_IN_USE); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_UID_IN_USE); break; case watch::Status::UNKNOWN_UID: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_UNKNOWN_UID); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_UNKNOWN_UID); break; case watch::Status::REMOVED: - proto_response_.set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_REMOVED); + proto_response_->set_status(::mhconfig::proto::WatchResponse_Status::WatchResponse_Status_REMOVED); break; } } void WatchOutputMessageImpl::set_namespace_id(uint64_t namespace_id) { - proto_response_.set_namespace_id(namespace_id); + proto_response_->set_namespace_id(namespace_id); } void WatchOutputMessageImpl::set_version(uint32_t version) { - proto_response_.set_version(version); + proto_response_->set_version(version); } void WatchOutputMessageImpl::set_element(mhconfig::ElementRef element) { elements_data_.clear(); - proto_response_.clear_elements(); - mhconfig::api::config::fill_elements(element, &proto_response_, proto_response_.add_elements()); + proto_response_->clear_elements(); + mhconfig::api::config::fill_elements( + element, + proto_response_, + proto_response_->add_elements() + ); } void WatchOutputMessageImpl::set_element_bytes(const char* data, size_t len) { elements_data_.clear(); elements_data_.write(data, len); - proto_response_.clear_elements(); + proto_response_->clear_elements(); } bool WatchOutputMessageImpl::send(bool finish) { if (auto stream = stream_.lock()) { - if (proto_response_.SerializeToOstream(&elements_data_)) { + if (proto_response_->SerializeToOstream(&elements_data_)) { slice_ = grpc::Slice(elements_data_.str()); response_ = grpc::ByteBuffer(&slice_, 1); return stream->send(shared_from_this(), finish); @@ -89,7 +94,6 @@ WatchInputMessageImpl::WatchInputMessageImpl( overrides_ = to_vector(request_->overrides()); } - WatchInputMessageImpl::~WatchInputMessageImpl() { } @@ -234,14 +238,21 @@ bool WatchStreamImpl::unregister(uint32_t uid) { return watcher_by_id_.erase(uid); } +void WatchStreamImpl::prepare_next_request() { + next_req_.Clear(); + stream_.Read(&next_req_, tag()); +} + void WatchStreamImpl::request( - std::unique_ptr&& raw_req, SchedulerQueue::Sender* scheduler_sender ) { auto req = std::make_unique(); - bool ok = parse_from_byte_buffer(*raw_req, *req); + auto status = grpc::SerializationTraits::Deserialize( + &next_req_, + req.get() + ); auto msg = std::make_shared(std::move(req), shared_from_this()); - if (ok) { + if (status.ok()) { if (msg->remove()) { spdlog::debug("Removing watcher with uid {} of the stream {}", msg->uid(), tag()); size_t removed_elements = watcher_by_id_.erase(msg->uid()); diff --git a/src/mhconfig/api/stream/watch_stream_impl.h b/src/mhconfig/api/stream/watch_stream_impl.h index 47c41b6..9ac62b1 100644 --- a/src/mhconfig/api/stream/watch_stream_impl.h +++ b/src/mhconfig/api/stream/watch_stream_impl.h @@ -8,6 +8,8 @@ #include "mhconfig/scheduler/command/command.h" #include "mhconfig/scheduler/command/api_watch_command.h" +#include + #include "spdlog/spdlog.h" namespace mhconfig @@ -43,8 +45,9 @@ class WatchOutputMessageImpl : public WatchOutputMessage, public std::enable_sha grpc::ByteBuffer response_; private: + google::protobuf::Arena arena_; + mhconfig::proto::WatchResponse* proto_response_; std::weak_ptr stream_; - mhconfig::proto::WatchResponse proto_response_; std::stringstream elements_data_; grpc::Slice slice_; @@ -131,12 +134,14 @@ class WatchStreamImpl final protected: friend class WatchOutputMessageImpl; + void prepare_next_request() override; + void request( - std::unique_ptr&& raw_req, SchedulerQueue::Sender* scheduler_sender ) override; private: + grpc::ByteBuffer next_req_; std::unordered_map> watcher_by_id_; }; diff --git a/src/mhconfig/proto/mhconfig.proto b/src/mhconfig/proto/mhconfig.proto index 48fab06..c65d57e 100644 --- a/src/mhconfig/proto/mhconfig.proto +++ b/src/mhconfig/proto/mhconfig.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package mhconfig.proto; +option cc_enable_arenas = true; + service MHConfig { // Public methods rpc Get(GetRequest) returns (GetResponse);