diff --git a/etc/conf/pikiwidb.conf b/etc/conf/pikiwidb.conf index 38d7371cc..3570d42a7 100644 --- a/etc/conf/pikiwidb.conf +++ b/etc/conf/pikiwidb.conf @@ -35,7 +35,7 @@ logfile stdout # Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT <dbid> where # dbid is a number between 0 and 'databases'-1 -databases 16 +databases 2 ################################ SNAPSHOTTING ################################# # @@ -343,6 +343,6 @@ rocksdb-ttl-second 604800 rocksdb-periodic-second 259200; ############################### RAFT ############################### -use-raft no +use-raft yes # Braft relies on brpc to communicate via the default port number plus the port offset raft-port-offset 10 diff --git a/ppd/main.cc b/ppd/main.cc new file mode 100644 index 000000000..e3c79645e --- /dev/null +++ b/ppd/main.cc @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "brpc/server.h" +#include "butil/errno.h" +#include "gflags/gflags.h" +#include "spdlog/spdlog.h" + +#include "pd_service.h" + +DEFINE_int32(port, 8080, "Port of rpc server"); +DEFINE_int32(idle_timeout_s, 60, + "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s`"); +DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); + +int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + brpc::Server server; + PlacementDriverServiceImpl service; + if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) { + spdlog::error("Failed to add service for: {}", berror()); + return -1; + } + + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + options.max_concurrency = FLAGS_max_concurrency; + + // Start the server. + if (server.Start(FLAGS_port, &options) != 0) { + spdlog::error("Failed to start server for: {}", berror()); + return -1; + } + + server.RunUntilAskedToQuit(); +} \ No newline at end of file diff --git a/ppd/pd.proto b/ppd/pd.proto new file mode 100644 index 000000000..84f4f8714 --- /dev/null +++ b/ppd/pd.proto @@ -0,0 +1,109 @@ +syntax = "proto3"; +package pikiwidb; +option cc_generic_services = true; + +message Peer { + string group_id = 1; + int32 cluster_idx = 2; +}; + +message GetClusterInfoRequest { +}; + +message GetClusterInfoResponse { + bool success = 1; + repeated Store store = 2; +}; + +message Store { + int64 store_id = 1; + string ip = 2; + int32 port = 3; + StoreState state = 4; + repeated Region region = 5; +}; + +message Region { + int64 region_id = 1; + string start_key = 2; + string end_key = 3; + repeated RegionEpoch region_epoch = 4; + repeated Peer peers = 5; +}; + +message RegionEpoch { + int64 conf_change_ver = 1; // conf change version + int64 region_ver = 2; // region version (split or merge) +}; + +enum StoreState { + UP = 0; + OFFLINE = 1; + TOMBSTONE = 2; +}; + +message RegionOptions { + string start_key = 1; + string end_key = 2; + int32 max_data_size = 3; +}; + +message CreateAllRegionsRequest { + int64 regions_count = 1; + int32 region_peers_count = 2; + repeated RegionOptions regionOptions = 3; +}; + +message CreateAllRegionsResponse { + bool success = 1; +}; + +message DeleteAllRegionsRequest { +}; + +message
DeleteAllRegionsResponse { + bool success = 1; +}; + +message AddStoreRequest { + string ip = 1; + int32 port = 2; +}; + +message AddStoreResponse { + bool success = 1; + optional int64 store_id = 2; + optional string redirect = 3; +}; + +message RemoveStoreRequest { + int64 store_id = 1; +}; + +message RemoveStoreResponse { + bool success = 1; +}; + +message OpenPDSchedulingRequest { +}; + +message OpenPDSchedulingResponse { + bool success = 1; +}; + +message ClosePDSchedulingRequest { +}; + +message ClosePDSchedulingResponse { + bool success = 1; +}; + +service PlacementDriverService { + rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse); + rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse); + rpc AddStore(AddStoreRequest) returns (AddStoreResponse); + rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse); + rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); + rpc OpenPDScheduling(OpenPDSchedulingRequest) returns (OpenPDSchedulingResponse); + rpc ClosePDScheduling(ClosePDSchedulingRequest) returns (ClosePDSchedulingResponse); +}; \ No newline at end of file diff --git a/ppd/pd_service.cc b/ppd/pd_service.cc new file mode 100644 index 000000000..6463eb3f1 --- /dev/null +++ b/ppd/pd_service.cc @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "pd_service.h" + +#include "brpc/closure_guard.h" +#include "pd_server.h" +#include "spdlog/spdlog.h" + +namespace pikiwidb { void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller, const ::pikiwidb::CreateAllRegionsRequest* request, ::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller, const ::pikiwidb::DeleteAllRegionsRequest* request, ::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request, ::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port()); + if (!success) { + response->set_success(false); + return; + } + + response->set_success(true); + response->set_store_id(store_id); + spdlog::info("add store success: {}", store_id); +} + +void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request, ::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request, ::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + PDSERVER.GetClusterInfo(response); +} + +void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller, const ::pikiwidb::OpenPDSchedulingRequest* request, ::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller, const ::pikiwidb::ClosePDSchedulingRequest* request, ::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) {} +} // namespace pikiwidb \ No newline at end of file diff --git a/ppd/pd_service.h b/ppd/pd_service.h new file mode 100644 index
000000000..37bcbf113 --- /dev/null +++ b/ppd/pd_service.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "pd.pb.h" + +namespace pikiwidb { + +class PlacementDriverServiceImpl : public PlacementDriverService { + public: + PlacementDriverServiceImpl() = default; + + void CreateAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::CreateAllRegionsRequest* request, + ::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override; + + void DeleteAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::DeleteAllRegionsRequest* request, + ::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override; + + void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request, + ::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override; + + void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request, + ::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override; + + void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request, + ::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override; + + void OpenPDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::OpenPDSchedulingRequest* request, + ::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override; + + void ClosePDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::ClosePDSchedulingRequest* request, + ::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override; +}; + +} // namespace pikiwidb \ No newline at end of file diff --git a/pproxy/main.cc b/pproxy/main.cc new file mode 100644 index 000000000..ff0fa5186 --- /dev/null +++ b/pproxy/main.cc @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ + +#include "brpc/server.h" +#include "gflags/gflags.h" +#include "spdlog/spdlog.h" + +#include "proxy_service.h" + +DEFINE_int32(port, 8080, "Port of rpc server"); +DEFINE_int32(idle_timeout_s, 60, + "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s`"); +DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); + +int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + brpc::Server server; + pikiwidb::proxy::ProxyServiceImpl service; + if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) { + spdlog::error("Failed to add service for: {}", berror()); + return -1; + } + + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + options.max_concurrency = FLAGS_max_concurrency; + + if (server.Start(FLAGS_port, &options) != 0) { + spdlog::error("Failed to start server for: {}", berror()); + return -1; + } + + server.RunUntilAskedToQuit(); +} \ No newline at end of file diff --git a/pproxy/proxy.proto b/pproxy/proxy.proto new file mode 100644 index 000000000..2340d83d1 --- /dev/null +++ b/pproxy/proxy.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +package pikiwidb.proxy; +option cc_generic_services = true; + +message RunCommandRequest { + string command = 1; } + +message RunCommandResponse { + string output = 1; + string error = 2; } +message GetRouteInfoRequest { } +message GetRouteInfoResponse { + message RouteInfo { + string group_id = 1; + string endpoint = 2; + int32 role = 3; + } + repeated RouteInfo infos = 1; } + +service ProxyService { + rpc RunCommand(RunCommandRequest) returns (RunCommandResponse); + rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse); } diff --git a/pproxy/proxy_service.cc b/pproxy/proxy_service.cc new file mode 100644 index 000000000..a69176610 --- /dev/null +++ b/pproxy/proxy_service.cc @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory.
+ */ + +#include "proxy_service.h" + +#include <array> +#include <cstdio> +#include <memory> + +namespace pikiwidb::proxy { +void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl, + const pikiwidb::proxy::RunCommandRequest* request, + pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) { + std::string command = request->command(); // Check whether the command is in the whitelist + + if (!IsCommandAllowed(command)) { + response->set_error("Command not allowed"); + done->Run(); + return; + } + + std::string output = ExecuteCommand(command); + if (output.empty()) { + response->set_error("Command execution failed"); + } else { + response->set_output(output); + } + done->Run(); +} + +void ProxyServiceImpl::GetRouteInfo(::google::protobuf::RpcController* cntl, + const pikiwidb::proxy::GetRouteInfoRequest* request, + pikiwidb::proxy::GetRouteInfoResponse* response, + ::google::protobuf::Closure* done) {} + +std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) { + if (!IsCommandAllowed(command)) { + return "Command not allowed"; + } + + std::array<char, 128> buffer; + std::string result; + std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose); + if (!pipe) { + return "Failed to execute command"; + } + + while (true) { + if (fgets(buffer.data(), buffer.size(), pipe.get()) == nullptr) { + if (feof(pipe.get())) { + break; + } else { + return "Error reading command output"; + } + } + result += buffer.data(); + } + return result; +} + +} // namespace pikiwidb::proxy \ No newline at end of file diff --git a/pproxy/proxy_service.h b/pproxy/proxy_service.h new file mode 100644 index 000000000..3936cb200 --- /dev/null +++ b/pproxy/proxy_service.h @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "proxy.pb.h" + +namespace pikiwidb::proxy { + +class ProxyServiceImpl : public ProxyService { + public: + void RunCommand(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::RunCommandRequest* request, + pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) override; + void GetRouteInfo(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::GetRouteInfoRequest* request, + pikiwidb::proxy::GetRouteInfoResponse* response, ::google::protobuf::Closure* done) override; + + private: + std::string ExecuteCommand(const std::string& command); + bool IsCommandAllowed(const std::string& command); +}; + +} // namespace pikiwidb::proxy diff --git a/pproxy/router.cc b/pproxy/router.cc new file mode 100644 index 000000000..c089d033e --- /dev/null +++ b/pproxy/router.cc @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory.
+ */ + +#include "router.h" +#include <cstring> +#include <memory> + +Router::Router() { + // maximum 100 parameters + params.reserve(100); +} + +Router *Router::add(const char *method, const char *pattern, + std::function<void(UserData, std::vector<std::string_view> &)> handler) { + // step over any initial slash + if (pattern[0] == '/') { + pattern++; + } + + std::vector<std::string> nodes; + // nodes.push_back(method); + + const char *stop; + const char *start = pattern; + const char *end_ptr = pattern + strlen(pattern); + do { + stop = getNextSegment(start, end_ptr); + + // std::cout << "Segment(" << std::string(start, stop - start) << ")" << std::endl; + + nodes.emplace_back(start, stop - start); + + start = stop + 1; + } while (stop != end_ptr); + + // if pattern starts with / then move 1+ and run inline slash parser + + add(nodes, handlers.size()); + handlers.push_back(handler); + + compile(); + return this; +} + +void Router::compile() { + compiled_tree.clear(); + compile_tree(tree.get()); +} + +void Router::route(const char *method, unsigned int method_length, const char *url, unsigned int url_length, + UserData userData) { + handlers[lookup(url, url_length)](userData, params); + params.clear(); +} + +void Router::add(const std::vector<std::string> &route, short handler) { + Node *parent = tree.get(); + for (const std::string &node : route) { + if (parent->children.find(node) == parent->children.end()) { + parent->children[node] = std::shared_ptr<Node>(new Node({node, {}, handler})); + } + parent = parent->children[node].get(); + } +} + +unsigned short Router::compile_tree(Node *n) { + unsigned short nodeLength = 6 + n->name.length(); + for (const auto &c : n->children) { + nodeLength += compile_tree(c.second.get()); + } + + unsigned short nodeNameLength = n->name.length(); + + std::string compiledNode; + compiledNode.append(reinterpret_cast<const char *>(&nodeLength), sizeof(nodeLength)); + compiledNode.append(reinterpret_cast<const char *>(&nodeNameLength), sizeof(nodeNameLength)); + compiledNode.append(reinterpret_cast<const char *>(&n->handler), sizeof(n->handler)); + compiledNode.append(n->name.data(), n->name.length()); + + compiled_tree = compiledNode + compiled_tree; + return nodeLength; +} + +const char *Router::find_node(const char *parent_node, const char *name, int name_length) { + unsigned short nodeLength = *reinterpret_cast<const unsigned short *>(&parent_node[0]); + unsigned short nodeNameLength = *reinterpret_cast<const unsigned short *>(&parent_node[2]); + + // std::cout << "Finding node: <" << std::string(name, name_length) << ">" << std::endl; + + const char *stoppp = parent_node + nodeLength; + for (const char *candidate = parent_node + 6 + nodeNameLength; candidate < stoppp;) { + unsigned short nodeLength = *reinterpret_cast<const unsigned short *>(&candidate[0]); + unsigned short nodeNameLength = *reinterpret_cast<const unsigned short *>(&candidate[2]); + + // wildcard, parameter, equal + if (nodeNameLength == 0) { + return candidate; + } else if (candidate[6] == ':') { + // parameter + + // todo: push this pointer on the stack of args! + params.push_back(std::string_view({name, static_cast<size_t>(name_length)})); + + return candidate; + } else if (nodeNameLength == name_length && !memcmp(candidate + 6, name, name_length)) { + return candidate; + } + + candidate = candidate + nodeLength; + } + + return nullptr; +} + +const char *Router::getNextSegment(const char *start, const char *end) { + const char *stop = static_cast<const char *>(memchr(start, '/', end - start)); + return stop ?
stop : end; +} + +int Router::lookup(const char *url, int length) { + // all urls start with / + url++; + length--; + + const char *treeStart = static_cast<const char *>(compiled_tree.data()); + + const char *stop; + const char *start = url; + const char *end_ptr = url + length; + do { + stop = getNextSegment(start, end_ptr); + + // std::cout << "Matching(" << std::string(start, stop - start) << ")" << std::endl; + + if (nullptr == (treeStart = find_node(treeStart, start, stop - start))) { + return -1; + } + + start = stop + 1; + } while (stop != end_ptr); + + return *reinterpret_cast<const short *>(&treeStart[4]); +} diff --git a/pproxy/router.h b/pproxy/router.h new file mode 100644 index 000000000..2788a93f3 --- /dev/null +++ b/pproxy/router.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include <functional> +#include <map> +#include <memory> +#include <string> +#include <string_view> +#include <vector> + +class Router { + public: + // Opaque user data handed through to route handlers (a plain pointer is assumed here). + using UserData = void *; + + Router(); + + Router *add(const char *method, const char *pattern, + std::function<void(UserData, std::vector<std::string_view> &)> handler); + + void compile(); + + void route(const char *method, unsigned int method_length, const char *url, unsigned int url_length, + UserData userData); + + private: + std::vector<std::function<void(UserData, std::vector<std::string_view> &)>> handlers; + std::vector<std::string_view> params; + + struct Node { + std::string name; + std::map<std::string, std::shared_ptr<Node>> children; + short handler; + }; + + std::shared_ptr<Node> tree = std::shared_ptr<Node>(new Node({"GET", {}, -1})); + std::string compiled_tree; + + void add(const std::vector<std::string> &route, short handler); + + unsigned short compile_tree(Node *n); + + const char *find_node(const char *parent_node, const char *name, int name_length); + + // returns next slash from start or end + const char *getNextSegment(const char *start, const char *end); + + // should take method also! + int lookup(const char *url, int length); +}; \ No newline at end of file diff --git a/pproxy/task_manager.cc b/pproxy/task_manager.cc new file mode 100644 index 000000000..35346dd54 --- /dev/null +++ b/pproxy/task_manager.cc @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory.
+ */ + +#include "task_manager.h" + +#include <chrono> +#include <thread> + +#include "pstd/thread_pool.h" + +Task::Status Task::TextToStatus(const std::string& input) { + if (input == "pending") { + return Task::pending; + } else if (input == "completed") { + return Task::completed; + } else if (input == "deleted") { + return Task::deleted; + } else if (input == "recurring") { + return Task::recurring; + } else if (input == "waiting") { + return Task::waiting; + } + + return Task::pending; +} +std::string Task::StatusToText(Status s) { + if (s == Task::pending) { + return "pending"; + } else if (s == Task::completed) { + return "completed"; + } else if (s == Task::deleted) { + return "deleted"; + } else if (s == Task::recurring) { + return "recurring"; + } else if (s == Task::waiting) { + return "waiting"; + } + return "pending"; +} + +TaskManager::TaskManager(std::shared_ptr<pstd::ThreadPool> threadpool, size_t maxWorkers) + : threadpool_(std::move(threadpool)), maxWorkers_(maxWorkers) {} + +std::future<void> TaskManager::Stop() { + auto task = std::make_shared<std::packaged_task<void()>>([this] { + std::unique_lock guard(mutex_); + bool isLast = workerCount_ == 1; + + // Guarantee that the task finishes last. + while (!isLast) { + guard.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + guard.lock(); + isLast = workerCount_ == 1; + } + }); + auto future = task->get_future(); + + // Adding a new task and expecting the future guarantees that the last batch of tasks is being executed. + auto functor = [task = std::move(task)]() mutable { (*task)(); }; + std::lock_guard guard(mutex_); + + stopped_ = true; + tasks_.emplace(std::move(functor)); + this->processTasks(); + + return future; +} + +void TaskManager::addTask(std::function<void()> functor) { + std::lock_guard guard(mutex_); + + if (stopped_) { + return; + } + tasks_.emplace(std::move(functor)); + this->processTasks(); +} + +void TaskManager::processTasks() { + if (tasks_.empty() || workerCount_ == maxWorkers_) { + return; + } + auto task = std::move(tasks_.front()); + tasks_.pop(); + + ++workerCount_; + threadpool_->execute(std::move(task)); +} \ No newline at end of file diff --git a/pproxy/task_manager.h b/pproxy/task_manager.h new file mode 100644 index 000000000..6055a1eca --- /dev/null +++ b/pproxy/task_manager.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include <functional> +#include <future> +#include <memory> +#include <mutex> +#include <queue> +#include <string> + +#include "pstd/thread_pool.h" + +class Task { + public: + Task(); + Task(const Task&); + Task& operator=(const Task&); + bool operator==(const Task&); + Task(const std::string&); + ~Task(); + + enum Status { pending, completed, deleted, recurring, waiting }; + + static Status TextToStatus(const std::string& input); + static std::string StatusToText(Status s); + + void SetEntry(); + + Status GetStatus() const; + void SetStatus(Status); + + private: + int determineVersion(const std::string&); + void legacyParse(const std::string&); + int id; +}; + +class TaskManager { + public: + TaskManager(std::shared_ptr<pstd::ThreadPool> threadpool, size_t maxWorkers); + + std::future<void> Stop(); + + template <typename F, typename... Args> + auto Push(F&& function, Args&&...
args) // + -> std::future<std::invoke_result_t<F, Args...>> { + using ReturnType = std::invoke_result_t<F, Args...>; + + auto task = std::make_shared<std::packaged_task<ReturnType()>>( // + std::bind(std::forward<F>(function), std::forward<Args>(args)...)); + auto future = task->get_future(); + + auto functor = [this, task = std::move(task)]() mutable { + (*task)(); + { + std::lock_guard guard(mutex_); + + --workerCount_; + this->processTasks(); + } + }; + this->addTask(std::move(functor)); + + return future; + } + + private: + void addTask(std::function<void()> functor); + void processTasks(); + + private: + std::shared_ptr<pstd::ThreadPool> threadpool_; + std::queue<std::function<void()>> tasks_; + std::mutex mutex_; + size_t maxWorkers_; + size_t workerCount_{0}; + bool stopped_{false}; +}; \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9e066544e..c3794d9e8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -30,7 +30,7 @@ TARGET_INCLUDE_DIRECTORIES(pikiwidb PRIVATE ${rocksdb_SOURCE_DIR}/include PRIVATE ${BRAFT_INCLUDE_DIR} PRIVATE ${BRPC_INCLUDE_DIR} - PRIVATE ${LIBEVENT_INCLUDE_DIRS} + PRIVATE ${PROTO_OUTPUT_DIR} ) ADD_DEPENDENCIES(pikiwidb diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 4e53d6873..24613c858 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -9,15 +9,12 @@ #include "base_cmd.h" -#include "fmt/core.h" - #include "praft/praft.h" +#include "pstd/log.h" -#include "common.h" #include "config.h" -#include "log.h" #include "pikiwidb.h" -#include "praft/praft.h" +#include "store.h" namespace pikiwidb { @@ -41,19 +38,18 @@ std::vector<std::string> BaseCmd::CurrentKey(PClient* client) const { return std void BaseCmd::Execute(PClient* client) { DEBUG("execute command: {}", client->CmdName()); - // read consistency (lease read) / write redirection - if (g_config.use_raft.load(std::memory_order_relaxed) && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) { - if (!PRAFT.IsInitialized()) { + if (g_config.use_raft.load()) { + auto praft = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + // 1. If PRAFT is not initialized yet, return an error message to the client for both read and write commands. + if (!praft->IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) { + DEBUG("drop command: {}", client->CmdName()); return client->SetRes(CmdRes::kErrOther, "PRAFT is not initialized"); } - if (!PRAFT.IsLeader()) { - auto leader_addr = PRAFT.GetLeaderAddress(); - if (leader_addr.empty()) { - return client->SetRes(CmdRes::kErrOther, std::string("-CLUSTERDOWN No Raft leader")); - } - - return client->SetRes(CmdRes::kErrOther, fmt::format("-MOVED {}", leader_addr)); + // 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write + // commands. + if (HasFlag(kCmdFlagsWrite) && !praft->IsLeader()) { + return client->SetRes(CmdRes::kMoved, fmt::format("MOVED {}", praft->GetLeaderAddress())); } } diff --git a/src/client.cc b/src/client.cc index 8b3a1fbf8..22422a3fc 100644 --- a/src/client.cc +++ b/src/client.cc @@ -13,16 +13,14 @@ #include #include "fmt/core.h" + #include "praft/praft.h" #include "pstd/log.h" #include "pstd/pstd_string.h" #include "base_cmd.h" #include "config.h" -#include "env.h" #include "pikiwidb.h" -#include "pstd_string.h" -#include "slow_log.h" #include "store.h" namespace pikiwidb { @@ -280,7 +278,7 @@ int PClient::handlePacket(const char* start, int bytes) { if (isPeerMaster()) { if (isClusterCmdTarget()) { // Proccees the packet at one turn.
- int len = PRAFT.ProcessClusterCmdResponse(this, start, bytes); // @todo + int len = PSTORE.GetBackend(dbno_)->GetPRaft()->ProcessClusterCmdResponse(this, start, bytes); // @todo if (len > 0) { return len; } @@ -448,7 +446,7 @@ void PClient::OnConnect() { } if (isClusterCmdTarget()) { - PRAFT.SendNodeRequest(this); + PSTORE.GetBackend(dbno_)->GetPRaft()->SendNodeRequest(this); } } else { if (g_config.password.empty()) { @@ -511,7 +509,8 @@ bool PClient::isPeerMaster() const { } bool PClient::isClusterCmdTarget() const { - return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort(); + auto praft = PSTORE.GetBackend(dbno_)->GetPRaft(); + return praft->GetClusterCmdCtx().GetPeerIp() == PeerIP() && praft->GetClusterCmdCtx().GetPort() == PeerPort(); } uint64_t PClient::uniqueID() const { return GetConnId(); } diff --git a/src/client.h b/src/client.h index 3da74f70b..44b0cf291 100644 --- a/src/client.h +++ b/src/client.h @@ -154,6 +154,8 @@ enum class ClientState { kClosed, }; +const int kChannelTimeoutMS = 200; + class DB; struct PSlaveInfo; diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index be7872747..cc3fd8368 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -11,31 +11,31 @@ management of the PikiwiDB. */ +#include "cmd_admin.h" #include #include #include #include + #include #include - #include #include +#include #include #include #include -#include "cmd_admin.h" -#include "db.h" #include "braft/raft.h" -#include "pstd_string.h" #include "rocksdb/version.h" -#include "pikiwidb.h" #include "praft/praft.h" #include "pstd/env.h" +#include "pstd/pstd_string.h" -#include "cmd_table_manager.h" +#include "db.h" +#include "pikiwidb.h" #include "slow_log.h" #include "store.h" @@ -120,7 +120,7 @@ void FlushallCmd::DoCmd(PClient* client) { } SelectCmd::SelectCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {} + : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} bool SelectCmd::DoInitial(PClient* client) { return true; } @@ -178,11 +178,11 @@ bool InfoCmd::DoInitial(PClient* client) { return true; } - std::string argv_ = client->argv_[1].data(); + auto argv = client->argv_[1]; // convert section to lowercase - std::transform(argv_.begin(), argv_.end(), argv_.begin(), [](unsigned char c) { return std::tolower(c); }); + std::transform(argv.begin(), argv.end(), argv.begin(), [](unsigned char c) { return std::tolower(c); }); if (argc == 2) { - auto it = sectionMap.find(argv_); + auto it = sectionMap.find(argv); if (it != sectionMap.end()) { info_section_ = it->second; } else { @@ -237,7 +237,7 @@ void InfoCmd::DoCmd(PClient* client) { InfoCommandStats(client, info); break; case kInfoRaft: - InfoRaft(info); + InfoRaft(client, info); break; default: break; @@ -260,23 +260,27 @@ void InfoCmd::DoCmd(PClient* client) { raft_num_voting_nodes:2 raft_node1:id=1733428433,state=connected,voting=yes,addr=localhost,port=5001,last_conn_secs=5,conn_errors=0,conn_oks=1 */ -void InfoCmd::InfoRaft(std::string& message) { - if (!PRAFT.IsInitialized()) { - message += "-ERR Not a cluster member.\r\n"; - return; +void InfoCmd::InfoRaft(PClient* client, std::string& message) { + if (client->argv_.size() != 2) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - auto node_status = PRAFT.GetNodeStatus(); + auto praft = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + assert(praft); + if (!praft->IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "Not a cluster member"); + } + + auto node_status = praft->GetNodeStatus(); if (node_status.state == braft::State::STATE_END) { message += "-ERR Node is not initialized.\r\n"; return; } std::stringstream tmp_stream; - - tmp_stream << "raft_group_id:" << PRAFT.GetGroupID() << "\r\n"; - tmp_stream << "raft_node_id:" << PRAFT.GetNodeID() << "\r\n"; - tmp_stream << "raft_peer_id:" << PRAFT.GetPeerID() << "\r\n"; + tmp_stream << "raft_group_id:" << praft->GetGroupID() << "\r\n"; + tmp_stream << "raft_node_id:" << praft->GetNodeID() << "\r\n"; + tmp_stream << "raft_peer_id:" << praft->GetPeerID() << "\r\n"; if (braft::is_active_state(node_status.state)) { tmp_stream << "raft_state:up\r\n"; } else { @@ -286,9 +290,9 @@ void InfoCmd::InfoRaft(std::string& message) { tmp_stream << "raft_leader_id:" << node_status.leader_id.to_string() << "\r\n"; tmp_stream << "raft_current_term:" << std::to_string(node_status.term) << "\r\n"; - if (PRAFT.IsLeader()) { + if (praft->IsLeader()) { std::vector<braft::PeerId> peers; - auto status = PRAFT.GetListPeers(&peers); + auto status = praft->GetListPeers(&peers); if (!status.ok()) { tmp_stream.str("-ERR "); tmp_stream << status.error_str() << "\r\n"; @@ -319,8 +323,12 @@ void InfoCmd::InfoServer(std::string& info) { tmp_stream << "# Server\r\n"; tmp_stream << "PikiwiDB_version:" << version << "\r\n"; +#ifdef KPIKIWIDB_GIT_COMMIT_ID tmp_stream << "PikiwiDB_build_git_sha:" << KPIKIWIDB_GIT_COMMIT_ID << "\r\n"; +#endif +#ifdef KPIKIWIDB_BUILD_DATE tmp_stream << "Pikiwidb_build_compile_date: " << KPIKIWIDB_BUILD_DATE << "\r\n"; +#endif tmp_stream << "os:" << host_info.sysname << " " << host_info.release << " " << host_info.machine << "\r\n"; tmp_stream << "arch_bits:" << (reinterpret_cast<char*>(&host_info.machine) + strlen(host_info.machine) - 2) << "\r\n"; tmp_stream << "process_id:" << getpid() << "\r\n"; diff --git a/src/cmd_admin.h b/src/cmd_admin.h index c79c0561e..33c533e84 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -160,7 +160,7 @@ class InfoCmd : public BaseCmd { void InfoServer(std::string& info); void InfoStats(std::string& info); void InfoCPU(std::string& info); - void InfoRaft(std::string& info); + void InfoRaft(PClient* client, std::string& message); void InfoData(std::string& info); void InfoCommandStats(PClient* client, std::string& info); std::string FormatCommandStatLine(const CommandStatistics& stats); diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index e91c93405..7e2b7237b 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -11,12 +11,14 @@ */ #include "cmd_raft.h" +#include <cassert> #include #include #include -// #include "net/event_loop.h" +#include "brpc/channel.h" + #include "praft/praft.h" #include "pstd/log.h" #include "pstd/pstd_string.h" @@ -24,7 +26,9 @@ #include "client.h" #include "config.h" #include "pikiwidb.h" +#include "praft.pb.h" #include "replication.h" +#include "store.h" namespace pikiwidb { @@ -39,10 +43,13 @@ bool RaftNodeCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); return false; } + group_id_ = client->argv_[2]; + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); return true; } void RaftNodeCmd::DoCmd(PClient* client) { + assert(praft_); auto cmd = client->argv_[1]; pstd::StringToUpper(cmd); if (cmd == kAddCmd) { @@ -50,6 +57,7 @@ void RaftNodeCmd::DoCmd(PClient* client) { DoCmdAdd(client); } else if (cmd == kRemoveCmd) { DoCmdRemove(client); } else if (cmd == kDoSnapshot) { + assert(0); // TODO(longfar): add group id in arguments DoCmdSnapshot(client); } else {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); @@ -57,9 +65,13 @@ void RaftNodeCmd::DoCmd(PClient* client) { } void RaftNodeCmd::DoCmdAdd(PClient* client) { + DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); + auto db = PSTORE.GetDBByGroupID(group_id_); + assert(db); + auto leader = db->GetPRaft(); // Check whether it is a leader. If it is not a leader, return the leader information - if (!PRAFT.IsLeader()) { - client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderID()); + if (!leader->IsLeader()) { + client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); return; } @@ -70,7 +82,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. // So we do not need to parse and use nodeid like redis; - auto s = PRAFT.AddPeer(client->argv_[3]); + auto s = leader->AddPeer(client->argv_[3]); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -80,7 +92,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { void RaftNodeCmd::DoCmdRemove(PClient* client) { // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (!PRAFT.IsInitialized()) { + if (!praft_->IsInitialized()) { client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); return; } @@ -91,9 +103,9 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } // Check whether it is a leader. If it is not a leader, send remove request to leader - if (!PRAFT.IsLeader()) { + if (!praft_->IsLeader()) { // Get the leader information - braft::PeerId leader_peer_id(PRAFT.GetLeaderID()); + braft::PeerId leader_peer_id(praft_->GetLeaderID()); // @todo There will be an unreasonable address, need to consider how to deal with it if (leader_peer_id.is_empty()) { client->SetRes(CmdRes::kErrOther, @@ -101,24 +113,64 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { return; } - // Connect target - std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); - auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; - auto peer_id = client->argv_[2]; - auto ret = - PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); - if (!ret) { // other clients have removed - return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); - } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent remove request to leader successfully"); - - // Not reply any message here, we will reply after the connection is established. 
- client->Clear(); + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; + + NodeRemoveRequest request; + NodeRemoveResponse response; + + request.set_group_id(praft_->GetGroupID()); + request.set_endpoint(client->argv_[2]); + request.set_index(client->GetCurrentDB()); + request.set_role(0); + + auto endpoint = leader_peer_id.addr; + int retry_count = 0; + do { + brpc::Channel remove_node_channel; + if (0 != remove_node_channel.Init(endpoint, &options)) { + ERROR("Fail to init remove_node_channel to praft service!"); + client->SetRes(CmdRes::kErrOther, "Fail to init remove_node_channel."); + return; + } + + brpc::Controller cntl; + PRaftService_Stub stub(&remove_node_channel); + stub.RemoveNode(&cntl, &request, &response, nullptr); + + if (cntl.Failed()) { + ERROR("Fail to send remove node rpc to target server {}", butil::endpoint2str(endpoint).c_str()); + client->SetRes(CmdRes::kErrOther, "Failed to send remove node rpc"); + return; + } + + if (response.success()) { + client->SetRes(CmdRes::kOK, "Remove Node Success"); + return; + } + + switch (response.error_code()) { + case PRaftErrorCode::kErrorReDirect: { + butil::str2endpoint(response.leader_endpoint().c_str(), &endpoint); + endpoint.port += g_config.raft_port_offset; + break; + } + default: { + ERROR("Remove node request return false"); + client->SetRes(CmdRes::kErrOther, "Failed to Remove Node"); + return; + } + } + } while (!response.success() && ++retry_count <= 3); + + ERROR("Remove node request return false"); + client->SetRes(CmdRes::kErrOther, "Failed to Remove Node"); return; } - auto s = PRAFT.RemovePeer(client->argv_[2]); + auto s = praft_->RemovePeer(client->argv_[2], client->GetCurrentDB()); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -127,12 +179,11 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } void RaftNodeCmd::DoCmdSnapshot(PClient* client) { - auto self_snapshot_index = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetSmallestFlushedLogIndex(); - INFO("DoCmdSnapshot self_snapshot_index:{}", self_snapshot_index); - auto s = PRAFT.DoSnapshot(self_snapshot_index); - if (s.ok()) { - client->SetRes(CmdRes::kOK); + auto s = praft_->DoSnapshot(); + if (!s.ok()) { + return client->SetRes(CmdRes::kErrOther, fmt::format("do snapshot error: {}", s.error_cstr())); } + client->SetRes(CmdRes::kOK); } RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) @@ -145,11 +196,14 @@ bool RaftClusterCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); return false; } + + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); return true; } void RaftClusterCmd::DoCmd(PClient* client) { - if (PRAFT.IsInitialized()) { + assert(praft_); + if (praft_->IsInitialized()) { return client->SetRes(CmdRes::kErrOther, "Already cluster member"); } @@ -167,21 +221,29 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - std::string cluster_id; + std::string group_id; if (client->argv_.size() == 3) { - cluster_id = client->argv_[2]; - if (cluster_id.size() != RAFT_GROUPID_LEN) { + group_id = client->argv_[2]; + if (group_id.size() != RAFT_GROUPID_LEN) { return client->SetRes(CmdRes::kInvalidParameter, "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } } else { - cluster_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); + group_id = 
pstd::RandomHexChars(RAFT_GROUPID_LEN); } - auto s = PRAFT.Init(cluster_id, false); - if (!s.ok()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str())); + + auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); + if (add_region_success) { + auto s = praft_->Init(group_id, false); + if (!s.ok()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); + } + client->SetLineString(fmt::format("+OK {}", group_id)); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); } - client->SetRes(CmdRes::kOK); } static inline std::optional<std::pair<std::string, int>> GetIpAndPortFromEndPoint(const std::string& endpoint) { @@ -196,43 +258,103 @@ } void RaftClusterCmd::DoCmdJoin(PClient* client) { - // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (PRAFT.IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, - "A node that has been added to a cluster must be removed \ from the old cluster before it can be added to the new cluster"); + assert(client->argv_.size() == 4); + auto group_id = client->argv_[2]; + auto addr = client->argv_[3]; + butil::EndPoint tar_ep; + if (0 != butil::str2endpoint(addr.c_str(), &tar_ep)) { + ERROR("Wrong endpoint format: {}", addr); + return client->SetRes(CmdRes::kErrOther, "Wrong endpoint format"); } + tar_ep.port += g_config.raft_port_offset; - if (client->argv_.size() < 3) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + if (group_id.size() != RAFT_GROUPID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } - // (KKorpse)TODO: Support multiple nodes join at the same time.
- if (client->argv_.size() > 3) { - return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments"); + auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); + if (add_region_success) { + auto s = praft_->Init(group_id, false); + if (!s.ok()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); + } + } else { + return client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); } - auto addr = client->argv_[2]; - if (braft::PeerId(addr).is_empty()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); - } + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; + + NodeAddRequest request; + NodeAddResponse response; + + auto self_ep = butil::endpoint2str(PSTORE.GetEndPoint()); + request.set_group_id(group_id); + request.set_endpoint(self_ep.c_str()); + request.set_index(client->GetCurrentDB()); + request.set_role(0); + + int retry_count = 0; + + do { + brpc::Channel add_node_channel; + if (0 != add_node_channel.Init(tar_ep, &options)) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + ERROR("Fail to init add_node_channel to praft service!"); + client->SetRes(CmdRes::kErrOther, "Fail to init add_node_channel."); + return; + } - auto ip_port = GetIpAndPortFromEndPoint(addr); - if (!ip_port.has_value()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); - } - auto& [peer_ip, port] = *ip_port; + brpc::Controller cntl; + PRaftService_Stub stub(&add_node_channel); + stub.AddNode(&cntl, &request, &response, nullptr); - // Connect target - auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port); - if (!ret) { // other clients have joined - return client->SetRes(CmdRes::kErrOther, "Other clients have joined"); - } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent join request to leader successfully"); + if (cntl.Failed()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + ERROR("Fail to send add node rpc to target server {}, ErrCode={}", addr, cntl.ErrorCode()); + client->SetRes(CmdRes::kErrOther, "Failed to send add node rpc"); + return; + } + + if (response.success()) { + client->SetRes(CmdRes::kOK, "Add Node Success"); + return; + } - // Not reply any message here, we will reply after the connection is established.
- client->Clear(); + switch (response.error_code()) { + case PRaftErrorCode::kErrorReDirect: { + butil::str2endpoint(response.leader_endpoint().c_str(), &tar_ep); + tar_ep.port += g_config.raft_port_offset; + break; + } + default: { + ERROR("Add node request return false"); + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); + return; + } + } + } while (!response.success() && ++retry_count <= 3); + + ERROR("Add node request return false"); + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); } +void RaftClusterCmd::ClearPaftCtx() { + assert(praft_); + praft_->ShutDown(); + praft_->Join(); + praft_->Clear(); + praft_ = nullptr; +} } // namespace pikiwidb diff --git a/src/cmd_raft.h b/src/cmd_raft.h index 27ae19d16..d994271a5 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -15,8 +15,10 @@ #include "base_cmd.h" namespace pikiwidb { class PRaft; -/* RAFT.NODE ADD [id] [address:port] +/* + * RAFT.NODE ADD [id] [address:port] * Add a new node to the cluster. The [id] can be an explicit non-zero value, * or zero to let the cluster choose one. * Reply: @@ -50,20 +52,23 @@ class RaftNodeCmd : public BaseCmd { void DoCmdRemove(PClient *client); void DoCmdSnapshot(PClient *client); + private: + PRaft *praft_ = nullptr; + std::string group_id_; + static constexpr std::string_view kAddCmd = "ADD"; static constexpr std::string_view kRemoveCmd = "REMOVE"; static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT"; }; -/* RAFT.CLUSTER INIT +/* + * RAFT.CLUSTER INIT [group_id] * Initializes a new Raft cluster. - * is an optional 32 character string, if set, cluster will use it for the id * Reply: - * +OK [group_id] + * +OK * - * RAFT.CLUSTER JOIN [addr:port] - * Join an existing cluster. - * The operation is asynchronous and may take place/retry in the background. + * RAFT.CLUSTER JOIN <group_id> <addr:port> + * Join an existing cluster. The operation is asynchronous and may take place/retry in the background. * Reply: * +OK */ @@ -79,6 +84,11 @@ class RaftClusterCmd : public BaseCmd { void DoCmdInit(PClient *client); void DoCmdJoin(PClient *client); + void ClearPaftCtx(); + + private: + PRaft *praft_ = nullptr; + static constexpr std::string_view kInitCmd = "INIT"; static constexpr std::string_view kJoinCmd = "JOIN"; }; diff --git a/src/config.h b/src/config.h index ad88661de..0b7941d13 100644 --- a/src/config.h +++ b/src/config.h @@ -281,7 +281,7 @@ class PConfig { AtomicString run_id; // The number of databases.
- std::atomic<size_t> databases = 16; + std::atomic<size_t> databases = 3; /* * For Network I/O threads, in future version, we may delete diff --git a/src/db.cc b/src/db.cc index e736c594e..b00cb6cc3 100644 --- a/src/db.cc +++ b/src/db.cc @@ -9,7 +9,6 @@ */ #include "db.h" -#include #include "config.h" #include "praft/praft.h" @@ -20,7 +19,9 @@ extern pikiwidb::PConfig g_config; namespace pikiwidb { DB::DB(int db_index, const std::string& db_path) - : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {} + : db_index_(db_index), + db_path_(db_path + std::to_string(db_index_) + '/'), + praft_(std::make_unique<PRaft>(db_index)) {} DB::~DB() { INFO("DB{} is closing...", db_index_); } @@ -28,21 +29,18 @@ rocksdb::Status DB::Open() { storage::StorageOptions storage_options; storage_options.options = g_config.GetRocksDBOptions(); storage_options.table_options = g_config.GetRocksDBBlockBasedTableOptions(); - storage_options.options.ttl = g_config.rocksdb_ttl_second.load(std::memory_order_relaxed); storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second.load(std::memory_order_relaxed); - storage_options.small_compaction_threshold = g_config.small_compaction_threshold.load(); storage_options.small_compaction_duration_threshold = g_config.small_compaction_duration_threshold.load(); if (g_config.use_raft.load(std::memory_order_relaxed)) { - storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) { + storage_options.append_log_function = [&r = *praft_](const Binlog& log, std::promise<rocksdb::Status>&& promise) { r.AppendLog(log, std::move(promise)); }; - storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) { - raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index), - std::forward<decltype(is_sync)>(is_sync)); + storage_options.do_snapshot_function = [&r = *praft_](int64_t self_snapshot_index, bool is_sync) { + return r.DoSnapshot(self_snapshot_index, is_sync); }; } @@ -120,11 +118,12 @@ void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[ma storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second.load(std::memory_order_relaxed); if (g_config.use_raft.load(std::memory_order_relaxed)) { - storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) { + storage_options.append_log_function = [&r = *praft_](const Binlog& log, std::promise<rocksdb::Status>&& promise) { r.AppendLog(log, std::move(promise)); }; - storage_options.do_snapshot_function = - std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2); + storage_options.do_snapshot_function = [&r = *praft_](int64_t self_snapshot_index, bool is_sync) { + return r.DoSnapshot(self_snapshot_index, is_sync); + }; } if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { diff --git a/src/db.h b/src/db.h index 7326606c2..67e62dcbe 100644 --- a/src/db.h +++ b/src/db.h @@ -10,11 +10,10 @@ #pragma once -#include +#include #include -#include "pstd/log.h" -#include "pstd/noncopyable.h" +#include "praft/praft.h" #include "storage/storage.h" namespace pikiwidb { @@ -42,6 +41,8 @@ class DB { int GetDbIndex() { return db_index_; } + PRaft* GetPRaft() { return praft_.get(); } + private: const int db_index_ = 0; const std::string db_path_; @@ -53,6 +54,7 @@ class DB { */ std::shared_mutex storage_mutex_; std::unique_ptr<storage::Storage> storage_; + std::unique_ptr<PRaft> praft_{nullptr}; bool opened_ = false; }; diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index
93f8f1a11..5b6742315 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -130,7 +130,7 @@ bool PikiwiDB::ParseArgs(int argc, char* argv[]) { unsigned int optarg_long = static_cast<unsigned int>(strlen(optarg)); char* str = (char*)calloc(optarg_long, sizeof(char*)); if (str) { - if (sscanf(optarg, "%s:%d", str, &master_port_) != 2) { + if (sscanf(optarg, "%[^:]:%hd", str, &master_port_) != 2) { ERROR("Invalid slaveof format."); free(str); return false; @@ -240,9 +240,6 @@ void PikiwiDB::Run() { } void PikiwiDB::Stop() { - pikiwidb::PRAFT.ShutDown(); - pikiwidb::PRAFT.Join(); - pikiwidb::PRAFT.Clear(); cmd_threads_.Stop(); event_server_->StopServer(); } diff --git a/src/praft/praft.cc b/src/praft/praft.cc index f0653427a..d0dcf4d11 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -10,9 +10,8 @@ #include #include "braft/raft.h" -#include "braft/snapshot.h" #include "braft/util.h" -#include "brpc/server.h" +#include "butil/endpoint.h" #include "gflags/gflags.h" #include "pstd/log.h" @@ -72,70 +71,25 @@ void ClusterCmdContext::ConnectTargetNode() { auto ip = PREPL.GetMasterAddr().GetIP(); auto port = PREPL.GetMasterAddr().GetPort(); if (ip == peer_ip_ && port == port_ && PREPL.GetMasterState() == kPReplStateConnected) { - PRAFT.SendNodeRequest(PREPL.GetMaster()); + praft_->SendNodeRequest(PREPL.GetMaster()); return; } // reconnect auto fail_cb = [&](const std::string& err) { INFO("Failed to connect to cluster node, err: {}", err); - PRAFT.OnClusterCmdConnectionFailed(err); + praft_->OnClusterCmdConnectionFailed(err); }; PREPL.SetFailCallback(fail_cb); PREPL.SetMasterState(kPReplStateNone); PREPL.SetMasterAddr(peer_ip_.c_str(), port_); } -PRaft& PRaft::Instance() { - static PRaft store; - return store; -} - -butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { - if (node_ && server_) { +butil::Status PRaft::Init(const std::string& group_id, bool initial_conf_is_null) { + if (node_) { return {0, "OK"}; } - server_ = std::make_unique<brpc::Server>(); - auto port = g_config.port + pikiwidb::g_config.raft_port_offset; - // Add your service into RPC server - DummyServiceImpl service(&PRAFT); - if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to add service"); - } - // raft can share the same RPC server. Notice the second parameter, because - // adding services into a running server is not allowed and the listen - // address of this server is impossible to get before the server starts. You - // have to specify the address of the server. - if (braft::add_service(server_.get(), port) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to add raft service"); - } - - // It's recommended to start the server before Counter is started to avoid - // the case that it becomes the leader while the service is unreacheable by - // clients. - // Notice the default options of server is used here. Check out details from - // the doc of brpc if you would like change some option; - if (server_->Start(port, nullptr) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to start server"); - } - // It's ok to start PRaft; - assert(group_id.size() == RAFT_GROUPID_LEN); - this->group_id_ = group_id; - - // FIXME: g_config.ip is default to 127.0.0.0, which may not work in cluster.
- raw_addr_ = g_config.ip.ToString() + ":" + std::to_string(port); - butil::ip_t ip; - auto ret = butil::str2ip(g_config.ip.ToString().c_str(), &ip); - if (ret != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to convert str_ip to butil::ip_t"); - } - butil::EndPoint addr(ip, port); - // Default init in one node. // initial_conf takes effect only when the replication group is started from an empty node. // The Configuration is restored from the snapshot and log files when the data in the replication group is not empty. @@ -144,37 +98,39 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { // Set initial_conf to empty for other nodes. // You can also start empty nodes simultaneously by setting the same inital_conf(ip:port of multiple nodes) for // multiple nodes. - std::string initial_conf; + braft::NodeOptions node_options; if (!initial_conf_is_null) { - initial_conf = raw_addr_ + ":0,"; - } - if (node_options_.initial_conf.parse_from(initial_conf) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to parse configuration"); + auto endpoint_str = butil::endpoint2str(PSTORE.GetEndPoint()); + std::string initial_conf = fmt::format("{}:{},", endpoint_str.c_str(), db_id_); + if (node_options.initial_conf.parse_from(initial_conf) != 0) { + return ERROR_LOG_AND_STATUS("Failed to parse configuration"); + } } + node_options.fsm = this; + node_options.node_owns_fsm = false; + node_options.snapshot_interval_s = 0; + auto prefix = fmt::format("local://{}{}/{}", g_config.db_path.ToString(), "_praft", db_id_); + node_options.log_uri = prefix + "/log"; + node_options.raft_meta_uri = prefix + "/raft_meta"; + node_options.snapshot_uri = prefix + "/snapshot"; + snapshot_adaptor_ = new PPosixFileSystemAdaptor(this); + node_options.snapshot_file_system_adaptor = &snapshot_adaptor_; + // node_options_.election_timeout_ms = FLAGS_election_timeout_ms; - node_options_.fsm = this; - node_options_.node_owns_fsm = false; - node_options_.snapshot_interval_s = 0; - std::string prefix = "local://" + g_config.db_path.ToString() + std::to_string(db_id_) + "/_praft"; - node_options_.log_uri = prefix + "/log"; - node_options_.raft_meta_uri = prefix + "/raft_meta"; - node_options_.snapshot_uri = prefix + "/snapshot"; // node_options_.disable_cli = FLAGS_disable_cli; - snapshot_adaptor_ = new PPosixFileSystemAdaptor(); - node_options_.snapshot_file_system_adaptor = &snapshot_adaptor_; - node_ = std::make_unique<braft::Node>("pikiwidb", braft::PeerId(addr)); // group_id - if (node_->init(node_options_) != 0) { - server_.reset(); + node_ = std::make_unique<braft::Node>(group_id, braft::PeerId(PSTORE.GetEndPoint(), db_id_)); // group_id + if (node_->init(node_options) != 0) { node_.reset(); return ERROR_LOG_AND_STATUS("Failed to init raft node"); } + group_id_ = group_id; // enable leader lease braft::FLAGS_raft_enable_leader_lease = true; + INFO("Initialized praft successfully: node_id={}", GetNodeID()); return {0, "OK"}; } @@ -295,9 +251,9 @@ void PRaft::SendNodeRequest(PClient* client) { auto cluster_cmd_type = cluster_cmd_ctx_.GetClusterCmdType(); switch (cluster_cmd_type) { - case ClusterCmdType::kJoin: - SendNodeInfoRequest(client, "DATA"); - break; + case ClusterCmdType::kJoin: { + SendNodeAddRequest(client); + } break; case ClusterCmdType::kRemove: SendNodeRemoveRequest(client); break; @@ -320,17 +276,12 @@ void PRaft::SendNodeAddRequest(PClient* client) { assert(client); // Node id in braft are ip:port, the node id param in RAFT.NODE ADD cmd will be ignored.
@@ -349,10 +300,10 @@ int PRaft::ProcessClusterCmdResponse(PClient* client, const char* start, int len) {
   int ret = 0;
   switch (cluster_cmd_type) {
     case ClusterCmdType::kJoin:
-      ret = PRAFT.ProcessClusterJoinCmdResponse(client, start, len);
+      ret = ProcessClusterJoinCmdResponse(client, start, len);
       break;
     case ClusterCmdType::kRemove:
-      ret = PRAFT.ProcessClusterRemoveCmdResponse(client, start, len);
+      ret = ProcessClusterRemoveCmdResponse(client, start, len);
       break;
     default:
       client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER response supports JOIN/REMOVE only");
@@ -419,47 +370,48 @@ void PRaft::LeaderRedirection(PClient* join_client, const std::string& reply) {
 
   // Reset the target of the connection
   cluster_cmd_ctx_.Clear();
-  auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port);
+  auto ret = GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port);
   if (!ret) {  // other clients have joined
     join_client->SetRes(CmdRes::kErrOther, "Other clients have joined");
     join_client->SendPacket();
     // join_client->Clear();
     return;
   }
-  PRAFT.GetClusterCmdCtx().ConnectTargetNode();
+  GetClusterCmdCtx().ConnectTargetNode();
 
   // Not reply any message here, we will reply after the connection is established.
   join_client->Clear();
 }
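LeaderRedirection retargets the pending join at the "ip:port" carried in a wrong-leader reply. The reply format is not shown in this hunk, so the split below is only an assumed sketch:

    // Sketch: splitting "ip:port" out of a redirection reply (format assumed).
    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <utility>

    std::optional<std::pair<std::string, int>> SplitIpPort(const std::string& addr) {
      auto pos = addr.rfind(':');  // rfind keeps IPv4 dotted quads intact
      if (pos == std::string::npos || pos + 1 >= addr.size()) {
        return std::nullopt;  // no port component
      }
      try {
        return std::make_pair(addr.substr(0, pos), std::stoi(addr.substr(pos + 1)));
      } catch (const std::exception&) {
        return std::nullopt;  // non-numeric or out-of-range port
      }
    }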
 
-void PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) {
+bool PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) {
   std::string prefix = RAFT_GROUP_ID;
   std::string::size_type prefix_length = prefix.length();
   std::string::size_type group_id_start = reply.find(prefix);
   group_id_start += prefix_length;  // locate the start location of "raft_group_id"
   std::string::size_type group_id_end = reply.find("\r\n", group_id_start);
-  if (group_id_end != std::string::npos) {
-    std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start);
-    // initialize the slave node
-    auto s = PRAFT.Init(raft_group_id, true);
-    if (!s.ok()) {
-      join_client->SetRes(CmdRes::kErrOther, s.error_str());
-      join_client->SendPacket();
-      // join_client->Clear();
-      // If the join fails, clear clusterContext and set it again by using the join command
-      cluster_cmd_ctx_.Clear();
-      return;
-    }
-
-    PRAFT.SendNodeAddRequest(client);
-  } else {
+  if (group_id_end == std::string::npos) {
     // can't find group id
     ERROR("Joined Raft cluster fail, because of invalid raft_group_id");
     join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id");
     join_client->SendPacket();
-    // join_client->Clear();
     // If the join fails, clear clusterContext and set it again by using the join command
     cluster_cmd_ctx_.Clear();
+    return false;
   }
+
+  std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start);
+  // initialize the slave node
+  auto s = Init(raft_group_id, true);
+  if (!s.ok()) {
+    join_client->SetRes(CmdRes::kErrOther, s.error_str());
+    join_client->SendPacket();
+    join_client->Clear();
+    // If the join fails, clear clusterContext and set it again by using the join command
+    cluster_cmd_ctx_.Clear();
+    ERROR("Failed to init raft: {}", s.error_cstr());
+    return false;
+  }
+  INFO("Init raft successfully, groupid={}", raft_group_id);
+  return true;
 }
 
@@ -472,7 +424,7 @@ int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) {
   std::string reply(start, len);
   if (reply.find(OK_STR) != std::string::npos) {
-    INFO("Joined Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_);
+    INFO("Joined Raft cluster, node id: {}, group_id: {}", GetNodeID(), group_id_);
     join_client->SetRes(CmdRes::kOK);
     join_client->SendPacket();
     // join_client->Clear();
@@ -483,12 +435,16 @@ int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) {
   } else if (reply.find(WRONG_LEADER) != std::string::npos) {
     LeaderRedirection(join_client, reply);
   } else if (reply.find(RAFT_GROUP_ID) != std::string::npos) {
-    InitializeNodeBeforeAdd(client, join_client, reply);
+    auto res = InitializeNodeBeforeAdd(client, join_client, reply);
+    if (!res) {
+      ERROR("Failed to initialize node before add");
+      return len;
+    }
+    SendNodeAddRequest(client);
   } else {
     ERROR("Joined Raft cluster fail, str: {}", reply);
     join_client->SetRes(CmdRes::kErrOther, reply);
     join_client->SendPacket();
-    // join_client->Clear();
     // If the join fails, clear clusterContext and set it again by using the join command
     cluster_cmd_ctx_.Clear();
   }
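The group-id handshake above hinges on extracting the id between the "raft_group_id:" marker and the following CRLF. A standalone sketch of that extraction (mirroring InitializeNodeBeforeAdd, with an extra npos check on the first find):

    // Sketch: pulling the raft group id out of a leader's reply.
    #include <optional>
    #include <string>

    std::optional<std::string> ExtractGroupId(const std::string& reply) {
      static const std::string kPrefix = "raft_group_id:";  // RAFT_GROUP_ID
      auto start = reply.find(kPrefix);
      if (start == std::string::npos) {
        return std::nullopt;  // marker absent
      }
      start += kPrefix.size();
      auto end = reply.find("\r\n", start);
      if (end == std::string::npos) {
        return std::nullopt;  // malformed reply, no terminator
      }
      return reply.substr(start, end - start);
    }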
@@ -506,14 +462,13 @@ int PRaft::ProcessClusterRemoveCmdResponse(PClient* client, const char* start, int len) {
   std::string reply(start, len);
   if (reply.find(OK_STR) != std::string::npos) {
-    INFO("Removed Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_);
+    INFO("Removed Raft cluster, node id: {}, group_id: {}", GetNodeID(), group_id_);
     ShutDown();
     Join();
     Clear();
 
     remove_client->SetRes(CmdRes::kOK);
     remove_client->SendPacket();
-    // remove_client->Clear();
   } else if (reply.find(NOT_LEADER) != std::string::npos) {
     auto remove_client = cluster_cmd_ctx_.GetClient();
     remove_client->Clear();
@@ -522,7 +477,6 @@ int PRaft::ProcessClusterRemoveCmdResponse(PClient* client, const char* start, int len) {
     ERROR("Removed Raft cluster fail, str: {}", reply);
     remove_client->SetRes(CmdRes::kErrOther, reply);
     remove_client->SendPacket();
-    // remove_client->Clear();
   }
 
   // If the remove fails, clear clusterContext and set it again by using the join command
@@ -533,7 +487,7 @@ butil::Status PRaft::AddPeer(const std::string& peer) {
   if (!node_) {
-    ERROR_LOG_AND_STATUS("Node is not initialized");
+    return ERROR_LOG_AND_STATUS("Node is not initialized");
   }
 
   braft::SynchronizedClosure done;
@@ -548,6 +502,27 @@ butil::Status PRaft::AddPeer(const std::string& peer) {
   return {0, "OK"};
 }
 
+butil::Status PRaft::AddPeer(const std::string& endpoint, int index) {
+  if (!node_) {
+    return ERROR_LOG_AND_STATUS("Node is not initialized");
+  }
+
+  braft::SynchronizedClosure done;
+  butil::EndPoint ep;
+  butil::str2endpoint(endpoint.c_str(), &ep);
+  ep.port += g_config.raft_port_offset;
+  braft::PeerId peer_id(ep, index);
+  node_->add_peer(peer_id, &done);
+  done.wait();
+
+  if (!done.status().ok()) {
+    // WARN("Failed to add peer {} to node {}, status: {}", ep, node_->node_id().to_string(),
+    //      done.status().error_str());
+    return done.status();
+  }
+  return done.status();
+}
+
 butil::Status PRaft::RemovePeer(const std::string& peer) {
   if (!node_) {
     return ERROR_LOG_AND_STATUS("Node is not initialized");
@@ -566,6 +541,26 @@ butil::Status PRaft::RemovePeer(const std::string& peer) {
   return {0, "OK"};
 }
 
+butil::Status PRaft::RemovePeer(const std::string& endpoint, int index) {
+  if (!node_) {
+    return ERROR_LOG_AND_STATUS("Node is not initialized");
+  }
+
+  braft::SynchronizedClosure done;
+  butil::EndPoint ep;
+  butil::str2endpoint(endpoint.c_str(), &ep);
+  ep.port += g_config.raft_port_offset;
+  braft::PeerId peer_id(ep, index);
+  node_->remove_peer(peer_id, &done);
+  done.wait();
+
+  if (!done.status().ok()) {
+    WARN("Failed to remove peer, status: {}", done.status().error_str());
+    return done.status();
+  }
+  return done.status();
+}
+
 butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) {
   if (!node_) {
     return ERROR_LOG_AND_STATUS("Node is not initialized");
@@ -599,10 +594,6 @@ void PRaft::ShutDown() {
   if (node_) {
     node_->shutdown(nullptr);
   }
-
-  if (server_) {
-    server_->Stop(0);
-  }
 }
 
 // Blocking this thread until the node is eventually down.
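The endpoint-based AddPeer/RemovePeer overloads above shift the advertised port by raft-port-offset after parsing; adding the offset before str2endpoint would simply be overwritten by the parse. A condensed sketch of the construction order (helper is hypothetical):

    // Sketch: building a braft::PeerId from "ip:port" plus the raft port offset.
    #include <string>
    #include "braft/configuration.h"  // braft::PeerId
    #include "butil/endpoint.h"

    bool MakeRaftPeerId(const std::string& endpoint, int index, int raft_port_offset,
                        braft::PeerId* out) {
      butil::EndPoint ep;
      if (butil::str2endpoint(endpoint.c_str(), &ep) != 0) {
        return false;  // malformed "ip:port"
      }
      ep.port += raft_port_offset;  // shift only after a successful parse
      *out = braft::PeerId(ep, index);
      return true;
    }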
@@ -610,10 +601,6 @@ void PRaft::Join() {
   if (node_) {
     node_->join();
   }
-
-  if (server_) {
-    server_->Join();
-  }
 }
 
 void PRaft::AppendLog(const Binlog& log, std::promise&& promise) {
@@ -639,10 +626,6 @@ void PRaft::Clear() {
   if (node_) {
     node_.reset();
   }
-
-  if (server_) {
-    server_.reset();
-  }
 }
 
 void PRaft::on_apply(braft::Iterator& iter) {
diff --git a/src/praft/praft.h b/src/praft/praft.h
index 1d2148af7..f31e5fdc5 100644
--- a/src/praft/praft.h
+++ b/src/praft/praft.h
@@ -7,11 +7,9 @@
 
 #pragma once
 
-#include
 #include
 #include
 #include
-#include
 #include
 
 #include "braft/file_system_adaptor.h"
@@ -34,10 +32,9 @@ namespace pikiwidb {
 #define RAFT_GROUP_ID "raft_group_id:"
 #define NOT_LEADER "Not leader"
 
-#define PRAFT PRaft::Instance()
-
-// class EventLoop;
+class EventLoop;
 class Binlog;
+class PRaft;
 
 enum ClusterCmdType {
   kNone,
@@ -46,10 +43,8 @@ enum ClusterCmdType {
 };
 
 class ClusterCmdContext {
-  friend class PRaft;
-
  public:
-  ClusterCmdContext() = default;
+  ClusterCmdContext(PRaft* raft) : praft_(raft) {}
   ~ClusterCmdContext() = default;
 
   bool Set(ClusterCmdType cluster_cmd_type, PClient* client, std::string&& peer_ip, int port,
@@ -69,6 +64,7 @@ class ClusterCmdContext {
   void ConnectTargetNode();
 
  private:
+  PRaft* praft_;
   ClusterCmdType cluster_cmd_type_ = ClusterCmdType::kNone;
   std::mutex mtx_;
   PClient* client_ = nullptr;
@@ -94,17 +90,21 @@ class PRaftWriteDoneClosure : public braft::Closure {
 
 class PRaft : public braft::StateMachine {
  public:
-  PRaft() = default;
-  ~PRaft() override = default;
-
-  static PRaft& Instance();
+  PRaft(int did) : db_id_(did) {}
+  ~PRaft() override {
+    ShutDown();
+    Join();
+    Clear();
+  }
 
   //===--------------------------------------------------------------------===//
   // Braft API
   //===--------------------------------------------------------------------===//
-  butil::Status Init(std::string& group_id, bool initial_conf_is_null);
+  butil::Status Init(const std::string& group_id, bool initial_conf_is_null);
   butil::Status AddPeer(const std::string& peer);
+  butil::Status AddPeer(const std::string& endpoint, int index);
   butil::Status RemovePeer(const std::string& peer);
+  butil::Status RemovePeer(const std::string& endpoint, int index);
   butil::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true);
 
   void ShutDown();
@@ -124,7 +124,7 @@ class PRaft : public braft::StateMachine {
   int ProcessClusterCmdResponse(PClient* client, const char* start, int len);
   void CheckRocksDBConfiguration(PClient* client, PClient* join_client, const std::string& reply);
   void LeaderRedirection(PClient* join_client, const std::string& reply);
-  void InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply);
+  bool InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply);
   int ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len);
   int ProcessClusterRemoveCmdResponse(PClient* client, const char* start, int len);
@@ -142,7 +142,7 @@ class PRaft : public braft::StateMachine {
   storage::LogIndex GetTerm(uint64_t log_index);
   storage::LogIndex GetLastLogIndex(bool is_flush = false);
 
-  bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; }
+  bool IsInitialized() const { return node_ != nullptr; }
 
  private:
   void on_apply(braft::Iterator& iter) override;
@@ -159,16 +159,15 @@ class PRaft : public braft::StateMachine {
   void on_start_following(const ::braft::LeaderChangeContext& ctx) override;
 
  private:
-  std::unique_ptr<brpc::Server> server_{nullptr};  // brpc
   std::unique_ptr<braft::Node> node_{nullptr};
   butil::atomic<int64_t> leader_term_ = -1;
   braft::NodeOptions node_options_;  // options for raft node
   std::string raw_addr_;             // ip:port of this node
   scoped_refptr<braft::FileSystemAdaptor> snapshot_adaptor_ = nullptr;
-  ClusterCmdContext cluster_cmd_ctx_;  // context for cluster join/remove command
-  std::string group_id_;               // group id
-  int db_id_ = 0;                      // db_id
+  ClusterCmdContext cluster_cmd_ctx_{this};  // context for cluster join/remove command
+  std::string group_id_;                     // group id
+  int db_id_ = 0;                            // db_id
 
   bool is_node_first_start_up_ = true;
 };
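Since the destructor now tears the node down, ownership is expected to move from the old process-wide singleton to one PRaft instance per DB. A minimal sketch of that intended usage (helper name assumed):

    // Sketch: one PRaft per DB, keyed by db id. Destruction runs
    // ShutDown()/Join()/Clear(), so PikiwiDB::Stop() needs no explicit teardown.
    #include <memory>
    #include "praft/praft.h"

    std::unique_ptr<pikiwidb::PRaft> MakeDBRaft(int db_id) {
      return std::make_unique<pikiwidb::PRaft>(db_id);
    }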
diff --git a/src/praft/praft.proto b/src/praft/praft.proto
index 61a495f21..9bb140e79 100644
--- a/src/praft/praft.proto
+++ b/src/praft/praft.proto
@@ -2,12 +2,33 @@ syntax="proto3";
 package pikiwidb;
 option cc_generic_services = true;
 
-message DummyRequest {
-};
+message NodeAddRequest {
+  string group_id = 1;
+  string endpoint = 2;
+  uint32 index = 3;
+  uint32 role = 4;
+}
 
-message DummyResponse {
-};
+message NodeAddResponse {
+  bool success = 1;
+  uint32 error_code = 2;
+  string leader_endpoint = 3;
+}
+
+message NodeRemoveRequest {
+  string group_id = 1;
+  string endpoint = 2;
+  uint32 index = 3;
+  uint32 role = 4;
+}
+
+message NodeRemoveResponse {
+  bool success = 1;
+  uint32 error_code = 2;
+  string leader_endpoint = 3;
+}
 
-service DummyService {
-  rpc DummyMethod(DummyRequest) returns (DummyResponse);
+service PRaftService {
+  rpc AddNode(NodeAddRequest) returns (NodeAddResponse);
+  rpc RemoveNode(NodeRemoveRequest) returns (NodeRemoveResponse);
 };
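With cc_generic_services enabled, protoc emits a PRaftService_Stub that can be driven through a brpc channel. A sketch of a synchronous AddNode call (channel options and generated include path assumed):

    // Sketch: calling the new PRaftService from a peer over brpc.
    #include "brpc/channel.h"
    #include "brpc/controller.h"
    #include "praft.pb.h"  // generated from praft.proto

    bool CallAddNode(const std::string& leader_addr, const std::string& group_id,
                     const std::string& self_endpoint, int db_id) {
      brpc::Channel channel;
      if (channel.Init(leader_addr.c_str(), nullptr) != 0) {
        return false;  // bad address or connect failure
      }
      pikiwidb::PRaftService_Stub stub(&channel);
      pikiwidb::NodeAddRequest request;
      request.set_group_id(group_id);
      request.set_endpoint(self_endpoint);
      request.set_index(db_id);  // index doubles as the db id in this patch
      pikiwidb::NodeAddResponse response;
      brpc::Controller cntl;
      stub.AddNode(&cntl, &request, &response, nullptr);  // nullptr done => synchronous
      return !cntl.Failed() && response.success();
    }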
diff --git a/src/praft/praft_service.cc b/src/praft/praft_service.cc
new file mode 100644
index 000000000..c71305aa8
--- /dev/null
+++ b/src/praft/praft_service.cc
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+#include "praft_service.h"
+
+#include "fmt/format.h"
+#include "store.h"
+
+namespace pikiwidb {
+void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request,
+                               ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done) {
+  brpc::ClosureGuard done_guard(done);
+  auto groupid = request->group_id();
+  auto end_point = request->endpoint();
+  auto index = request->index();
+  auto role = request->role();
+
+  auto db_ptr = PSTORE.GetDBByGroupID(groupid);
+  if (!db_ptr) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch));
+    response->set_leader_endpoint(end_point);
+    return;
+  }
+  auto praft_ptr = db_ptr->GetPRaft();
+  if (!praft_ptr) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch));
+    response->set_leader_endpoint(end_point);
+    return;
+  }
+
+  if (!praft_ptr->IsLeader()) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorReDirect));
+    response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
+    return;
+  }
+
+  auto status = praft_ptr->AddPeer(end_point, index);
+  if (!status.ok()) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorAddNode));
+    response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
+    return;
+  }
+  response->set_success(true);
+}
+
+void PRaftServiceImpl::RemoveNode(::google::protobuf::RpcController* controller,
+                                  const ::pikiwidb::NodeRemoveRequest* request,
+                                  ::pikiwidb::NodeRemoveResponse* response, ::google::protobuf::Closure* done) {
+  brpc::ClosureGuard done_guard(done);
+  auto groupid = request->group_id();
+  auto end_point = request->endpoint();
+  auto index = request->index();
+  auto role = request->role();
+
+  auto db_ptr = PSTORE.GetDBByGroupID(groupid);
+  if (!db_ptr) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch));
+    response->set_leader_endpoint(end_point);
+    return;
+  }
+  auto praft_ptr = db_ptr->GetPRaft();
+  if (!praft_ptr) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch));
+    response->set_leader_endpoint(end_point);
+    return;
+  }
+
+  if (!praft_ptr->IsLeader()) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorReDirect));
+    response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
+    return;
+  }
+
+  auto status = praft_ptr->RemovePeer(end_point, index);
+  if (!status.ok()) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorRemoveNode));
+    response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
+    return;
+  }
+  response->set_success(true);
+}
+
+}  // namespace pikiwidb
\ No newline at end of file
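Both handlers reply with kErrorReDirect plus the leader's address when the contacted node is not the leader, so callers are expected to retry against leader_endpoint. A sketch of that client-side decision (helper and retry policy assumed):

    // Sketch: choosing the next target after an AddNode attempt.
    #include <string>
    #include "praft.pb.h"
    #include "store.h"  // PRaftErrorCode

    std::string NextAddNodeTarget(const pikiwidb::NodeAddResponse& resp,
                                  const std::string& current) {
      if (!resp.success() &&
          resp.error_code() == static_cast<uint32_t>(pikiwidb::kErrorReDirect) &&
          !resp.leader_endpoint().empty()) {
        return resp.leader_endpoint();  // retry against the reported leader
      }
      return current;  // success or a non-redirect failure
    }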
diff --git a/src/praft/praft_service.h b/src/praft/praft_service.h
index db08c7b7b..70e72f1c4 100644
--- a/src/praft/praft_service.h
+++ b/src/praft/praft_service.h
@@ -12,15 +12,14 @@
 namespace pikiwidb {
 
 class PRaft;
-
-class DummyServiceImpl : public DummyService {
+class PRaftServiceImpl : public PRaftService {
  public:
-  explicit DummyServiceImpl(PRaft* praft) : praft_(praft) {}
-  void DummyMethod(::google::protobuf::RpcController* controller, const ::pikiwidb::DummyRequest* request,
-                   ::pikiwidb::DummyResponse* response, ::google::protobuf::Closure* done) override {}
+  PRaftServiceImpl() = default;
+  void AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request,
+               ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done);
 
- private:
-  PRaft* praft_ = nullptr;
+  void RemoveNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeRemoveRequest* request,
+                  ::pikiwidb::NodeRemoveResponse* response, ::google::protobuf::Closure* done);
 };
 
 }  // namespace pikiwidb
diff --git a/src/praft/psnapshot.cc b/src/praft/psnapshot.cc
index 17e389aca..a0b4f9152 100644
--- a/src/praft/psnapshot.cc
+++ b/src/praft/psnapshot.cc
@@ -83,7 +83,7 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int oflag,
       auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta());
       auto last_log_index = PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
       new_meta.set_last_included_index(last_log_index);
-      auto last_log_term = PRAFT.GetTerm(last_log_index);
+      auto last_log_term = praft_->GetTerm(last_log_index);
       new_meta.set_last_included_term(last_log_term);
 
       INFO("Succeed to fix db_{} snapshot meta: {}, {}", db_id, last_log_index, last_log_term);
diff --git a/src/praft/psnapshot.h b/src/praft/psnapshot.h
index 7aee35c58..2779f77d8 100644
--- a/src/praft/psnapshot.h
+++ b/src/praft/psnapshot.h
@@ -12,6 +12,7 @@
 #include "braft/file_system_adaptor.h"
 #include "braft/macros.h"
 #include "braft/snapshot.h"
+#include "praft/praft.h"
 
 #define PRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta"
 #define PRAFT_SNAPSHOT_PATH "snapshot/snapshot_"
@@ -21,8 +22,8 @@ namespace pikiwidb {
 
 class PPosixFileSystemAdaptor : public braft::PosixFileSystemAdaptor {
  public:
-  PPosixFileSystemAdaptor() {}
-  ~PPosixFileSystemAdaptor() {}
+  explicit PPosixFileSystemAdaptor(PRaft* praft) : praft_(praft) {}
+  ~PPosixFileSystemAdaptor() override = default;
 
   braft::FileAdaptor* open(const std::string& path, int oflag, const ::google::protobuf::Message* file_meta,
                            butil::File::Error* e) override;
@@ -31,6 +32,7 @@ class PPosixFileSystemAdaptor : public braft::PosixFileSystemAdaptor {
 
  private:
   braft::raft_mutex_t mutex_;
+  PRaft* praft_{nullptr};
 };
 
 }  // namespace pikiwidb
diff --git a/src/store.cc b/src/store.cc
index 6f0500091..59c000f14 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -10,16 +10,13 @@
 
 #include "store.h"
 
-#include
-#include
-
 #include "config.h"
 #include "db.h"
+#include "praft/praft_service.h"
 #include "pstd/log.h"
 #include "pstd/pstd_string.h"
 
 namespace pikiwidb {
-
 PStore::~PStore() { INFO("STORE is closing..."); }
 
 PStore& PStore::Instance() {
@@ -36,7 +33,29 @@ void PStore::Init(int db_number) {
     backends_.push_back(std::move(db));
     INFO("Open DB_{} success!", i);
   }
-  INFO("STORE Init success!");
+
+  auto rpc_ip = g_config.ip.ToString();
+  auto rpc_port =
+      g_config.port.load(std::memory_order_relaxed) + g_config.raft_port_offset.load(std::memory_order_relaxed);
+
+  if (0 != butil::str2endpoint(fmt::format("{}:{}", rpc_ip, std::to_string(rpc_port)).c_str(), &endpoint_)) {
+    return ERROR("Wrong endpoint format");
+  }
+
+  if (0 != braft::add_service(GetRpcServer(), endpoint_)) {
+    return ERROR("Failed to add raft service to rpc server");
+  }
+
+  if (0 != rpc_server_->AddService(dynamic_cast<google::protobuf::Service*>(praft_service_.get()),
+                                   brpc::SERVER_OWNS_SERVICE)) {
+    return ERROR("Failed to add praft service to rpc server");
+  }
+
+  if (0 != rpc_server_->Start(endpoint_, nullptr)) {
+    return ERROR("Failed to start rpc server");
+  }
+  INFO("Started RPC server successfully on addr {}", butil::endpoint2str(endpoint_).c_str());
+  INFO("PSTORE Init success!");
 }
 
 void PStore::HandleTaskSpecificDB(const TasksVector& tasks) {
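The Init sequence above depends on ordering: braft must register its internal services on the brpc server before the server starts, because brpc rejects adding services to a running server. A condensed sketch of that bring-up order (helper is hypothetical):

    // Sketch: the bring-up order PStore::Init relies on.
    #include "braft/raft.h"
    #include "brpc/server.h"
    #include "butil/endpoint.h"

    bool StartRaftRpcServer(brpc::Server* server, google::protobuf::Service* praft_service,
                            const butil::EndPoint& ep) {
      if (braft::add_service(server, ep) != 0) {
        return false;  // raft internals must go in first
      }
      if (server->AddService(praft_service, brpc::SERVER_OWNS_SERVICE) != 0) {
        return false;  // cannot add to a running server, so this precedes Start
      }
      return server->Start(ep, nullptr) == 0;  // listen last
    }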
@@ -76,4 +95,32 @@ void PStore::HandleTaskSpecificDB(const TasksVector& tasks) {
     }
   });
 }
+
+bool PStore::AddRegion(const std::string& group_id, uint32_t dbno) {
+  std::lock_guard lock(rw_mutex_);
+  if (region_map_.find(group_id) != region_map_.end()) {
+    return false;
+  }
+  region_map_.emplace(group_id, dbno);
+  return true;
+}
+
+bool PStore::RemoveRegion(const std::string& group_id) {
+  std::lock_guard lock(rw_mutex_);
+  if (region_map_.find(group_id) != region_map_.end()) {
+    region_map_.erase(group_id);
+    return true;
+  }
+  return false;
+}
+
+DB* PStore::GetDBByGroupID(const std::string& group_id) const {
+  std::shared_lock lock(rw_mutex_);
+  auto it = region_map_.find(group_id);
+  if (it == region_map_.end()) {
+    return nullptr;
+  }
+  return backends_[it->second].get();
+}
+
 }  // namespace pikiwidb
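region_map_ is guarded with a shared_mutex: writers (AddRegion/RemoveRegion) take an exclusive lock, readers (GetDBByGroupID) a shared one. The same pattern in isolation (class name assumed):

    // Sketch: the reader/writer locking pattern used for region_map_.
    #include <cstdint>
    #include <mutex>
    #include <optional>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>

    class RegionIndex {
     public:
      bool Add(const std::string& group_id, uint32_t dbno) {
        std::lock_guard lock(mu_);  // exclusive: one writer at a time
        return map_.emplace(group_id, dbno).second;
      }
      std::optional<uint32_t> Find(const std::string& group_id) const {
        std::shared_lock lock(mu_);  // shared: readers run concurrently
        auto it = map_.find(group_id);
        if (it == map_.end()) {
          return std::nullopt;
        }
        return it->second;
      }

     private:
      mutable std::shared_mutex mu_;
      std::unordered_map<std::string, uint32_t> map_;
    };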
diff --git a/src/store.h b/src/store.h
index 4383cb392..144706008 100644
--- a/src/store.h
+++ b/src/store.h
@@ -10,17 +10,27 @@
 
 #pragma once
 
+#include
+#include
+#include
+#include
+#include
+
+#include "praft/praft.h"
+#include "praft/praft_service.h"
+
 #define GLOG_NO_ABBREVIATED_SEVERITIES
 
-#include
-#include
+#include
 #include
 
-#include "common.h"
+#include "brpc/server.h"
+#include "butil/endpoint.h"
+
 #include "db.h"
-#include "storage/storage.h"
 
 namespace pikiwidb {
 
+class RaftServiceImpl;
 enum TaskType { kCheckpoint = 0, kLoadDBFromCheckpoint, kEmpty };
@@ -37,13 +47,20 @@ struct TaskContext {
   bool sync = false;
   TaskContext() = delete;
   TaskContext(TaskType t, bool s = false) : type(t), sync(s) {}
-  TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {}
-  TaskContext(TaskType t, int d, const std::map<std::string, std::string>& a, bool s = false)
+  TaskContext(TaskType t, int32_t d, bool s = false) : type(t), db(d), sync(s) {}
+  TaskContext(TaskType t, int32_t d, const std::map<std::string, std::string>& a, bool s = false)
       : type(t), db(d), args(a), sync(s) {}
 };
 
 using TasksVector = std::vector<TaskContext>;
 
+enum PRaftErrorCode {
+  kErrorDisMatch = 0,
+  kErrorAddNode,
+  kErrorRemoveNode,
+  kErrorReDirect,
+};
+
 class PStore {
  public:
   static PStore& Instance();
@@ -60,10 +77,38 @@ class PStore {
 
   int GetDBNumber() const { return db_number_; }
 
+  brpc::Server* GetRpcServer() const { return rpc_server_.get(); }
+
+  const butil::EndPoint& GetEndPoint() const { return endpoint_; }
+
+  /**
+   * return true if add group_id -> dbno into region_map_ success.
+   * return false if group_id -> dbno already exists in region_map_.
+   */
+  bool AddRegion(const std::string& group_id, uint32_t dbno);
+
+  /**
+   * return true if remove group_id -> dbno from region_map_ success.
+   * return false if group_id -> dbno does not exist in region_map_.
+   */
+  bool RemoveRegion(const std::string& group_id);
+
+  /**
+   * return nullptr if group_id -> dbno does not exist in region_map_.
+   */
+  DB* GetDBByGroupID(const std::string& group_id) const;
+
  private:
   PStore() = default;
+
   int db_number_ = 0;
   std::vector<std::unique_ptr<DB>> backends_;
+  butil::EndPoint endpoint_;
+  std::unique_ptr<PRaftServiceImpl> praft_service_{std::make_unique<PRaftServiceImpl>()};
+  std::unique_ptr<brpc::Server> rpc_server_{std::make_unique<brpc::Server>()};
+
+  mutable std::shared_mutex rw_mutex_;
+  std::unordered_map<std::string, uint32_t> region_map_;
 };
 
 #define PSTORE PStore::Instance()
diff --git a/tests/consistency_test.go b/tests/consistency_test.go
index ccecce14d..c60d8b670 100644
--- a/tests/consistency_test.go
+++ b/tests/consistency_test.go
@@ -898,12 +898,12 @@ var _ = Describe("Consistency", Ordered, func() {
 		}
 
 		{
 			// set write on leader
-			set, err := leader.SetEx(ctx, testKey, testValue, 3).Result()
+			set, err := leader.SetEx(ctx, testKey, testValue, 3*time.Second).Result()
 			Expect(err).NotTo(HaveOccurred())
 			Expect(set).To(Equal("OK"))
 
 			// read check
-			time.Sleep(10 * time.Second)
+			time.Sleep(5 * time.Second)
 			readChecker(func(c *redis.Client) {
 				_, err := c.Get(ctx, testKey).Result()
 				Expect(err).To(Equal(redis.Nil))