From 91dae07b16c47d84c5bc69e5c5a26f54b10bee1b Mon Sep 17 00:00:00 2001 From: longfar Date: Mon, 24 Jun 2024 20:20:12 +0800 Subject: [PATCH 01/10] refactor: remove the singleton of PRaft --- src/base_cmd.cc | 9 +++++---- src/client.cc | 8 +++++--- src/cmd_admin.cc | 20 ++++++++++++-------- src/cmd_admin.h | 3 +++ src/cmd_raft.cc | 36 ++++++++++++++++++++---------------- src/cmd_raft.h | 7 +++++++ src/db.cc | 17 ++++++++--------- src/db.h | 8 +++++--- src/pikiwidb.cc | 3 --- src/praft/praft.cc | 27 +++++++++++---------------- src/praft/praft.h | 24 +++++++++++------------- tests/consistency_test.go | 4 ++-- 12 files changed, 89 insertions(+), 77 deletions(-) diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 3c09991e7..b6beedc55 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -6,11 +6,11 @@ */ #include "base_cmd.h" -#include "common.h" #include "config.h" #include "log.h" #include "pikiwidb.h" #include "praft/praft.h" +#include "store.h" namespace pikiwidb { @@ -35,16 +35,17 @@ void BaseCmd::Execute(PClient* client) { DEBUG("execute command: {}", client->CmdName()); if (g_config.use_raft.load()) { + auto praft = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); // 1. If PRAFT is not initialized yet, return an error message to the client for both read and write commands. - if (!PRAFT.IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) { + if (!praft->IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) { DEBUG("drop command: {}", client->CmdName()); return client->SetRes(CmdRes::kErrOther, "PRAFT is not initialized"); } // 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write // commands. - if (HasFlag(kCmdFlagsWrite) && !PRAFT.IsLeader()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", PRAFT.GetLeaderAddress())); + if (HasFlag(kCmdFlagsWrite) && !praft->IsLeader()) { + return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", praft->GetLeaderAddress())); } } diff --git a/src/client.cc b/src/client.cc index fbb358779..70412c89a 100644 --- a/src/client.cc +++ b/src/client.cc @@ -18,6 +18,7 @@ #include "base_cmd.h" #include "config.h" #include "pikiwidb.h" +#include "store.h" namespace pikiwidb { @@ -269,7 +270,7 @@ int PClient::handlePacket(const char* start, int bytes) { if (isPeerMaster()) { if (isClusterCmdTarget()) { // Proccees the packet at one turn. 
- int len = PRAFT.ProcessClusterCmdResponse(this, start, bytes); // @todo + int len = PSTORE.GetBackend(dbno_)->GetPRaft()->ProcessClusterCmdResponse(this, start, bytes); // @todo if (len > 0) { return len; } @@ -456,7 +457,7 @@ void PClient::OnConnect() { } if (isClusterCmdTarget()) { - PRAFT.SendNodeRequest(this); + PSTORE.GetBackend(dbno_)->GetPRaft()->SendNodeRequest(this); } } else { if (g_config.password.empty()) { @@ -541,7 +542,8 @@ bool PClient::isPeerMaster() const { } bool PClient::isClusterCmdTarget() const { - return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort(); + auto praft = PSTORE.GetBackend(dbno_)->GetPRaft(); + return praft->GetClusterCmdCtx().GetPeerIp() == PeerIP() && praft->GetClusterCmdCtx().GetPort() == PeerPort(); } int PClient::uniqueID() const { diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 2112943d7..c153ed6fb 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -138,7 +138,10 @@ void PingCmd::DoCmd(PClient* client) { client->SetRes(CmdRes::kPong, "PONG"); } InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {} -bool InfoCmd::DoInitial(PClient* client) { return true; } +bool InfoCmd::DoInitial(PClient* client) { + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + return true; +} // @todo The info raft command is only supported for the time being void InfoCmd::DoCmd(PClient* client) { @@ -175,19 +178,20 @@ void InfoCmd::InfoRaft(PClient* client) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - if (!PRAFT.IsInitialized()) { + assert(praft_); + if (!praft_->IsInitialized()) { return client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); } - auto node_status = PRAFT.GetNodeStatus(); + auto node_status = praft_->GetNodeStatus(); if (node_status.state == braft::State::STATE_END) { return client->SetRes(CmdRes::kErrOther, "Node is not initialized"); } std::string message; - message += "raft_group_id:" + PRAFT.GetGroupID() + "\r\n"; - message += "raft_node_id:" + PRAFT.GetNodeID() + "\r\n"; - message += "raft_peer_id:" + PRAFT.GetPeerID() + "\r\n"; + message += "raft_group_id:" + praft_->GetGroupID() + "\r\n"; + message += "raft_node_id:" + praft_->GetNodeID() + "\r\n"; + message += "raft_peer_id:" + praft_->GetPeerID() + "\r\n"; if (braft::is_active_state(node_status.state)) { message += "raft_state:up\r\n"; } else { @@ -197,9 +201,9 @@ void InfoCmd::InfoRaft(PClient* client) { message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n"; - if (PRAFT.IsLeader()) { + if (praft_->IsLeader()) { std::vector peers; - auto status = PRAFT.GetListPeers(&peers); + auto status = praft_->GetListPeers(&peers); if (!status.ok()) { return client->SetRes(CmdRes::kErrOther, status.error_str()); } diff --git a/src/cmd_admin.h b/src/cmd_admin.h index c78164093..bd3e99589 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -124,6 +124,9 @@ class InfoCmd : public BaseCmd { void InfoRaft(PClient* client); void InfoData(PClient* client); + + private: + PRaft* praft_ = nullptr; }; class CmdDebug : public BaseCmdGroup { diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 9bfaabc19..af2a49619 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -11,7 +11,6 @@ #include #include -#include "net/event_loop.h" #include "praft/praft.h" #include "pstd/log.h" #include "pstd/pstd_string.h" @@ -20,6 +19,7 @@ #include 
"config.h" #include "pikiwidb.h" #include "replication.h" +#include "store.h" namespace pikiwidb { @@ -34,10 +34,12 @@ bool RaftNodeCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); return false; } + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); return true; } void RaftNodeCmd::DoCmd(PClient* client) { + assert(praft_); auto cmd = client->argv_[1]; pstd::StringToUpper(cmd); if (cmd == kAddCmd) { @@ -53,8 +55,8 @@ void RaftNodeCmd::DoCmd(PClient* client) { void RaftNodeCmd::DoCmdAdd(PClient* client) { // Check whether it is a leader. If it is not a leader, return the leader information - if (!PRAFT.IsLeader()) { - client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderID()); + if (!praft_->IsLeader()) { + client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); return; } @@ -65,7 +67,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. // So we do not need to parse and use nodeid like redis; - auto s = PRAFT.AddPeer(client->argv_[3]); + auto s = praft_->AddPeer(client->argv_[3]); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -75,7 +77,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { void RaftNodeCmd::DoCmdRemove(PClient* client) { // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (!PRAFT.IsInitialized()) { + if (!praft_->IsInitialized()) { client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); return; } @@ -86,9 +88,9 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } // Check whether it is a leader. If it is not a leader, send remove request to leader - if (!PRAFT.IsLeader()) { + if (!praft_->IsLeader()) { // Get the leader information - braft::PeerId leader_peer_id(PRAFT.GetLeaderID()); + braft::PeerId leader_peer_id(praft_->GetLeaderID()); // @todo There will be an unreasonable address, need to consider how to deal with it if (leader_peer_id.is_empty()) { client->SetRes(CmdRes::kErrOther, @@ -101,11 +103,11 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; auto peer_id = client->argv_[2]; auto ret = - PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); + praft_->GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); if (!ret) { // other clients have removed return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); + praft_->GetClusterCmdCtx().ConnectTargetNode(); INFO("Sent remove request to leader successfully"); // Not reply any message here, we will reply after the connection is established. 
@@ -113,7 +115,7 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { return; } - auto s = PRAFT.RemovePeer(client->argv_[2]); + auto s = praft_->RemovePeer(client->argv_[2]); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -122,7 +124,7 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } void RaftNodeCmd::DoCmdSnapshot(PClient* client) { - auto s = PRAFT.DoSnapshot(); + auto s = praft_->DoSnapshot(); if (s.ok()) { client->SetRes(CmdRes::kOK); } @@ -138,11 +140,13 @@ bool RaftClusterCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); return false; } + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); return true; } void RaftClusterCmd::DoCmd(PClient* client) { - if (PRAFT.IsInitialized()) { + assert(praft_); + if (praft_->IsInitialized()) { return client->SetRes(CmdRes::kErrOther, "Already cluster member"); } @@ -170,7 +174,7 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { } else { cluster_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); } - auto s = PRAFT.Init(cluster_id, false); + auto s = praft_->Init(cluster_id, false); if (!s.ok()) { return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str())); } @@ -190,7 +194,7 @@ static inline std::optional> GetIpAndPortFromEnd void RaftClusterCmd::DoCmdJoin(PClient* client) { // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (PRAFT.IsInitialized()) { + if (praft_->IsInitialized()) { return client->SetRes(CmdRes::kErrOther, "A node that has been added to a cluster must be removed \ from the old cluster before it can be added to the new cluster"); @@ -217,11 +221,11 @@ void RaftClusterCmd::DoCmdJoin(PClient* client) { auto& [peer_ip, port] = *ip_port; // Connect target - auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port); + auto ret = praft_->GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port); if (!ret) { // other clients have joined return client->SetRes(CmdRes::kErrOther, "Other clients have joined"); } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); + praft_->GetClusterCmdCtx().ConnectTargetNode(); INFO("Sent join request to leader successfully"); // Not reply any message here, we will reply after the connection is established. diff --git a/src/cmd_raft.h b/src/cmd_raft.h index 6a4c1f869..d57db1367 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -12,6 +12,7 @@ #include "base_cmd.h" namespace pikiwidb { +class PRaft; /* RAFT.NODE ADD [id] [address:port] * Add a new node to the cluster. 
The [id] can be an explicit non-zero value, @@ -47,6 +48,9 @@ class RaftNodeCmd : public BaseCmd { void DoCmdRemove(PClient *client); void DoCmdSnapshot(PClient *client); +private: + PRaft* praft_ = nullptr; + static constexpr std::string_view kAddCmd = "ADD"; static constexpr std::string_view kRemoveCmd = "REMOVE"; static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT"; @@ -76,6 +80,9 @@ class RaftClusterCmd : public BaseCmd { void DoCmdInit(PClient *client); void DoCmdJoin(PClient *client); +private: + PRaft* praft_ = nullptr; + static constexpr std::string_view kInitCmd = "INIT"; static constexpr std::string_view kJoinCmd = "JOIN"; }; diff --git a/src/db.cc b/src/db.cc index ac6974b5f..524af5b83 100644 --- a/src/db.cc +++ b/src/db.cc @@ -6,7 +6,6 @@ */ #include "db.h" -#include #include "config.h" #include "praft/praft.h" @@ -25,20 +24,19 @@ rocksdb::Status DB::Open() { storage::StorageOptions storage_options; storage_options.options = g_config.GetRocksDBOptions(); storage_options.table_options = g_config.GetRocksDBBlockBasedTableOptions(); - storage_options.options.ttl = g_config.rocksdb_ttl_second.load(std::memory_order_relaxed); storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second.load(std::memory_order_relaxed); - storage_options.small_compaction_threshold = g_config.small_compaction_threshold.load(); storage_options.small_compaction_duration_threshold = g_config.small_compaction_duration_threshold.load(); if (g_config.use_raft.load(std::memory_order_relaxed)) { - storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise&& promise) { + storage_options.append_log_function = [&r = *praft_](const Binlog& log, std::promise&& promise) { r.AppendLog(log, std::move(promise)); }; - storage_options.do_snapshot_function = - std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2); + storage_options.do_snapshot_function = [&r = *praft_](int64_t self_snapshot_index, bool is_sync) { + return r.DoSnapshot(self_snapshot_index, is_sync); + }; } storage_options.db_instance_num = g_config.db_instance_num.load(); @@ -108,11 +106,12 @@ void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[ma storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second.load(std::memory_order_relaxed); if (g_config.use_raft.load(std::memory_order_relaxed)) { - storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise&& promise) { + storage_options.append_log_function = [&r = *praft_](const Binlog& log, std::promise&& promise) { r.AppendLog(log, std::move(promise)); }; - storage_options.do_snapshot_function = - std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2); + storage_options.do_snapshot_function = [&r = *praft_](int64_t self_snapshot_index, bool is_sync) { + return r.DoSnapshot(self_snapshot_index, is_sync); + }; } storage_ = std::make_unique(); diff --git a/src/db.h b/src/db.h index c7508273e..fefb1ac92 100644 --- a/src/db.h +++ b/src/db.h @@ -7,11 +7,10 @@ #pragma once -#include +#include #include -#include "pstd/log.h" -#include "pstd/noncopyable.h" +#include "praft/praft.h" #include "storage/storage.h" namespace pikiwidb { @@ -39,6 +38,8 @@ class DB { int GetDbIndex() { return db_index_; } + PRaft* GetPRaft() { return praft_.get(); } + private: const int db_index_ = 0; const std::string db_path_; @@ -50,6 +51,7 @@ class DB { */ std::shared_mutex storage_mutex_; std::unique_ptr 
storage_; + std::unique_ptr praft_{std::make_unique()}; bool opened_ = false; }; diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index 0638c3c49..abc0aa3c6 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -198,9 +198,6 @@ void PikiwiDB::Run() { } void PikiwiDB::Stop() { - pikiwidb::PRAFT.ShutDown(); - pikiwidb::PRAFT.Join(); - pikiwidb::PRAFT.Clear(); slave_threads_.Exit(); worker_threads_.Exit(); cmd_threads_.Stop(); diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 26239c743..d665824f7 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -66,24 +66,19 @@ void ClusterCmdContext::ConnectTargetNode() { auto ip = PREPL.GetMasterAddr().GetIP(); auto port = PREPL.GetMasterAddr().GetPort(); if (ip == peer_ip_ && port == port_ && PREPL.GetMasterState() == kPReplStateConnected) { - PRAFT.SendNodeRequest(PREPL.GetMaster()); + praft_->SendNodeRequest(PREPL.GetMaster()); return; } // reconnect auto fail_cb = [&](EventLoop*, const char* peer_ip, int port) { - PRAFT.OnClusterCmdConnectionFailed(EventLoop::Self(), peer_ip, port); + praft_->OnClusterCmdConnectionFailed(EventLoop::Self(), peer_ip, port); }; PREPL.SetFailCallback(fail_cb); PREPL.SetMasterState(kPReplStateNone); PREPL.SetMasterAddr(peer_ip_.c_str(), port_); } -PRaft& PRaft::Instance() { - static PRaft store; - return store; -} - butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { if (node_ && server_) { return {0, "OK"}; @@ -92,7 +87,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { server_ = std::make_unique(); auto port = g_config.port + pikiwidb::g_config.raft_port_offset; // Add your service into RPC server - DummyServiceImpl service(&PRAFT); + DummyServiceImpl service(this); if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { server_.reset(); return ERROR_LOG_AND_STATUS("Failed to add service"); @@ -300,10 +295,10 @@ int PRaft::ProcessClusterCmdResponse(PClient* client, const char* start, int len int ret = 0; switch (cluster_cmd_type) { case ClusterCmdType::kJoin: - ret = PRAFT.ProcessClusterJoinCmdResponse(client, start, len); + ret = ProcessClusterJoinCmdResponse(client, start, len); break; case ClusterCmdType::kRemove: - ret = PRAFT.ProcessClusterRemoveCmdResponse(client, start, len); + ret = ProcessClusterRemoveCmdResponse(client, start, len); break; default: client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER response supports JOIN/REMOVE only"); @@ -370,14 +365,14 @@ void PRaft::LeaderRedirection(PClient* join_client, const std::string& reply) { // Reset the target of the connection cluster_cmd_ctx_.Clear(); - auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port); + auto ret = GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port); if (!ret) { // other clients have joined join_client->SetRes(CmdRes::kErrOther, "Other clients have joined"); join_client->SendPacket(join_client->Message()); join_client->Clear(); return; } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); + GetClusterCmdCtx().ConnectTargetNode(); // Not reply any message here, we will reply after the connection is established. 
join_client->Clear(); @@ -392,7 +387,7 @@ void PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const if (group_id_end != std::string::npos) { std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); // initialize the slave node - auto s = PRAFT.Init(raft_group_id, true); + auto s = Init(raft_group_id, true); if (!s.ok()) { join_client->SetRes(CmdRes::kErrOther, s.error_str()); join_client->SendPacket(join_client->Message()); @@ -402,7 +397,7 @@ void PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const return; } - PRAFT.SendNodeAddRequest(client); + SendNodeAddRequest(client); } else { ERROR("Joined Raft cluster fail, because of invalid raft_group_id"); join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id"); @@ -423,7 +418,7 @@ int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int std::string reply(start, len); if (reply.find(OK_STR) != std::string::npos) { - INFO("Joined Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_); + INFO("Joined Raft cluster, node id: {}, group_id: {}", GetNodeID(), group_id_); join_client->SetRes(CmdRes::kOK); join_client->SendPacket(join_client->Message()); join_client->Clear(); @@ -457,7 +452,7 @@ int PRaft::ProcessClusterRemoveCmdResponse(PClient* client, const char* start, i std::string reply(start, len); if (reply.find(OK_STR) != std::string::npos) { - INFO("Removed Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_); + INFO("Removed Raft cluster, node id: {}, group_id: {}", GetNodeID(), group_id_); ShutDown(); Join(); Clear(); diff --git a/src/praft/praft.h b/src/praft/praft.h index 65cc14d4f..98c237ec2 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -7,11 +7,9 @@ #pragma once -#include #include #include #include -#include #include #include "braft/file_system_adaptor.h" @@ -33,10 +31,9 @@ namespace pikiwidb { #define RAFT_GROUP_ID "raft_group_id:" #define NOT_LEADER "Not leader" -#define PRAFT PRaft::Instance() - class EventLoop; class Binlog; +class PRaft; enum ClusterCmdType { kNone, @@ -45,10 +42,8 @@ enum ClusterCmdType { }; class ClusterCmdContext { - friend class PRaft; - public: - ClusterCmdContext() = default; + ClusterCmdContext(PRaft* raft) : praft_(raft) {} ~ClusterCmdContext() = default; bool Set(ClusterCmdType cluster_cmd_type, PClient* client, std::string&& peer_ip, int port, @@ -68,6 +63,7 @@ class ClusterCmdContext { void ConnectTargetNode(); private: + PRaft* praft_; ClusterCmdType cluster_cmd_type_ = ClusterCmdType::kNone; std::mutex mtx_; PClient* client_ = nullptr; @@ -94,9 +90,11 @@ class PRaftWriteDoneClosure : public braft::Closure { class PRaft : public braft::StateMachine { public: PRaft() = default; - ~PRaft() override = default; - - static PRaft& Instance(); + ~PRaft() override { + ShutDown(); + Join(); + Clear(); + } //===--------------------------------------------------------------------===// // Braft API @@ -161,9 +159,9 @@ class PRaft : public braft::StateMachine { std::string raw_addr_; // ip:port of this node scoped_refptr snapshot_adaptor_ = nullptr; - ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command - std::string group_id_; // group id - int db_id_ = 0; // db_id + ClusterCmdContext cluster_cmd_ctx_{this}; // context for cluster join/remove command + std::string group_id_; // group id + int db_id_ = 0; // db_id }; } // namespace pikiwidb diff --git a/tests/consistency_test.go b/tests/consistency_test.go index 
019b4a76b..8d8f3783f 100644 --- a/tests/consistency_test.go +++ b/tests/consistency_test.go @@ -775,12 +775,12 @@ var _ = Describe("Consistency", Ordered, func() { } { // set write on leader - set, err := leader.SetEx(ctx, testKey, testValue, 3).Result() + set, err := leader.SetEx(ctx, testKey, testValue, 3*time.Second).Result() Expect(err).NotTo(HaveOccurred()) Expect(set).To(Equal("OK")) // read check - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) readChecker(func(c *redis.Client) { _, err := c.Get(ctx, testKey).Result() Expect(err).To(Equal(redis.Nil)) From 55daab33aa48397ab0591c796851ddb5bc606c68 Mon Sep 17 00:00:00 2001 From: longfar Date: Fri, 28 Jun 2024 09:26:00 +0800 Subject: [PATCH 02/10] fix --- pikiwidb.conf | 6 +++--- src/cmd_raft.cc | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pikiwidb.conf b/pikiwidb.conf index 3f23870e6..72f8fbaa5 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -35,7 +35,7 @@ logfile stdout # Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT where # dbid is a number between 0 and 'databases'-1 -databases 16 +databases 1 ################################ SNAPSHOTTING ################################# # @@ -319,7 +319,7 @@ slowlog-max-len 128 # to the same DB is distributed among several RocksDB instances. # RocksDB instances number per DB -db-instance-num 3 +db-instance-num 1 # default is 86400 * 7 small-compaction-threshold 604800 # default is 86400 * 3 @@ -343,6 +343,6 @@ rocksdb-ttl-second 604800 rocksdb-periodic-second 259200; ############################### RAFT ############################### -use-raft no +use-raft yes # Braft relies on brpc to communicate via the default port number plus the port offset raft-port-offset 10 diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index af2a49619..89fa56ad2 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -176,7 +176,7 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { } auto s = praft_->Init(cluster_id, false); if (!s.ok()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str())); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: {}", s.error_str())); } client->SetRes(CmdRes::kOK); } From 4107a17bea1272282c5c8ec428ea7fbda4640968 Mon Sep 17 00:00:00 2001 From: longfar Date: Fri, 28 Jun 2024 11:56:39 +0800 Subject: [PATCH 03/10] refactor: database default number 1 --- src/config.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.h b/src/config.h index 7b1196388..ea22e16ec 100644 --- a/src/config.h +++ b/src/config.h @@ -151,10 +151,10 @@ class PConfig { AtomicString log_dir = "stdout"; // the log directory, differ from redis AtomicString log_level = "warning"; AtomicString run_id; - std::atomic databases = 16; + std::atomic databases = 1; std::atomic_uint32_t worker_threads_num = 2; std::atomic_uint32_t slave_threads_num = 2; - std::atomic db_instance_num = 3; + std::atomic db_instance_num = 1; std::atomic_bool use_raft = true; std::atomic_uint32_t rocksdb_max_subcompactions = 0; From 6ad5eae99488076f93fde6297e9c7f58742bf6fa Mon Sep 17 00:00:00 2001 From: longfar Date: Fri, 28 Jun 2024 17:41:13 +0800 Subject: [PATCH 04/10] feat: move rpc server to PStore from PRaft --- src/praft/praft.cc | 98 ++++++++++++---------------------------------- src/praft/praft.h | 8 +--- src/store.cc | 16 ++++++-- src/store.h | 13 ++++-- 4 files changed, 48 insertions(+), 87 deletions(-) diff --git 
a/src/praft/praft.cc b/src/praft/praft.cc index d665824f7..8e1e805c6 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -9,9 +9,9 @@ #include -#include "braft/snapshot.h" +#include "braft/raft.h" #include "braft/util.h" -#include "brpc/server.h" +#include "butil/endpoint.h" #include "pstd/log.h" #include "pstd/pstd_string.h" @@ -79,51 +79,11 @@ void ClusterCmdContext::ConnectTargetNode() { PREPL.SetMasterAddr(peer_ip_.c_str(), port_); } -butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { - if (node_ && server_) { +butil::Status PRaft::Init(const std::string& group_id, bool initial_conf_is_null) { + if (node_) { return {0, "OK"}; } - server_ = std::make_unique(); - auto port = g_config.port + pikiwidb::g_config.raft_port_offset; - // Add your service into RPC server - DummyServiceImpl service(this); - if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to add service"); - } - // raft can share the same RPC server. Notice the second parameter, because - // adding services into a running server is not allowed and the listen - // address of this server is impossible to get before the server starts. You - // have to specify the address of the server. - if (braft::add_service(server_.get(), port) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to add raft service"); - } - - // It's recommended to start the server before Counter is started to avoid - // the case that it becomes the leader while the service is unreacheable by - // clients. - // Notice the default options of server is used here. Check out details from - // the doc of brpc if you would like change some options; - if (server_->Start(port, nullptr) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to start server"); - } - // It's ok to start PRaft; - assert(group_id.size() == RAFT_GROUPID_LEN); - this->group_id_ = group_id; - - // FIXME: g_config.ip is default to 127.0.0.0, which may not work in cluster. - raw_addr_ = g_config.ip.ToString() + ":" + std::to_string(port); - butil::ip_t ip; - auto ret = butil::str2ip(g_config.ip.ToString().c_str(), &ip); - if (ret != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to convert str_ip to butil::ip_t"); - } - butil::EndPoint addr(ip, port); - // Default init in one node. // initial_conf takes effect only when the replication group is started from an empty node. // The Configuration is restored from the snapshot and log files when the data in the replication group is not empty. @@ -132,34 +92,36 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { // Set initial_conf to empty for other nodes. // You can also start empty nodes simultaneously by setting the same inital_conf(ip:port of multiple nodes) for // multiple nodes. 
- std::string initial_conf; + braft::NodeOptions node_options; if (!initial_conf_is_null) { - initial_conf = raw_addr_ + ":0,"; - } - if (node_options_.initial_conf.parse_from(initial_conf) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to parse configuration"); + auto endpoint_str = butil::endpoint2str(PSTORE.GetEndPoint()); + std::string initial_conf = fmt::format("{}:0,", endpoint_str.c_str()); + if (node_options.initial_conf.parse_from(initial_conf) != 0) { + return ERROR_LOG_AND_STATUS("Failed to parse configuration"); + } } + node_options.fsm = this; + node_options.node_owns_fsm = false; + node_options.snapshot_interval_s = 0; + auto prefix = fmt::format("local://{}{}/{}", g_config.db_path.ToString(), "_praft", db_id_); + node_options.log_uri = prefix + "/log"; + node_options.raft_meta_uri = prefix + "/raft_meta"; + node_options.snapshot_uri = prefix + "/snapshot"; + snapshot_adaptor_ = new PPosixFileSystemAdaptor(); + node_options.snapshot_file_system_adaptor = &snapshot_adaptor_; + // node_options_.election_timeout_ms = FLAGS_election_timeout_ms; - node_options_.fsm = this; - node_options_.node_owns_fsm = false; - node_options_.snapshot_interval_s = 0; - std::string prefix = "local://" + g_config.db_path.ToString() + "_praft"; - node_options_.log_uri = prefix + "/log"; - node_options_.raft_meta_uri = prefix + "/raft_meta"; - node_options_.snapshot_uri = prefix + "/snapshot"; // node_options_.disable_cli = FLAGS_disable_cli; - snapshot_adaptor_ = new PPosixFileSystemAdaptor(); - node_options_.snapshot_file_system_adaptor = &snapshot_adaptor_; - node_ = std::make_unique("pikiwidb", braft::PeerId(addr)); // group_id - if (node_->init(node_options_) != 0) { - server_.reset(); + node_ = std::make_unique(group_id, braft::PeerId(PSTORE.GetEndPoint())); // group_id + if (node_->init(node_options) != 0) { node_.reset(); return ERROR_LOG_AND_STATUS("Failed to init raft node"); } + group_id_ = group_id; + INFO("Initialized praft successfully: node_id={}", GetNodeID()); return {0, "OK"}; } @@ -540,10 +502,6 @@ void PRaft::ShutDown() { if (node_) { node_->shutdown(nullptr); } - - if (server_) { - server_->Stop(0); - } } // Blocking this thread until the node is eventually down. 
@@ -551,10 +509,6 @@ void PRaft::Join() { if (node_) { node_->join(); } - - if (server_) { - server_->Join(); - } } void PRaft::AppendLog(const Binlog& log, std::promise&& promise) { @@ -580,10 +534,6 @@ void PRaft::Clear() { if (node_) { node_.reset(); } - - if (server_) { - server_.reset(); - } } void PRaft::on_apply(braft::Iterator& iter) { diff --git a/src/praft/praft.h b/src/praft/praft.h index 98c237ec2..9473b761a 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -99,7 +99,7 @@ class PRaft : public braft::StateMachine { //===--------------------------------------------------------------------===// // Braft API //===--------------------------------------------------------------------===// - butil::Status Init(std::string& group_id, bool initial_conf_is_null); + butil::Status Init(const std::string& group_id, bool initial_conf_is_null); butil::Status AddPeer(const std::string& peer); butil::Status RemovePeer(const std::string& peer); butil::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true); @@ -136,7 +136,7 @@ class PRaft : public braft::StateMachine { braft::NodeStatus GetNodeStatus() const; butil::Status GetListPeers(std::vector* peers); - bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; } + bool IsInitialized() const { return node_ != nullptr; } private: void on_apply(braft::Iterator& iter) override; @@ -153,11 +153,7 @@ class PRaft : public braft::StateMachine { void on_start_following(const ::braft::LeaderChangeContext& ctx) override; private: - std::unique_ptr server_{nullptr}; // brpc std::unique_ptr node_{nullptr}; - braft::NodeOptions node_options_; // options for raft node - std::string raw_addr_; // ip:port of this node - scoped_refptr snapshot_adaptor_ = nullptr; ClusterCmdContext cluster_cmd_ctx_{this}; // context for cluster join/remove command std::string group_id_; // group id diff --git a/src/store.cc b/src/store.cc index 24a0f2ba1..6dd441f14 100644 --- a/src/store.cc +++ b/src/store.cc @@ -7,9 +7,6 @@ #include "store.h" -#include -#include - #include "config.h" #include "db.h" #include "pstd/log.h" @@ -33,6 +30,19 @@ void PStore::Init(int db_number) { backends_.push_back(std::move(db)); INFO("Open DB_{} success!", i); } + auto ip = g_config.ip.ToString(); + butil::ip_t rpc_ip; + butil::str2ip(ip.c_str(), &rpc_ip); + auto rpc_port = + g_config.port.load(std::memory_order_relaxed) + g_config.raft_port_offset.load(std::memory_order_relaxed); + endpoint_ = butil::EndPoint(rpc_ip, rpc_port); + if (braft::add_service(GetRpcServer(), endpoint_) != 0) { + return ERROR("Failed to add raft service to rpc server"); + } + if (rpc_server_->Start(endpoint_, nullptr) != 0) { + return ERROR("Failed to start rpc server"); + } + INFO("Started RPC server successfully"); INFO("STORE Init success!"); } diff --git a/src/store.h b/src/store.h index e7daf3d46..ae855ba73 100644 --- a/src/store.h +++ b/src/store.h @@ -9,13 +9,13 @@ #define GLOG_NO_ABBREVIATED_SEVERITIES -#include -#include +#include #include -#include "common.h" +#include "brpc/server.h" +#include "butil/endpoint.h" + #include "db.h" -#include "storage/storage.h" namespace pikiwidb { @@ -54,11 +54,16 @@ class PStore { void HandleTaskSpecificDB(const TasksVector& tasks); int GetDBNumber() const { return db_number_; } + brpc::Server* GetRpcServer() const { return rpc_server_.get(); } + const butil::EndPoint& GetEndPoint() const { return endpoint_; } private: PStore() = default; + int db_number_ = 0; std::vector> backends_; + butil::EndPoint endpoint_; + std::unique_ptr 
rpc_server_{std::make_unique()}; }; #define PSTORE PStore::Instance() From ca4acde8888692ed9c5d797a36553be39a441d04 Mon Sep 17 00:00:00 2001 From: longfar Date: Mon, 8 Jul 2024 19:45:33 +0800 Subject: [PATCH 05/10] tmp --- src/cmd_raft.cc | 44 +++++++++++++++++++----------- src/cmd_raft.h | 15 ++++++----- src/config.h | 2 +- src/praft/praft.cc | 67 ++++++++++++++++++++++++++-------------------- src/praft/praft.h | 2 +- src/store.h | 17 ++++++++++++ 6 files changed, 93 insertions(+), 54 deletions(-) diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 89fa56ad2..2943dcc54 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -34,6 +34,7 @@ bool RaftNodeCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); return false; } + group_id_ = client->argv_[2]; praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); return true; } @@ -47,6 +48,7 @@ void RaftNodeCmd::DoCmd(PClient* client) { } else if (cmd == kRemoveCmd) { DoCmdRemove(client); } else if (cmd == kDoSnapshot) { + assert(0); // TODO(longfar): add group id in arguments DoCmdSnapshot(client); } else { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); @@ -54,8 +56,11 @@ void RaftNodeCmd::DoCmd(PClient* client) { } void RaftNodeCmd::DoCmdAdd(PClient* client) { + auto db = PSTORE.GetDBByGroupID(group_id_); + assert(db); + auto praft = db->GetPRaft(); // Check whether it is a leader. If it is not a leader, return the leader information - if (!praft_->IsLeader()) { + if (!praft->IsLeader()) { client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); return; } @@ -67,7 +72,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. // So we do not need to parse and use nodeid like redis; - auto s = praft_->AddPeer(client->argv_[3]); + auto s = praft->AddPeer(client->argv_[3]); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -164,21 +169,22 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - std::string cluster_id; + std::string group_id; if (client->argv_.size() == 3) { - cluster_id = client->argv_[2]; - if (cluster_id.size() != RAFT_GROUPID_LEN) { + group_id = client->argv_[2]; + if (group_id.size() != RAFT_GROUPID_LEN) { return client->SetRes(CmdRes::kInvalidParameter, "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } } else { - cluster_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); + group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); } - auto s = praft_->Init(cluster_id, false); + auto s = praft_->Init(group_id, false); if (!s.ok()) { return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: {}", s.error_str())); } - client->SetRes(CmdRes::kOK); + PSTORE.AddRegion(praft_->GetGroupID(), client->GetCurrentDB()); + client->SetLineString(fmt::format("+OK {}", group_id)); } static inline std::optional> GetIpAndPortFromEndPoint(const std::string& endpoint) { @@ -200,16 +206,22 @@ void RaftClusterCmd::DoCmdJoin(PClient* client) { from the old cluster before it can be added to the new cluster"); } - if (client->argv_.size() < 3) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); - } + // if (client->argv_.size() < 3) { + // return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + // } - // (KKorpse)TODO: Support multiple nodes join at the same time. 
- if (client->argv_.size() > 3) { - return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments"); - } + // // (KKorpse)TODO: Support multiple nodes join at the same time. + // if (client->argv_.size() > 3) { + // return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments"); + // } + assert(client->argv_.size() == 4); + auto group_id = client->argv_[2]; + auto addr = client->argv_[3]; + + // init raft + auto s = praft_->Init(group_id, true); + assert(s.ok()); - auto addr = client->argv_[2]; if (braft::PeerId(addr).is_empty()) { return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); } diff --git a/src/cmd_raft.h b/src/cmd_raft.h index d57db1367..ceb68f147 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -14,7 +14,8 @@ namespace pikiwidb { class PRaft; -/* RAFT.NODE ADD [id] [address:port] +/* + * RAFT.NODE ADD [id] [address:port] * Add a new node to the cluster. The [id] can be an explicit non-zero value, * or zero to let the cluster choose one. * Reply: @@ -50,21 +51,21 @@ class RaftNodeCmd : public BaseCmd { private: PRaft* praft_ = nullptr; + std::string group_id_; static constexpr std::string_view kAddCmd = "ADD"; static constexpr std::string_view kRemoveCmd = "REMOVE"; static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT"; }; -/* RAFT.CLUSTER INIT +/* + * RAFT.CLUSTER INIT [group_id] * Initializes a new Raft cluster. - * is an optional 32 character string, if set, cluster will use it for the id * Reply: - * +OK [group_id] + * +OK * - * RAFT.CLUSTER JOIN [addr:port] - * Join an existing cluster. - * The operation is asynchronous and may take place/retry in the background. + * RAFT.CLUSTER JOIN + * Join an existing cluster. The operation is asynchronous and may take place/retry in the background. * Reply: * +OK */ diff --git a/src/config.h b/src/config.h index ea22e16ec..d59f44c3c 100644 --- a/src/config.h +++ b/src/config.h @@ -151,7 +151,7 @@ class PConfig { AtomicString log_dir = "stdout"; // the log directory, differ from redis AtomicString log_level = "warning"; AtomicString run_id; - std::atomic databases = 1; + std::atomic databases = 2; std::atomic_uint32_t worker_threads_num = 2; std::atomic_uint32_t slave_threads_num = 2; std::atomic db_instance_num = 1; diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 8e1e805c6..27e0737a5 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -121,7 +121,7 @@ butil::Status PRaft::Init(const std::string& group_id, bool initial_conf_is_null } group_id_ = group_id; - INFO("Initialized praft successfully: node_id={}", GetNodeID()); + INFO("Initialized praft successfully: group_id_={}, node_id={}", group_id_, GetNodeID()); return {0, "OK"}; } @@ -203,9 +203,10 @@ void PRaft::SendNodeRequest(PClient* client) { auto cluster_cmd_type = cluster_cmd_ctx_.GetClusterCmdType(); switch (cluster_cmd_type) { - case ClusterCmdType::kJoin: - SendNodeInfoRequest(client, "DATA"); - break; + case ClusterCmdType::kJoin: { + // SendNodeInfoRequest(client, "DATA"); + SendNodeAddRequest(client); + } break; case ClusterCmdType::kRemove: SendNodeRemoveRequest(client); break; @@ -228,16 +229,17 @@ void PRaft::SendNodeAddRequest(PClient* client) { assert(client); // Node id in braft are ip:port, the node id param in RAFT.NODE ADD cmd will be ignored. 
- int unused_node_id = 0; auto port = g_config.port + pikiwidb::g_config.raft_port_offset; auto raw_addr = g_config.ip.ToString() + ":" + std::to_string(port); - UnboundedBuffer req; - req.PushData("RAFT.NODE ADD ", 14); - req.PushData(std::to_string(unused_node_id).c_str(), std::to_string(unused_node_id).size()); - req.PushData(" ", 1); - req.PushData(raw_addr.data(), raw_addr.size()); - req.PushData("\r\n", 2); - client->SendPacket(req); + auto msg = fmt::format("RAFT.NODE ADD {} {}\r\n", group_id_, raw_addr); + client->SendPacket(msg); + // UnboundedBuffer req; + // req.PushData("RAFT.NODE ADD ", 14); + // req.PushData(std::to_string(unused_node_id).c_str(), std::to_string(unused_node_id).size()); + // req.PushData(" ", 1); + // req.PushData(raw_addr.data(), raw_addr.size()); + // req.PushData("\r\n", 2); + // client->SendPacket(req); client->Clear(); } @@ -340,34 +342,36 @@ void PRaft::LeaderRedirection(PClient* join_client, const std::string& reply) { join_client->Clear(); } -void PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) { +bool PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) { std::string prefix = RAFT_GROUP_ID; std::string::size_type prefix_length = prefix.length(); std::string::size_type group_id_start = reply.find(prefix); group_id_start += prefix_length; // locate the start location of "raft_group_id" std::string::size_type group_id_end = reply.find("\r\n", group_id_start); - if (group_id_end != std::string::npos) { - std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); - // initialize the slave node - auto s = Init(raft_group_id, true); - if (!s.ok()) { - join_client->SetRes(CmdRes::kErrOther, s.error_str()); - join_client->SendPacket(join_client->Message()); - join_client->Clear(); - // If the join fails, clear clusterContext and set it again by using the join command - cluster_cmd_ctx_.Clear(); - return; - } - - SendNodeAddRequest(client); - } else { + if (group_id_end == std::string::npos) { // can't find group id ERROR("Joined Raft cluster fail, because of invalid raft_group_id"); join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id"); join_client->SendPacket(join_client->Message()); join_client->Clear(); // If the join fails, clear clusterContext and set it again by using the join command cluster_cmd_ctx_.Clear(); + return false; } + + std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); + // initialize the slave node + auto s = Init(raft_group_id, true); + if (!s.ok()) { + join_client->SetRes(CmdRes::kErrOther, s.error_str()); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + ERROR("Failed to init raft: {}", s.error_cstr()); + return false; + } + INFO("Init raft successfully, groupid={}", raft_group_id); + return true; } int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) { @@ -391,7 +395,12 @@ int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int } else if (reply.find(WRONG_LEADER) != std::string::npos) { LeaderRedirection(join_client, reply); } else if (reply.find(RAFT_GROUP_ID) != std::string::npos) { - InitializeNodeBeforeAdd(client, join_client, reply); + auto res = InitializeNodeBeforeAdd(client, join_client, reply); + if (!res) { + ERROR("Failed to initialize node before add"); + 
return len; + } + SendNodeAddRequest(client); } else { ERROR("Joined Raft cluster fail, str: {}", reply); join_client->SetRes(CmdRes::kErrOther, reply); diff --git a/src/praft/praft.h b/src/praft/praft.h index 9473b761a..3703a4a5e 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -121,7 +121,7 @@ class PRaft : public braft::StateMachine { int ProcessClusterCmdResponse(PClient* client, const char* start, int len); void CheckRocksDBConfiguration(PClient* client, PClient* join_client, const std::string& reply); void LeaderRedirection(PClient* join_client, const std::string& reply); - void InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply); + bool InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply); int ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len); int ProcessClusterRemoveCmdResponse(PClient* client, const char* start, int len); diff --git a/src/store.h b/src/store.h index ae855ba73..aacd2b553 100644 --- a/src/store.h +++ b/src/store.h @@ -7,6 +7,11 @@ #pragma once +#include +#include +#include +#include +#include "praft/praft.h" #define GLOG_NO_ABBREVIATED_SEVERITIES #include @@ -56,6 +61,17 @@ class PStore { int GetDBNumber() const { return db_number_; } brpc::Server* GetRpcServer() const { return rpc_server_.get(); } const butil::EndPoint& GetEndPoint() const { return endpoint_; } + void AddRegion(const std::string& group_id, uint32_t dbno) { + assert(!db_map_.contains(group_id)); + db_map_.emplace(group_id, dbno); + } + DB* GetDBByGroupID(const std::string& group_id) const { + auto it = db_map_.find(group_id); + if (it == db_map_.end()) { + return nullptr; + } + return backends_[it->second].get(); + } private: PStore() = default; @@ -64,6 +80,7 @@ class PStore { std::vector> backends_; butil::EndPoint endpoint_; std::unique_ptr rpc_server_{std::make_unique()}; + std::unordered_map db_map_; }; #define PSTORE PStore::Instance() From 78a16bc93bcbf4bf48c30df31a9dfef2937bdbbf Mon Sep 17 00:00:00 2001 From: longfar Date: Sat, 13 Jul 2024 14:24:52 +0800 Subject: [PATCH 06/10] tmp --- src/cmd_admin.cc | 2 +- src/cmd_raft.cc | 4 ++-- src/db.cc | 4 +++- src/db.h | 2 +- src/praft/praft.cc | 15 +++++---------- src/praft/praft.h | 4 ++-- src/store.h | 4 ++-- 7 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index c153ed6fb..205d37cf7 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -95,7 +95,7 @@ void FlushallCmd::DoCmd(PClient* client) { } SelectCmd::SelectCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {} + : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} bool SelectCmd::DoInitial(PClient* client) { return true; } diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 2943dcc54..9e1778e23 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -48,7 +48,7 @@ void RaftNodeCmd::DoCmd(PClient* client) { } else if (cmd == kRemoveCmd) { DoCmdRemove(client); } else if (cmd == kDoSnapshot) { - assert(0); // TODO(longfar): add group id in arguments + assert(0); // TODO(longfar): add group id in arguments DoCmdSnapshot(client); } else { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); @@ -56,6 +56,7 @@ void RaftNodeCmd::DoCmd(PClient* client) { } void RaftNodeCmd::DoCmdAdd(PClient* client) { + DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); auto db = PSTORE.GetDBByGroupID(group_id_); assert(db); auto praft 
= db->GetPRaft(); @@ -238,7 +239,6 @@ void RaftClusterCmd::DoCmdJoin(PClient* client) { return client->SetRes(CmdRes::kErrOther, "Other clients have joined"); } praft_->GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent join request to leader successfully"); // Not reply any message here, we will reply after the connection is established. client->Clear(); diff --git a/src/db.cc b/src/db.cc index 524af5b83..cb8f28a1c 100644 --- a/src/db.cc +++ b/src/db.cc @@ -16,7 +16,9 @@ extern pikiwidb::PConfig g_config; namespace pikiwidb { DB::DB(int db_index, const std::string& db_path) - : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {} + : db_index_(db_index), + db_path_(db_path + std::to_string(db_index_) + '/'), + praft_(std::make_unique(db_index)) {} DB::~DB() { INFO("DB{} is closing...", db_index_); } diff --git a/src/db.h b/src/db.h index fefb1ac92..2fbdd74a9 100644 --- a/src/db.h +++ b/src/db.h @@ -51,7 +51,7 @@ class DB { */ std::shared_mutex storage_mutex_; std::unique_ptr storage_; - std::unique_ptr praft_{std::make_unique()}; + std::unique_ptr praft_{nullptr}; bool opened_ = false; }; diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 27e0737a5..b7ad09f44 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -114,14 +114,14 @@ butil::Status PRaft::Init(const std::string& group_id, bool initial_conf_is_null // node_options_.election_timeout_ms = FLAGS_election_timeout_ms; // node_options_.disable_cli = FLAGS_disable_cli; - node_ = std::make_unique(group_id, braft::PeerId(PSTORE.GetEndPoint())); // group_id + node_ = std::make_unique(group_id, braft::PeerId(PSTORE.GetEndPoint(), db_id_)); // group_id if (node_->init(node_options) != 0) { node_.reset(); return ERROR_LOG_AND_STATUS("Failed to init raft node"); } group_id_ = group_id; - INFO("Initialized praft successfully: group_id_={}, node_id={}", group_id_, GetNodeID()); + INFO("Initialized praft successfully: node_id={}", GetNodeID()); return {0, "OK"}; } @@ -205,7 +205,8 @@ void PRaft::SendNodeRequest(PClient* client) { switch (cluster_cmd_type) { case ClusterCmdType::kJoin: { // SendNodeInfoRequest(client, "DATA"); - SendNodeAddRequest(client); + SendNodeInfoRequest(client, "DATA"); + // SendNodeAddRequest(client); } break; case ClusterCmdType::kRemove: SendNodeRemoveRequest(client); @@ -233,13 +234,7 @@ void PRaft::SendNodeAddRequest(PClient* client) { auto raw_addr = g_config.ip.ToString() + ":" + std::to_string(port); auto msg = fmt::format("RAFT.NODE ADD {} {}\r\n", group_id_, raw_addr); client->SendPacket(msg); - // UnboundedBuffer req; - // req.PushData("RAFT.NODE ADD ", 14); - // req.PushData(std::to_string(unused_node_id).c_str(), std::to_string(unused_node_id).size()); - // req.PushData(" ", 1); - // req.PushData(raw_addr.data(), raw_addr.size()); - // req.PushData("\r\n", 2); - // client->SendPacket(req); + INFO("Sent join request to leader successfully"); client->Clear(); } diff --git a/src/praft/praft.h b/src/praft/praft.h index 3703a4a5e..d71e255d9 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -89,7 +89,7 @@ class PRaftWriteDoneClosure : public braft::Closure { class PRaft : public braft::StateMachine { public: - PRaft() = default; + PRaft(uint32_t db_id) : db_id_(db_id) {} ~PRaft() override { ShutDown(); Join(); @@ -157,7 +157,7 @@ class PRaft : public braft::StateMachine { scoped_refptr snapshot_adaptor_ = nullptr; ClusterCmdContext cluster_cmd_ctx_{this}; // context for cluster join/remove command std::string group_id_; // group id - int db_id_ = 0; // db_id + 
uint32_t db_id_ = 0; // db_id }; } // namespace pikiwidb diff --git a/src/store.h b/src/store.h index aacd2b553..751771199 100644 --- a/src/store.h +++ b/src/store.h @@ -37,8 +37,8 @@ struct TaskContext { bool sync = false; TaskContext() = delete; TaskContext(TaskType t, bool s = false) : type(t), sync(s) {} - TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {} - TaskContext(TaskType t, int d, const std::map& a, bool s = false) + TaskContext(TaskType t, uint32_t d, bool s = false) : type(t), db(d), sync(s) {} + TaskContext(TaskType t, uint32_t d, const std::map& a, bool s = false) : type(t), db(d), args(a), sync(s) {} }; From f15e146019dbd1671b5d09bb9d452f05fdc1a873 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Sun, 28 Jul 2024 13:50:40 +0800 Subject: [PATCH 07/10] feat: brpc com --- pikiwidb.conf | 2 +- src/CMakeLists.txt | 1 + src/cmd_raft.cc | 76 +++++++++++++++++++++++--------------- src/praft/praft.cc | 30 ++++++++++++--- src/praft/praft.h | 1 + src/praft/praft.proto | 18 ++++++--- src/praft/praft_service.cc | 36 ++++++++++++++++++ src/praft/praft_service.h | 14 +++---- src/store.cc | 7 +++- src/store.h | 5 +++ 10 files changed, 139 insertions(+), 51 deletions(-) create mode 100644 src/praft/praft_service.cc diff --git a/pikiwidb.conf b/pikiwidb.conf index 72f8fbaa5..3570d42a7 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -35,7 +35,7 @@ logfile stdout # Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT where # dbid is a number between 0 and 'databases'-1 -databases 1 +databases 2 ################################ SNAPSHOTTING ################################# # diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cc032067a..8726ea81e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -29,6 +29,7 @@ TARGET_INCLUDE_DIRECTORIES(pikiwidb PRIVATE ${rocksdb_SOURCE_DIR}/include PRIVATE ${BRAFT_INCLUDE_DIR} PRIVATE ${BRPC_INCLUDE_DIR} + PRIVATE ${PROTO_OUTPUT_DIR} ) TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags z praft praft_pb "${LIB}") diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 9e1778e23..71d2153e7 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -10,10 +10,13 @@ #include #include #include +#include #include "praft/praft.h" #include "pstd/log.h" #include "pstd/pstd_string.h" +#include "brpc/channel.h" +#include "praft.pb.h" #include "client.h" #include "config.h" @@ -180,11 +183,11 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { } else { group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); } + PSTORE.AddRegion(group_id, client->GetCurrentDB()); auto s = praft_->Init(group_id, false); if (!s.ok()) { return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: {}", s.error_str())); } - PSTORE.AddRegion(praft_->GetGroupID(), client->GetCurrentDB()); client->SetLineString(fmt::format("+OK {}", group_id)); } @@ -200,48 +203,61 @@ static inline std::optional> GetIpAndPortFromEnd } void RaftClusterCmd::DoCmdJoin(PClient* client) { - // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (praft_->IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, - "A node that has been added to a cluster must be removed \ - from the old cluster before it can be added to the new cluster"); - } - - // if (client->argv_.size() < 3) { - // return client->SetRes(CmdRes::kWrongNum, 
client->CmdName()); - // } - - // // (KKorpse)TODO: Support multiple nodes join at the same time. - // if (client->argv_.size() > 3) { - // return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments"); - // } assert(client->argv_.size() == 4); auto group_id = client->argv_[2]; auto addr = client->argv_[3]; - // init raft - auto s = praft_->Init(group_id, true); - assert(s.ok()); - - if (braft::PeerId(addr).is_empty()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); + if (group_id.size() != RAFT_GROUPID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } - auto ip_port = GetIpAndPortFromEndPoint(addr); if (!ip_port.has_value()) { return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); } - auto& [peer_ip, port] = *ip_port; + auto& [ip, port] = *ip_port; + PSTORE.AddRegion(group_id, client->GetCurrentDB()); + auto s = praft_->Init(group_id, true); + assert(s.ok()); - // Connect target - auto ret = praft_->GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port); - if (!ret) { // other clients have joined - return client->SetRes(CmdRes::kErrOther, "Other clients have joined"); + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = 200; + + brpc::Channel add_node_channel; + if (0 != add_node_channel.Init(addr.c_str(), &options)) { + ERROR("Fail to init add_node_channel to praft service!"); + // 失败的情况下,应该取消 Init。 + // 并且 Remove Region。 + client->SetRes(CmdRes::kErrOther, "Fail to init channel."); + return; + } + + brpc::Controller cntl; + NodeAddRequest request; + NodeAddResponse response; + auto end_point = fmt::format("{}:{}", ip, std::to_string(port)); + request.set_groupid(group_id); + request.set_endpoint(end_point); + request.set_index(client->GetCurrentDB()); + request.set_role(0); // 0 : !witness + PRaftService_Stub stub(&add_node_channel); + stub.AddNode(&cntl, &request, &response, NULL); + + if (cntl.Failed()) { + client->SetRes(CmdRes::kErrOther, "Failed to send add node rpc"); + return; + } + if (response.success()) { + client->SetRes(CmdRes::kOK, "Add Node Success"); + return; } - praft_->GetClusterCmdCtx().ConnectTargetNode(); + // 这里需要删除 Region。并且取消 初始化 。 + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); // Not reply any message here, we will reply after the connection is established. 
- client->Clear(); + // client->Clear(); } } // namespace pikiwidb diff --git a/src/praft/praft.cc b/src/praft/praft.cc index b7ad09f44..e9213e7aa 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -66,6 +66,7 @@ void ClusterCmdContext::ConnectTargetNode() { auto ip = PREPL.GetMasterAddr().GetIP(); auto port = PREPL.GetMasterAddr().GetPort(); if (ip == peer_ip_ && port == port_ && PREPL.GetMasterState() == kPReplStateConnected) { + std::cout<<"Connection already established, send directly" <SendNodeRequest(PREPL.GetMaster()); return; } @@ -95,7 +96,7 @@ butil::Status PRaft::Init(const std::string& group_id, bool initial_conf_is_null braft::NodeOptions node_options; if (!initial_conf_is_null) { auto endpoint_str = butil::endpoint2str(PSTORE.GetEndPoint()); - std::string initial_conf = fmt::format("{}:0,", endpoint_str.c_str()); + std::string initial_conf = fmt::format("{}:{},", endpoint_str.c_str(), db_id_); if (node_options.initial_conf.parse_from(initial_conf) != 0) { return ERROR_LOG_AND_STATUS("Failed to parse configuration"); } @@ -120,7 +121,6 @@ butil::Status PRaft::Init(const std::string& group_id, bool initial_conf_is_null return ERROR_LOG_AND_STATUS("Failed to init raft node"); } group_id_ = group_id; - INFO("Initialized praft successfully: node_id={}", GetNodeID()); return {0, "OK"}; } @@ -205,8 +205,8 @@ void PRaft::SendNodeRequest(PClient* client) { switch (cluster_cmd_type) { case ClusterCmdType::kJoin: { // SendNodeInfoRequest(client, "DATA"); - SendNodeInfoRequest(client, "DATA"); - // SendNodeAddRequest(client); + // SendNodeInfoRequest(client, "DATA"); + SendNodeAddRequest(client); } break; case ClusterCmdType::kRemove: SendNodeRemoveRequest(client); @@ -231,7 +231,7 @@ void PRaft::SendNodeAddRequest(PClient* client) { // Node id in braft are ip:port, the node id param in RAFT.NODE ADD cmd will be ignored.
auto port = g_config.port + pikiwidb::g_config.raft_port_offset; - auto raw_addr = g_config.ip.ToString() + ":" + std::to_string(port); + auto raw_addr = g_config.ip.ToString() + ":" + std::to_string(port) + ":" + std::to_string(db_id_); auto msg = fmt::format("RAFT.NODE ADD {} {}\r\n", group_id_, raw_addr); client->SendPacket(msg); INFO("Sent join request to leader successfully"); @@ -460,6 +460,26 @@ butil::Status PRaft::AddPeer(const std::string& peer) { return {0, "OK"}; } +butil::Status PRaft::AddPeer(const std::string& endpoint, int index) { + if (!node_) { + ERROR_LOG_AND_STATUS("Node is not initialized"); + } + + braft::SynchronizedClosure done; + butil::EndPoint ep; + butil::str2endpoint(endpoint.c_str(), &ep); + braft::PeerId peer_id(ep, index); + node_->add_peer(peer_id, &done); + done.wait(); + + if (!done.status().ok()) { + // WARN("Failed to add peer {} to node {}, status: {}", end_point, node_->node_id().to_string(), done.status().error_str()); + WARN("Failed to add"); + return done.status(); + } + return done.status(); +} + butil::Status PRaft::RemovePeer(const std::string& peer) { if (!node_) { return ERROR_LOG_AND_STATUS("Node is not initialized"); diff --git a/src/praft/praft.h b/src/praft/praft.h index d71e255d9..b2a9ec266 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -101,6 +101,7 @@ class PRaft : public braft::StateMachine { //===--------------------------------------------------------------------===// butil::Status Init(const std::string& group_id, bool initial_conf_is_null); butil::Status AddPeer(const std::string& peer); + butil::Status AddPeer(const std::string& endpoint, int index); butil::Status RemovePeer(const std::string& peer); butil::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true); diff --git a/src/praft/praft.proto b/src/praft/praft.proto index 61a495f21..24bb42580 100644 --- a/src/praft/praft.proto +++ b/src/praft/praft.proto @@ -2,12 +2,18 @@ syntax="proto3"; package pikiwidb; option cc_generic_services = true; -message DummyRequest { -}; +message NodeAddRequest { + string GroupID = 1; + string EndPoint = 2; + uint32 index = 3; + uint32 role = 4; +} -message DummyResponse { -}; +message NodeAddResponse { + bool success = 1; +} -service DummyService { - rpc DummyMethod(DummyRequest) returns (DummyResponse); +service PRaftService { + rpc AddNode(NodeAddRequest) returns (NodeAddResponse); + }; diff --git a/src/praft/praft_service.cc b/src/praft/praft_service.cc new file mode 100644 index 000000000..b3413705f --- /dev/null +++ b/src/praft/praft_service.cc @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ + +#include "praft_service.h" + +#include "fmt/format.h" +#include "store.h" + + +namespace pikiwidb { + void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, + const ::pikiwidb::NodeAddRequest* request, + ::pikiwidb::NodeAddResponse* response, + ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto groupid = request->groupid(); + auto db_ptr = PSTORE.GetDBByGroupID(groupid); + auto praft_ptr = db_ptr->GetPRaft(); + auto end_point = request->endpoint(); + auto index = request->index(); + auto role = request->role(); + + auto status = praft_ptr->AddPeer(end_point, index); + if (!status.ok()) { + std::cout<<"add node fail!"<set_success(false); + return; + } + std::cout<<"add node success!"<set_success(true); +} +} \ No newline at end of file diff --git a/src/praft/praft_service.h b/src/praft/praft_service.h index d7b655a21..000e334b8 100644 --- a/src/praft/praft_service.h +++ b/src/praft/praft_service.h @@ -12,15 +12,13 @@ namespace pikiwidb { class PRaft; - -class DummyServiceImpl : public DummyService { +class PRaftServiceImpl : public PRaftService { public: - explicit DummyServiceImpl(PRaft* praft) : praft_(praft) {} - void DummyMethod(::google::protobuf::RpcController* controller, const ::pikiwidb::DummyRequest* request, - ::pikiwidb::DummyResponse* response, ::google::protobuf::Closure* done) override {} - - private: - PRaft* praft_ = nullptr; + PRaftServiceImpl() = default; + void AddNode(::google::protobuf::RpcController* controller, + const ::pikiwidb::NodeAddRequest* request, + ::pikiwidb::NodeAddResponse* response, + ::google::protobuf::Closure* done); }; } // namespace pikiwidb diff --git a/src/store.cc b/src/store.cc index 6dd441f14..acad88d42 100644 --- a/src/store.cc +++ b/src/store.cc @@ -11,9 +11,9 @@ #include "db.h" #include "pstd/log.h" #include "pstd/pstd_string.h" +#include "praft/praft_service.h" namespace pikiwidb { - PStore::~PStore() { INFO("STORE is closing..."); } PStore& PStore::Instance() { @@ -30,6 +30,7 @@ void PStore::Init(int db_number) { backends_.push_back(std::move(db)); INFO("Open DB_{} success!", i); } + auto ip = g_config.ip.ToString(); butil::ip_t rpc_ip; butil::str2ip(ip.c_str(), &rpc_ip); @@ -39,6 +40,10 @@ void PStore::Init(int db_number) { if (braft::add_service(GetRpcServer(), endpoint_) != 0) { return ERROR("Failed to add raft service to rpc server"); } + if (0 != rpc_server_->AddService(dynamic_cast(praft_service_.get()), brpc::SERVER_OWNS_SERVICE)) { + return ERROR("Failed to add praft service to rpc server"); + } + if (rpc_server_->Start(endpoint_, nullptr) != 0) { return ERROR("Failed to start rpc server"); } diff --git a/src/store.h b/src/store.h index 751771199..c9cd50d46 100644 --- a/src/store.h +++ b/src/store.h @@ -11,7 +11,10 @@ #include #include #include + #include "praft/praft.h" +#include "praft/praft_service.h" + #define GLOG_NO_ABBREVIATED_SEVERITIES #include @@ -23,6 +26,7 @@ #include "db.h" namespace pikiwidb { +class RaftServiceImpl; enum TaskType { kCheckpoint = 0, kLoadDBFromCheckpoint, kEmpty }; @@ -79,6 +83,7 @@ class PStore { int db_number_ = 0; std::vector> backends_; butil::EndPoint endpoint_; + std::unique_ptr praft_service_{std::make_unique()}; std::unique_ptr rpc_server_{std::make_unique()}; std::unordered_map db_map_; }; From 4dba2d04407ba572ed02b85e0eed08c0be66fe7b Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Sun, 28 Jul 2024 16:28:34 +0800 Subject: [PATCH 08/10] feat: use brpc to send node add message --- src/client.h | 2 + 
src/cmd_raft.cc | 443 ++++++++++++++++++++----------------- src/cmd_raft.h | 14 +- src/praft/praft.cc | 6 +- src/praft/praft_service.cc | 38 ++-- src/praft/praft_service.h | 6 +- src/store.cc | 56 ++++- src/store.h | 35 ++- 8 files changed, 333 insertions(+), 267 deletions(-) diff --git a/src/client.h b/src/client.h index 7f9eb9823..1d1b6b550 100644 --- a/src/client.h +++ b/src/client.h @@ -107,6 +107,8 @@ enum class ClientState { kClosed, }; +const int kChannelTimeoutMS = 200; + class DB; struct PSlaveInfo; diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 71d2153e7..6ab941fa9 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -1,22 +1,21 @@ -/* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ - +src / store.h /* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ #include "cmd_raft.h" +#include #include #include #include -#include +#include "brpc/channel.h" +#include "praft.pb.h" #include "praft/praft.h" #include "pstd/log.h" #include "pstd/pstd_string.h" -#include "brpc/channel.h" -#include "praft.pb.h" #include "client.h" #include "config.h" @@ -24,240 +23,270 @@ #include "replication.h" #include "store.h" -namespace pikiwidb { + namespace pikiwidb { -RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} + extern PConfig g_config; -bool RaftNodeCmd::DoInitial(PClient* client) { - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); + RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} - if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) { - client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); - return false; - } - group_id_ = client->argv_[2]; - praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); - return true; -} - -void RaftNodeCmd::DoCmd(PClient* client) { - assert(praft_); - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); - if (cmd == kAddCmd) { - DoCmdAdd(client); - } else if (cmd == kRemoveCmd) { - DoCmdRemove(client); - } else if (cmd == kDoSnapshot) { - assert(0); // TODO(longfar): add group id in arguments - DoCmdSnapshot(client); - } else { - client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); - } -} - -void RaftNodeCmd::DoCmdAdd(PClient* client) { - DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); - auto db = PSTORE.GetDBByGroupID(group_id_); - assert(db); - auto praft = db->GetPRaft(); - // Check whether it is a leader. 
If it is not a leader, return the leader information - if (!praft->IsLeader()) { - client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); - return; - } + bool RaftNodeCmd::DoInitial(PClient * client) { + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); - if (client->argv_.size() != 4) { - client->SetRes(CmdRes::kWrongNum, client->CmdName()); - return; + if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) { + client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); + return false; + } + group_id_ = client->argv_[2]; + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + return true; } - // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. - // So we do not need to parse and use nodeid like redis; - auto s = praft->AddPeer(client->argv_[3]); - if (s.ok()) { - client->SetRes(CmdRes::kOK); - } else { - client->SetRes(CmdRes::kErrOther, fmt::format("Failed to add peer: {}", s.error_str())); + void RaftNodeCmd::DoCmd(PClient * client) { + assert(praft_); + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd == kAddCmd) { + DoCmdAdd(client); + } else if (cmd == kRemoveCmd) { + DoCmdRemove(client); + } else if (cmd == kDoSnapshot) { + assert(0); // TODO(longfar): add group id in arguments + DoCmdSnapshot(client); + } else { + client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); + } } -} -void RaftNodeCmd::DoCmdRemove(PClient* client) { - // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (!praft_->IsInitialized()) { - client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); - return; - } + void RaftNodeCmd::DoCmdAdd(PClient * client) { + DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); + auto db = PSTORE.GetDBByGroupID(group_id_); + assert(db); + auto praft = db->GetPRaft(); + // Check whether it is a leader. If it is not a leader, return the leader information + if (!praft->IsLeader()) { + client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); + return; + } - if (client->argv_.size() != 3) { - client->SetRes(CmdRes::kWrongNum, client->CmdName()); - return; + if (client->argv_.size() != 4) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return; + } + + // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. + // So we do not need to parse and use nodeid like redis; + auto s = praft->AddPeer(client->argv_[3]); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("Failed to add peer: {}", s.error_str())); + } } - // Check whether it is a leader. 
If it is not a leader, send remove request to leader - if (!praft_->IsLeader()) { - // Get the leader information - braft::PeerId leader_peer_id(praft_->GetLeaderID()); - // @todo There will be an unreasonable address, need to consider how to deal with it - if (leader_peer_id.is_empty()) { - client->SetRes(CmdRes::kErrOther, - "The leader address of the cluster is incorrect, try again or delete the node from another node"); + void RaftNodeCmd::DoCmdRemove(PClient * client) { + // If the node has been initialized, it needs to close the previous initialization and rejoin the other group + if (!praft_->IsInitialized()) { + client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); return; } - // Connect target - std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); - auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; - auto peer_id = client->argv_[2]; - auto ret = - praft_->GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); - if (!ret) { // other clients have removed - return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); + if (client->argv_.size() != 3) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return; } - praft_->GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent remove request to leader successfully"); - // Not reply any message here, we will reply after the connection is established. - client->Clear(); - return; - } + // Check whether it is a leader. If it is not a leader, send remove request to leader + if (!praft_->IsLeader()) { + // Get the leader information + braft::PeerId leader_peer_id(praft_->GetLeaderID()); + // @todo There will be an unreasonable address, need to consider how to deal with it + if (leader_peer_id.is_empty()) { + client->SetRes( + CmdRes::kErrOther, + "The leader address of the cluster is incorrect, try again or delete the node from another node"); + return; + } + + // Connect target + std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); + auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; + auto peer_id = client->argv_[2]; + auto ret = + praft_->GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); + if (!ret) { // other clients have removed + return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); + } + praft_->GetClusterCmdCtx().ConnectTargetNode(); + INFO("Sent remove request to leader successfully"); + + // Not reply any message here, we will reply after the connection is established. 
+ client->Clear(); + return; + } - auto s = praft_->RemovePeer(client->argv_[2]); - if (s.ok()) { - client->SetRes(CmdRes::kOK); - } else { - client->SetRes(CmdRes::kErrOther, fmt::format("Failed to remove peer: {}", s.error_str())); + auto s = praft_->RemovePeer(client->argv_[2]); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("Failed to remove peer: {}", s.error_str())); + } } -} -void RaftNodeCmd::DoCmdSnapshot(PClient* client) { - auto s = praft_->DoSnapshot(); - if (s.ok()) { - client->SetRes(CmdRes::kOK); + void RaftNodeCmd::DoCmdSnapshot(PClient * client) { + auto s = praft_->DoSnapshot(); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } } -} -RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} + RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} -bool RaftClusterCmd::DoInitial(PClient* client) { - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); - if (cmd != kInitCmd && cmd != kJoinCmd) { - client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); - return false; + bool RaftClusterCmd::DoInitial(PClient * client) { + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd != kInitCmd && cmd != kJoinCmd) { + client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); + return false; + } + + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + return true; } - praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); - return true; -} - -void RaftClusterCmd::DoCmd(PClient* client) { - assert(praft_); - if (praft_->IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, "Already cluster member"); + + void RaftClusterCmd::DoCmd(PClient * client) { + assert(praft_); + if (praft_->IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "Already cluster member"); + } + + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd == kInitCmd) { + DoCmdInit(client); + } else { + DoCmdJoin(client); + } } - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); - if (cmd == kInitCmd) { - DoCmdInit(client); - } else { - DoCmdJoin(client); + void RaftClusterCmd::DoCmdInit(PClient * client) { + if (client->argv_.size() != 2 && client->argv_.size() != 3) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + std::string group_id; + if (client->argv_.size() == 3) { + group_id = client->argv_[2]; + if (group_id.size() != RAFT_GROUPID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); + } + } else { + group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); + } + + auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); + if (add_region_success) { + auto s = praft_->Init(group_id, false); + if (!s.ok()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); + } + client->SetLineString(fmt::format("+OK {}", group_id)); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); + } } -} -void RaftClusterCmd::DoCmdInit(PClient* client) { - if (client->argv_.size() != 2 && client->argv_.size() != 3) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + static inline std::optional> GetIpAndPortFromEndPoint(const 
std::string& endpoint) { + auto pos = endpoint.find(':'); + if (pos == std::string::npos) { + return std::nullopt; + } + + int32_t ret = 0; + pstd::String2int(endpoint.substr(pos + 1), &ret); + return {{endpoint.substr(0, pos), ret}}; } - std::string group_id; - if (client->argv_.size() == 3) { - group_id = client->argv_[2]; + void RaftClusterCmd::DoCmdJoin(PClient * client) { + assert(client->argv_.size() == 4); + auto group_id = client->argv_[2]; + auto addr = client->argv_[3]; + butil::EndPoint endpoint; + if (0 != butil::str2endpoint(addr.c_str(), &endpoint)) { + ERROR("Wrong endpoint format: {}", addr); + return client->SetRes(CmdRes::kErrOther, "Wrong endpoint format"); + } + endpoint.port += g_config.raft_port_offset; + if (group_id.size() != RAFT_GROUPID_LEN) { return client->SetRes(CmdRes::kInvalidParameter, "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } - } else { - group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); - } - PSTORE.AddRegion(group_id, client->GetCurrentDB()); - auto s = praft_->Init(group_id, false); - if (!s.ok()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: {}", s.error_str())); - } - client->SetLineString(fmt::format("+OK {}", group_id)); -} - -static inline std::optional> GetIpAndPortFromEndPoint(const std::string& endpoint) { - auto pos = endpoint.find(':'); - if (pos == std::string::npos) { - return std::nullopt; - } - int32_t ret = 0; - pstd::String2int(endpoint.substr(pos + 1), &ret); - return {{endpoint.substr(0, pos), ret}}; -} + auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); + if (add_region_success) { + auto s = praft_->Init(group_id, false); + if (!s.ok()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); + } + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); + } -void RaftClusterCmd::DoCmdJoin(PClient* client) { - assert(client->argv_.size() == 4); - auto group_id = client->argv_[2]; - auto addr = client->argv_[3]; + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; + + brpc::Channel add_node_channel; + if (0 != add_node_channel.Init(endpoint, &options)) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + ERROR("Fail to init add_node_channel to praft service!"); + client->SetRes(CmdRes::kErrOther, "Fail to init add_node_channel."); + return; + } - if (group_id.size() != RAFT_GROUPID_LEN) { - return client->SetRes(CmdRes::kInvalidParameter, - "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); - } - auto ip_port = GetIpAndPortFromEndPoint(addr); - if (!ip_port.has_value()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); - } - auto& [ip, port] = *ip_port; - PSTORE.AddRegion(group_id, client->GetCurrentDB()); - auto s = praft_->Init(group_id, true); - assert(s.ok()); - - brpc::ChannelOptions options; - options.connection_type = brpc::CONNECTION_TYPE_SINGLE; - options.max_retry = 0; - options.connect_timeout_ms = 200; - - brpc::Channel add_node_channel; - if (0 != add_node_channel.Init(addr.c_str(), &options)) { - ERROR("Fail to init add_node_channel to praft service!"); - // 失败的情况下,应该取消 Init。 - // 并且 Remove Region。 - client->SetRes(CmdRes::kErrOther, "Fail to init channel."); - return; + brpc::Controller cntl; + 
NodeAddRequest request; + NodeAddResponse response; + auto end_point = butil::endpoint2str(PSTORE.GetEndPoint()).c_str(); + request.set_groupid(group_id); + request.set_endpoint(std::string(end_point)); + request.set_index(client->GetCurrentDB()); + request.set_role(0); + PRaftService_Stub stub(&add_node_channel); + stub.AddNode(&cntl, &request, &response, NULL); + + if (cntl.Failed()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + ERROR("Fail to send add node rpc to target server {}", addr); + client->SetRes(CmdRes::kErrOther, "Failed to send add node rpc"); + return; + } + if (response.success()) { + client->SetRes(CmdRes::kOK, "Add Node Success"); + return; + } + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + ERROR("Add node request return false"); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); } - brpc::Controller cntl; - NodeAddRequest request; - NodeAddResponse response; - auto end_point = fmt::format("{}:{}", ip, std::to_string(port)); - request.set_groupid(group_id); - request.set_endpoint(end_point); - request.set_index(client->GetCurrentDB()); - request.set_role(0); // 0 : !witness - PRaftService_Stub stub(&add_node_channel); - stub.AddNode(&cntl, &request, &response, NULL); - - if (cntl.Failed()) { - client->SetRes(CmdRes::kErrOther, "Failed to send add node rpc"); - return; + void RaftClusterCmd::ClearPaftCtx() { + assert(praft_); + praft_->ShutDown(); + praft_->Join(); + praft_->Clear(); + praft_ = nullptr; } - if (response.success()) { - client->SetRes(CmdRes::kOK, "Add Node Success"); - return; - } - // 这里需要删除 Region。并且取消 初始化 。 - client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); - - // Not reply any message here, we will reply after the connection is established. - // client->Clear(); -} - } // namespace pikiwidb diff --git a/src/cmd_raft.h b/src/cmd_raft.h index ceb68f147..b6b429e24 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -14,7 +14,7 @@ namespace pikiwidb { class PRaft; -/* +/* * RAFT.NODE ADD [id] [address:port] * Add a new node to the cluster. The [id] can be an explicit non-zero value, * or zero to let the cluster choose one. @@ -49,8 +49,8 @@ class RaftNodeCmd : public BaseCmd { void DoCmdRemove(PClient *client); void DoCmdSnapshot(PClient *client); -private: - PRaft* praft_ = nullptr; + private: + PRaft *praft_ = nullptr; std::string group_id_; static constexpr std::string_view kAddCmd = "ADD"; @@ -58,7 +58,7 @@ class RaftNodeCmd : public BaseCmd { static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT"; }; -/* +/* * RAFT.CLUSTER INIT [group_id] * Initializes a new Raft cluster. 
* Reply: @@ -81,8 +81,10 @@ class RaftClusterCmd : public BaseCmd { void DoCmdInit(PClient *client); void DoCmdJoin(PClient *client); -private: - PRaft* praft_ = nullptr; + void ClearPaftCtx(); + + private: + PRaft *praft_ = nullptr; static constexpr std::string_view kInitCmd = "INIT"; static constexpr std::string_view kJoinCmd = "JOIN"; diff --git a/src/praft/praft.cc b/src/praft/praft.cc index e9213e7aa..fcb971417 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -66,7 +66,6 @@ void ClusterCmdContext::ConnectTargetNode() { auto ip = PREPL.GetMasterAddr().GetIP(); auto port = PREPL.GetMasterAddr().GetPort(); if (ip == peer_ip_ && port == port_ && PREPL.GetMasterState() == kPReplStateConnected) { - std::cout<<"Connection already established, send directly" <SendNodeRequest(PREPL.GetMaster()); return; } @@ -461,7 +460,7 @@ butil::Status PRaft::AddPeer(const std::string& peer) { } butil::Status PRaft::AddPeer(const std::string& endpoint, int index) { - if (!node_) { + if (!node_) { ERROR_LOG_AND_STATUS("Node is not initialized"); } @@ -473,8 +472,7 @@ butil::Status PRaft::AddPeer(const std::string& endpoint, int index) { done.wait(); if (!done.status().ok()) { - // WARN("Failed to add peer {} to node {}, status: {}", end_point, node_->node_id().to_string(), done.status().error_str()); - WARN("Failed to add"); + WARN("Failed to add peer {} to node {}, status: {}", ep, node_->node_id().to_string(), done.status().error_str()); return done.status(); } return done.status(); diff --git a/src/praft/praft_service.cc b/src/praft/praft_service.cc index b3413705f..6e2c9559b 100644 --- a/src/praft/praft_service.cc +++ b/src/praft/praft_service.cc @@ -10,27 +10,21 @@ #include "fmt/format.h" #include "store.h" - namespace pikiwidb { - void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, - const ::pikiwidb::NodeAddRequest* request, - ::pikiwidb::NodeAddResponse* response, - ::google::protobuf::Closure* done) { - brpc::ClosureGuard done_guard(done); - auto groupid = request->groupid(); - auto db_ptr = PSTORE.GetDBByGroupID(groupid); - auto praft_ptr = db_ptr->GetPRaft(); - auto end_point = request->endpoint(); - auto index = request->index(); - auto role = request->role(); - - auto status = praft_ptr->AddPeer(end_point, index); - if (!status.ok()) { - std::cout<<"add node fail!"<set_success(false); - return; - } - std::cout<<"add node success!"<set_success(true); +void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request, + ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto groupid = request->groupid(); + auto db_ptr = PSTORE.GetDBByGroupID(groupid); + auto praft_ptr = db_ptr->GetPRaft(); + auto end_point = request->endpoint(); + auto index = request->index(); + auto role = request->role(); + auto status = praft_ptr->AddPeer(end_point, index); + if (!status.ok()) { + response->set_success(false); + return; + } + response->set_success(true); } -} \ No newline at end of file +} // namespace pikiwidb \ No newline at end of file diff --git a/src/praft/praft_service.h b/src/praft/praft_service.h index 000e334b8..8719dc6da 100644 --- a/src/praft/praft_service.h +++ b/src/praft/praft_service.h @@ -15,10 +15,8 @@ class PRaft; class PRaftServiceImpl : public PRaftService { public: PRaftServiceImpl() = default; - void AddNode(::google::protobuf::RpcController* controller, - const ::pikiwidb::NodeAddRequest* request, - ::pikiwidb::NodeAddResponse* response, -
::google::protobuf::Closure* done); + void AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request, + ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done); }; } // namespace pikiwidb diff --git a/src/store.cc b/src/store.cc index acad88d42..81ed7c233 100644 --- a/src/store.cc +++ b/src/store.cc @@ -9,9 +9,9 @@ #include "config.h" #include "db.h" +#include "praft/praft_service.h" #include "pstd/log.h" #include "pstd/pstd_string.h" -#include "praft/praft_service.h" namespace pikiwidb { PStore::~PStore() { INFO("STORE is closing..."); } @@ -30,25 +30,29 @@ void PStore::Init(int db_number) { backends_.push_back(std::move(db)); INFO("Open DB_{} success!", i); } - - auto ip = g_config.ip.ToString(); - butil::ip_t rpc_ip; - butil::str2ip(ip.c_str(), &rpc_ip); + + auto rpc_ip = g_config.ip.ToString(); auto rpc_port = g_config.port.load(std::memory_order_relaxed) + g_config.raft_port_offset.load(std::memory_order_relaxed); - endpoint_ = butil::EndPoint(rpc_ip, rpc_port); - if (braft::add_service(GetRpcServer(), endpoint_) != 0) { + + if (0 != butil::str2endpoint(fmt::format("{}:{}", rpc_ip, std::to_string(rpc_port)).c_str(), &endpoint_)) { + return ERROR("Wrong endpoint format"); + } + + if (0 != braft::add_service(GetRpcServer(), endpoint_)) { return ERROR("Failed to add raft service to rpc server"); } - if (0 != rpc_server_->AddService(dynamic_cast(praft_service_.get()), brpc::SERVER_OWNS_SERVICE)) { + + if (0 != rpc_server_->AddService(dynamic_cast(praft_service_.get()), + brpc::SERVER_OWNS_SERVICE)) { return ERROR("Failed to add praft service to rpc server"); } - - if (rpc_server_->Start(endpoint_, nullptr) != 0) { + + if (0 != rpc_server_->Start(endpoint_, nullptr)) { return ERROR("Failed to start rpc server"); } - INFO("Started RPC server successfully"); - INFO("STORE Init success!"); + INFO("Started RPC server successfully on addr {}", butil::endpoint2str(endpoint_).c_str()); + INFO("PSTORE Init success!"); } void PStore::HandleTaskSpecificDB(const TasksVector& tasks) { @@ -88,4 +92,32 @@ void PStore::HandleTaskSpecificDB(const TasksVector& tasks) { } }); } + +bool PStore::AddRegion(const std::string& group_id, uint32_t dbno) { + std::lock_guard lock(rw_mutex_); + if (region_map_.find(group_id) != region_map_.end()) { + return false; + } + region_map_.emplace(group_id, dbno); + return true; +} + +bool PStore::RemoveRegion(const std::string& group_id) { + std::lock_guard lock(rw_mutex_); + if (region_map_.find(group_id) != region_map_.end()) { + region_map_.erase(group_id); + return true; + } + return false; +} + +DB* PStore::GetDBByGroupID(const std::string& group_id) const { + std::shared_lock lock(rw_mutex_); + auto it = region_map_.find(group_id); + if (it == region_map_.end()) { + return nullptr; + } + return backends_[it->second].get(); +} + } // namespace pikiwidb diff --git a/src/store.h b/src/store.h index c9cd50d46..aad6d8f28 100644 --- a/src/store.h +++ b/src/store.h @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -63,19 +64,27 @@ class PStore { void HandleTaskSpecificDB(const TasksVector& tasks); int GetDBNumber() const { return db_number_; } + brpc::Server* GetRpcServer() const { return rpc_server_.get(); } + const butil::EndPoint& GetEndPoint() const { return endpoint_; } - void AddRegion(const std::string& group_id, uint32_t dbno) { - assert(!db_map_.contains(group_id)); - db_map_.emplace(group_id, dbno); - } - DB* GetDBByGroupID(const std::string& group_id) const { - auto it = 
db_map_.find(group_id); - if (it == db_map_.end()) { - return nullptr; - } - return backends_[it->second].get(); - } + + /** + * return true if adding group_id -> dbno into region_map_ succeeds. + * return false if group_id -> dbno already exists in region_map_. + */ + bool AddRegion(const std::string& group_id, uint32_t dbno); + + /** + * return true if removing group_id -> dbno from region_map_ succeeds. + * return false if group_id -> dbno does not exist in region_map_. + */ + bool RemoveRegion(const std::string& group_id); + + /** + * return nullptr if group_id -> dbno does not exist in region_map_. + */ + DB* GetDBByGroupID(const std::string& group_id) const; private: PStore() = default; @@ -85,7 +94,9 @@ class PStore { butil::EndPoint endpoint_; std::unique_ptr praft_service_{std::make_unique()}; std::unique_ptr rpc_server_{std::make_unique()}; - std::unordered_map db_map_; + + mutable std::shared_mutex rw_mutex_; + std::unordered_map region_map_; }; #define PSTORE PStore::Instance() From 7604f766a91fcf673e1d0e998f616a6fa5da9302 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Sun, 28 Jul 2024 21:50:49 +0800 Subject: [PATCH 09/10] feat: add add node redirect feature --- src/cmd_raft.cc | 430 +++++++++++++++++++------------------ src/praft/praft.cc | 3 +- src/praft/praft.proto | 6 +- src/praft/praft_service.cc | 30 ++- src/store.h | 7 + 5 files changed, 266 insertions(+), 210 deletions(-) diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 6ab941fa9..ee0ac303e 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -1,9 +1,9 @@ -src / store.h /* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory.
+ */ #include "cmd_raft.h" #include @@ -23,228 +23,239 @@ src / store.h /* #include "replication.h" #include "store.h" - namespace pikiwidb { +namespace pikiwidb { - extern PConfig g_config; +extern PConfig g_config; - RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} +RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} - bool RaftNodeCmd::DoInitial(PClient * client) { - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); +bool RaftNodeCmd::DoInitial(PClient* client) { + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); - if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) { - client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); - return false; - } - group_id_ = client->argv_[2]; - praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); - return true; + if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) { + client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); + return false; + } + group_id_ = client->argv_[2]; + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + return true; +} + +void RaftNodeCmd::DoCmd(PClient* client) { + assert(praft_); + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd == kAddCmd) { + DoCmdAdd(client); + } else if (cmd == kRemoveCmd) { + DoCmdRemove(client); + } else if (cmd == kDoSnapshot) { + assert(0); // TODO(longfar): add group id in arguments + DoCmdSnapshot(client); + } else { + client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); + } +} + +void RaftNodeCmd::DoCmdAdd(PClient* client) { + DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); + auto db = PSTORE.GetDBByGroupID(group_id_); + assert(db); + auto praft = db->GetPRaft(); + // Check whether it is a leader. If it is not a leader, return the leader information + if (!praft->IsLeader()) { + client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); + return; } - void RaftNodeCmd::DoCmd(PClient * client) { - assert(praft_); - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); - if (cmd == kAddCmd) { - DoCmdAdd(client); - } else if (cmd == kRemoveCmd) { - DoCmdRemove(client); - } else if (cmd == kDoSnapshot) { - assert(0); // TODO(longfar): add group id in arguments - DoCmdSnapshot(client); - } else { - client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); - } + if (client->argv_.size() != 4) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return; } - void RaftNodeCmd::DoCmdAdd(PClient * client) { - DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); - auto db = PSTORE.GetDBByGroupID(group_id_); - assert(db); - auto praft = db->GetPRaft(); - // Check whether it is a leader. If it is not a leader, return the leader information - if (!praft->IsLeader()) { - client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); - return; - } + // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. 
+ // So we do not need to parse and use nodeid like redis; + auto s = praft->AddPeer(client->argv_[3]); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("Failed to add peer: {}", s.error_str())); + } +} - if (client->argv_.size() != 4) { - client->SetRes(CmdRes::kWrongNum, client->CmdName()); - return; - } +void RaftNodeCmd::DoCmdRemove(PClient* client) { + // If the node has been initialized, it needs to close the previous initialization and rejoin the other group + if (!praft_->IsInitialized()) { + client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); + return; + } - // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. - // So we do not need to parse and use nodeid like redis; - auto s = praft->AddPeer(client->argv_[3]); - if (s.ok()) { - client->SetRes(CmdRes::kOK); - } else { - client->SetRes(CmdRes::kErrOther, fmt::format("Failed to add peer: {}", s.error_str())); - } + if (client->argv_.size() != 3) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return; } - void RaftNodeCmd::DoCmdRemove(PClient * client) { - // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (!praft_->IsInitialized()) { - client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); + // Check whether it is a leader. If it is not a leader, send remove request to leader + if (!praft_->IsLeader()) { + // Get the leader information + braft::PeerId leader_peer_id(praft_->GetLeaderID()); + // @todo There will be an unreasonable address, need to consider how to deal with it + if (leader_peer_id.is_empty()) { + client->SetRes(CmdRes::kErrOther, + "The leader address of the cluster is incorrect, try again or delete the node from another node"); return; } - if (client->argv_.size() != 3) { - client->SetRes(CmdRes::kWrongNum, client->CmdName()); - return; + // Connect target + std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); + auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; + auto peer_id = client->argv_[2]; + auto ret = + praft_->GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); + if (!ret) { // other clients have removed + return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); } + praft_->GetClusterCmdCtx().ConnectTargetNode(); + INFO("Sent remove request to leader successfully"); - // Check whether it is a leader. 
If it is not a leader, send remove request to leader - if (!praft_->IsLeader()) { - // Get the leader information - braft::PeerId leader_peer_id(praft_->GetLeaderID()); - // @todo There will be an unreasonable address, need to consider how to deal with it - if (leader_peer_id.is_empty()) { - client->SetRes( - CmdRes::kErrOther, - "The leader address of the cluster is incorrect, try again or delete the node from another node"); - return; - } - - // Connect target - std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); - auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; - auto peer_id = client->argv_[2]; - auto ret = - praft_->GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); - if (!ret) { // other clients have removed - return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); - } - praft_->GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent remove request to leader successfully"); - - // Not reply any message here, we will reply after the connection is established. - client->Clear(); - return; - } + // Not reply any message here, we will reply after the connection is established. + client->Clear(); + return; + } - auto s = praft_->RemovePeer(client->argv_[2]); - if (s.ok()) { - client->SetRes(CmdRes::kOK); - } else { - client->SetRes(CmdRes::kErrOther, fmt::format("Failed to remove peer: {}", s.error_str())); - } + auto s = praft_->RemovePeer(client->argv_[2]); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("Failed to remove peer: {}", s.error_str())); } +} - void RaftNodeCmd::DoCmdSnapshot(PClient * client) { - auto s = praft_->DoSnapshot(); - if (s.ok()) { - client->SetRes(CmdRes::kOK); - } +void RaftNodeCmd::DoCmdSnapshot(PClient* client) { + auto s = praft_->DoSnapshot(); + if (s.ok()) { + client->SetRes(CmdRes::kOK); } +} - RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} +RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} - bool RaftClusterCmd::DoInitial(PClient * client) { - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); - if (cmd != kInitCmd && cmd != kJoinCmd) { - client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); - return false; - } +bool RaftClusterCmd::DoInitial(PClient* client) { + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd != kInitCmd && cmd != kJoinCmd) { + client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); + return false; + } - praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); - return true; + praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + return true; +} + +void RaftClusterCmd::DoCmd(PClient* client) { + assert(praft_); + if (praft_->IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "Already cluster member"); } - void RaftClusterCmd::DoCmd(PClient * client) { - assert(praft_); - if (praft_->IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, "Already cluster member"); - } + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd == kInitCmd) { + DoCmdInit(client); + } else { + DoCmdJoin(client); + } +} - auto cmd = client->argv_[1]; - pstd::StringToUpper(cmd); - if (cmd == kInitCmd) { - DoCmdInit(client); - } else { - DoCmdJoin(client); - } +void RaftClusterCmd::DoCmdInit(PClient* client) { + if 
(client->argv_.size() != 2 && client->argv_.size() != 3) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - void RaftClusterCmd::DoCmdInit(PClient * client) { - if (client->argv_.size() != 2 && client->argv_.size() != 3) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + std::string group_id; + if (client->argv_.size() == 3) { + group_id = client->argv_[2]; + if (group_id.size() != RAFT_GROUPID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } + } else { + group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); + } - std::string group_id; - if (client->argv_.size() == 3) { - group_id = client->argv_[2]; - if (group_id.size() != RAFT_GROUPID_LEN) { - return client->SetRes(CmdRes::kInvalidParameter, - "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); - } - } else { - group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); + auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); + if (add_region_success) { + auto s = praft_->Init(group_id, false); + if (!s.ok()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); } + client->SetLineString(fmt::format("+OK {}", group_id)); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); + } +} - auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); - if (add_region_success) { - auto s = praft_->Init(group_id, false); - if (!s.ok()) { - PSTORE.RemoveRegion(group_id); - ClearPaftCtx(); - return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); - } - client->SetLineString(fmt::format("+OK {}", group_id)); - } else { - client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); - } +static inline std::optional> GetIpAndPortFromEndPoint(const std::string& endpoint) { + auto pos = endpoint.find(':'); + if (pos == std::string::npos) { + return std::nullopt; } - static inline std::optional> GetIpAndPortFromEndPoint(const std::string& endpoint) { - auto pos = endpoint.find(':'); - if (pos == std::string::npos) { - return std::nullopt; - } + int32_t ret = 0; + pstd::String2int(endpoint.substr(pos + 1), &ret); + return {{endpoint.substr(0, pos), ret}}; +} + +void RaftClusterCmd::DoCmdJoin(PClient* client) { + assert(client->argv_.size() == 4); + auto group_id = client->argv_[2]; + auto addr = client->argv_[3]; + butil::EndPoint endpoint; + if (0 != butil::str2endpoint(addr.c_str(), &endpoint)) { + ERROR("Wrong endpoint format: {}", addr); + return client->SetRes(CmdRes::kErrOther, "Wrong endpoint format"); + } + endpoint.port += g_config.raft_port_offset; - int32_t ret = 0; - pstd::String2int(endpoint.substr(pos + 1), &ret); - return {{endpoint.substr(0, pos), ret}}; + if (group_id.size() != RAFT_GROUPID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } - void RaftClusterCmd::DoCmdJoin(PClient * client) { - assert(client->argv_.size() == 4); - auto group_id = client->argv_[2]; - auto addr = client->argv_[3]; - butil::EndPoint endpoint; - if (0 != butil::str2endpoint(addr.c_str(), &endpoint)) { - ERROR("Wrong endpoint format: {}", addr); - return client->SetRes(CmdRes::kErrOther, "Wrong endpoint format"); + auto add_region_success = PSTORE.AddRegion(group_id, 
client->GetCurrentDB()); + if (add_region_success) { + auto s = praft_->Init(group_id, false); + if (!s.ok()) { + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); } - endpoint.port += g_config.raft_port_offset; + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); + } - if (group_id.size() != RAFT_GROUPID_LEN) { - return client->SetRes(CmdRes::kInvalidParameter, - "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); - } + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; - auto add_region_success = PSTORE.AddRegion(group_id, client->GetCurrentDB()); - if (add_region_success) { - auto s = praft_->Init(group_id, false); - if (!s.ok()) { - PSTORE.RemoveRegion(group_id); - ClearPaftCtx(); - return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init raft node: {}", s.error_str())); - } - } else { - client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); - } + NodeAddRequest request; + NodeAddResponse response; + + auto end_point = butil::endpoint2str(PSTORE.GetEndPoint()).c_str(); + request.set_group_id(group_id); + request.set_endpoint(std::string(end_point)); + request.set_index(client->GetCurrentDB()); + request.set_role(0); - brpc::ChannelOptions options; - options.connection_type = brpc::CONNECTION_TYPE_SINGLE; - options.max_retry = 0; - options.connect_timeout_ms = kChannelTimeoutMS; + int retry_count = 0; + do { brpc::Channel add_node_channel; if (0 != add_node_channel.Init(endpoint, &options)) { PSTORE.RemoveRegion(group_id); @@ -255,13 +266,6 @@ src / store.h /* } brpc::Controller cntl; - NodeAddRequest request; - NodeAddResponse response; - auto end_point = butil::endpoint2str(PSTORE.GetEndPoint()).c_str(); - request.set_groupid(group_id); - request.set_endpoint(std::string(end_point)); - request.set_index(client->GetCurrentDB()); - request.set_role(0); PRaftService_Stub stub(&add_node_channel); stub.AddNode(&cntl, &request, &response, NULL); @@ -272,21 +276,39 @@ src / store.h /* client->SetRes(CmdRes::kErrOther, "Failed to send add node rpc"); return; } + if (response.success()) { client->SetRes(CmdRes::kOK, "Add Node Success"); return; } - PSTORE.RemoveRegion(group_id); - ClearPaftCtx(); - ERROR("Add node request return false"); - client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); - } - void RaftClusterCmd::ClearPaftCtx() { - assert(praft_); - praft_->ShutDown(); - praft_->Join(); - praft_->Clear(); - praft_ = nullptr; - } + switch (response.error_code()) { + case PRaftErrorCode::kErrorReDirect: { + butil::str2endpoint(response.leader_endpoint().c_str(), &endpoint); + endpoint.port += g_config.raft_port_offset; + break; + } + default: { + ERROR("Add node request return false"); + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); + return; + } + } + } while (!response.success() && ++retry_count <= 3); + + ERROR("Add node request return false"); + PSTORE.RemoveRegion(group_id); + ClearPaftCtx(); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); +} + +void RaftClusterCmd::ClearPaftCtx() { + assert(praft_); + praft_->ShutDown(); + praft_->Join(); + praft_->Clear(); + praft_ = nullptr; +} } // namespace pikiwidb diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 
fcb971417..709ef55d7 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -472,7 +472,8 @@ butil::Status PRaft::AddPeer(const std::string& endpoint, int index) { done.wait(); if (!done.status().ok()) { - WARN("Failed to add peer {} to node {}, status: {}", ep, node_->node_id().to_string(), done.status().error_str()); + // WARN("Failed to add peer {} to node {}, status: {}", ep, node_->node_id().to_string(), + // done.status().error_str()); return done.status(); } return done.status(); diff --git a/src/praft/praft.proto b/src/praft/praft.proto index 24bb42580..5636baf1a 100644 --- a/src/praft/praft.proto +++ b/src/praft/praft.proto @@ -3,14 +3,16 @@ package pikiwidb; option cc_generic_services = true; message NodeAddRequest { - string GroupID = 1; - string EndPoint = 2; + string group_id = 1; + string endpoint = 2; uint32 index = 3; uint32 role = 4; } message NodeAddResponse { bool success = 1; + uint32 error_code = 2; + string leader_endpoint = 3; } service PRaftService { diff --git a/src/praft/praft_service.cc b/src/praft/praft_service.cc index 6e2c9559b..305f5cfca 100644 --- a/src/praft/praft_service.cc +++ b/src/praft/praft_service.cc @@ -14,15 +14,39 @@ namespace pikiwidb { void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request, ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - auto groupid = request->groupid(); - auto db_ptr = PSTORE.GetDBByGroupID(groupid); - auto praft_ptr = db_ptr->GetPRaft(); + auto groupid = request->group_id(); auto end_point = request->endpoint(); auto index = request->index(); auto role = request->role(); + + auto db_ptr = PSTORE.GetDBByGroupID(groupid); + if (!db_ptr) { + response->set_success(false); + response->set_error_code(static_cast(PRaftErrorCode::kErrorDisMatch)); + response->set_leader_endpoint(end_point); + return; + } + auto praft_ptr = db_ptr->GetPRaft(); + if (!praft_ptr) { + response->set_success(false); + response->set_error_code(static_cast(PRaftErrorCode::kErrorDisMatch)); + response->set_leader_endpoint(end_point); + return; + } + + if (!praft_ptr->IsLeader()) { + response->set_success(false); + response->set_error_code(static_cast(PRaftErrorCode::kErrorReDirect)); + std::cout << "leader addr = " << praft_ptr->GetLeaderAddress() << std::endl; + response->set_leader_endpoint(praft_ptr->GetLeaderAddress()); + return; + } + auto status = praft_ptr->AddPeer(end_point, index); if (!status.ok()) { response->set_success(false); + response->set_error_code(static_cast(PRaftErrorCode::kErrorAddNode)); + response->set_leader_endpoint(praft_ptr->GetLeaderAddress()); return; } response->set_success(true); diff --git a/src/store.h b/src/store.h index aad6d8f28..c36f33975 100644 --- a/src/store.h +++ b/src/store.h @@ -49,6 +49,13 @@ struct TaskContext { using TasksVector = std::vector; +enum PRaftErrorCode { + kErrorDisMatch = 0, + kErrorAddNode, + kErrorRemoveNode, + kErrorReDirect, +}; + class PStore { public: static PStore& Instance(); From d7e2c4f91291b3ea1018a00ed8afd1b20a9498fa Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Mon, 29 Jul 2024 00:06:31 +0800 Subject: [PATCH 10/10] feat: add add remove redirect feature --- src/cmd_raft.cc | 68 ++++++++++++++++++++++++++++++-------- src/praft/praft.cc | 20 +++++++++++ src/praft/praft.h | 1 + src/praft/praft.proto | 15 ++++++++- src/praft/praft_service.cc | 43 +++++++++++++++++++++++- src/praft/praft_service.h | 3 ++ 6 files changed, 134 
insertions(+), 16 deletions(-) diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index ee0ac303e..ac9158b67 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -108,24 +108,64 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { return; } - // Connect target - std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); - auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; - auto peer_id = client->argv_[2]; - auto ret = - praft_->GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); - if (!ret) { // other clients have removed - return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); - } - praft_->GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent remove request to leader successfully"); + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; + + NodeRemoveRequest request; + NodeRemoveResponse response; + + request.set_group_id(praft_->GetGroupID()); + request.set_endpoint(client->argv_[2]); + request.set_index(client->GetCurrentDB()); + request.set_role(0); + + auto endpoint = leader_peer_id.addr; + int retry_count = 0; + do { + brpc::Channel remove_node_channel; + if (0 != remove_node_channel.Init(endpoint, &options)) { + ERROR("Fail to init remove_node_channel to praft service!"); + client->SetRes(CmdRes::kErrOther, "Fail to init remove_node_channel."); + return; + } + + brpc::Controller cntl; + PRaftService_Stub stub(&remove_node_channel); + stub.RemoveNode(&cntl, &request, &response, NULL); + + if (cntl.Failed()) { + ERROR("Fail to send remove node rpc to target server {}", butil::endpoint2str(endpoint).c_str()); + client->SetRes(CmdRes::kErrOther, "Failed to send remove node rpc"); + return; + } + + if (response.success()) { + client->SetRes(CmdRes::kOK, "Remove Node Success"); + return; + } + + switch (response.error_code()) { + case PRaftErrorCode::kErrorReDirect: { + butil::str2endpoint(response.leader_endpoint().c_str(), &endpoint); + endpoint.port += g_config.raft_port_offset; + break; + } + default: { + ERROR("Remove node request return false"); + client->SetRes(CmdRes::kErrOther, "Failed to Remove Node"); + return; + } + } + } while (!response.success() && ++retry_count <= 3); - // Not reply any message here, we will reply after the connection is established. 
-    client->Clear();
+    ERROR("Remove node request return false");
+    client->SetRes(CmdRes::kErrOther, "Failed to Remove Node");
     return;
   }
 
-  auto s = praft_->RemovePeer(client->argv_[2]);
+  auto s = praft_->RemovePeer(client->argv_[2], client->GetCurrentDB());
   if (s.ok()) {
     client->SetRes(CmdRes::kOK);
   } else {
diff --git a/src/praft/praft.cc b/src/praft/praft.cc
index 709ef55d7..f26c42c6c 100644
--- a/src/praft/praft.cc
+++ b/src/praft/praft.cc
@@ -497,6 +497,26 @@ butil::Status PRaft::RemovePeer(const std::string& peer) {
   return {0, "OK"};
 }
 
+butil::Status PRaft::RemovePeer(const std::string& endpoint, int index) {
+  if (!node_) {
+    return ERROR_LOG_AND_STATUS("Node is not initialized");
+  }
+
+  braft::SynchronizedClosure done;
+  butil::EndPoint ep;
+  butil::str2endpoint(endpoint.c_str(), &ep);
+  ep.port += g_config.raft_port_offset;
+  braft::PeerId peer_id(ep, index);
+  node_->remove_peer(peer_id, &done);
+  done.wait();
+
+  if (!done.status().ok()) {
+    WARN("Failed to remove peer, status: {}", done.status().error_str());
+    return done.status();
+  }
+  return done.status();
+}
+
 butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) {
   if (!node_) {
     return ERROR_LOG_AND_STATUS("Node is not initialized");
diff --git a/src/praft/praft.h b/src/praft/praft.h
index b2a9ec266..c4700f878 100644
--- a/src/praft/praft.h
+++ b/src/praft/praft.h
@@ -103,6 +103,7 @@ class PRaft : public braft::StateMachine {
   butil::Status AddPeer(const std::string& peer);
   butil::Status AddPeer(const std::string& endpoint, int index);
   butil::Status RemovePeer(const std::string& peer);
+  butil::Status RemovePeer(const std::string& endpoint, int index);
   butil::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true);
 
   void ShutDown();
diff --git a/src/praft/praft.proto b/src/praft/praft.proto
index 5636baf1a..9bb140e79 100644
--- a/src/praft/praft.proto
+++ b/src/praft/praft.proto
@@ -15,7 +15,20 @@ message NodeAddResponse {
   string leader_endpoint = 3;
 }
 
+message NodeRemoveRequest {
+  string group_id = 1;
+  string endpoint = 2;
+  uint32 index = 3;
+  uint32 role = 4;
+}
+
+message NodeRemoveResponse {
+  bool success = 1;
+  uint32 error_code = 2;
+  string leader_endpoint = 3;
+}
+
 service PRaftService {
   rpc AddNode(NodeAddRequest) returns (NodeAddResponse);
-
+  rpc RemoveNode(NodeRemoveRequest) returns (NodeRemoveResponse);
 };
diff --git a/src/praft/praft_service.cc b/src/praft/praft_service.cc
index 305f5cfca..c71305aa8 100644
--- a/src/praft/praft_service.cc
+++ b/src/praft/praft_service.cc
@@ -37,7 +37,6 @@ void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, co
   if (!praft_ptr->IsLeader()) {
     response->set_success(false);
     response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorReDirect));
-    std::cout << "leader addr = " << praft_ptr->GetLeaderAddress() << std::endl;
     response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
     return;
   }
@@ -51,4 +50,46 @@ void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, co
   }
   response->set_success(true);
 }
+
+void PRaftServiceImpl::RemoveNode(::google::protobuf::RpcController* controller,
+                                  const ::pikiwidb::NodeRemoveRequest* request,
+                                  ::pikiwidb::NodeRemoveResponse* response, ::google::protobuf::Closure* done) {
+  brpc::ClosureGuard done_guard(done);
+  auto groupid = request->group_id();
+  auto end_point = request->endpoint();
+  auto index = request->index();
+  auto role = request->role();
+
+  auto db_ptr = PSTORE.GetDBByGroupID(groupid);
+  if (!db_ptr) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch));
+    response->set_leader_endpoint(end_point);
+    return;
+  }
+  auto praft_ptr = db_ptr->GetPRaft();
+  if (!praft_ptr) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch));
+    response->set_leader_endpoint(end_point);
+    return;
+  }
+
+  if (!praft_ptr->IsLeader()) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorReDirect));
+    response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
+    return;
+  }
+
+  auto status = praft_ptr->RemovePeer(end_point, index);
+  if (!status.ok()) {
+    response->set_success(false);
+    response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorRemoveNode));
+    response->set_leader_endpoint(praft_ptr->GetLeaderAddress());
+    return;
+  }
+  response->set_success(true);
+}
+
 }  // namespace pikiwidb
\ No newline at end of file
diff --git a/src/praft/praft_service.h b/src/praft/praft_service.h
index 8719dc6da..365a575c4 100644
--- a/src/praft/praft_service.h
+++ b/src/praft/praft_service.h
@@ -17,6 +17,9 @@ class PRaftServiceImpl : public PRaftService {
   PRaftServiceImpl() = default;
   void AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request,
                ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done);
+
+  void RemoveNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeRemoveRequest* request,
+                  ::pikiwidb::NodeRemoveResponse* response, ::google::protobuf::Closure* done);
 };
 
 }  // namespace pikiwidb