Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Multi raft #390

Closed
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
databases 2

################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -319,7 +319,7 @@ slowlog-max-len 128
# to the same DB is distributed among several RocksDB instances.

# RocksDB instances number per DB
db-instance-num 3
db-instance-num 1
# default is 86400 * 7
small-compaction-threshold 604800
# default is 86400 * 3
Expand All @@ -343,6 +343,6 @@ rocksdb-ttl-second 604800
rocksdb-periodic-second 259200;

############################### RAFT ###############################
use-raft no
use-raft yes
# Braft relies on brpc to communicate via the default port number plus the port offset
raft-port-offset 10
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ TARGET_INCLUDE_DIRECTORIES(pikiwidb
PRIVATE ${rocksdb_SOURCE_DIR}/include
PRIVATE ${BRAFT_INCLUDE_DIR}
PRIVATE ${BRPC_INCLUDE_DIR}
PRIVATE ${PROTO_OUTPUT_DIR}
)

TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags z praft praft_pb "${LIB}")
Expand Down
9 changes: 5 additions & 4 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
*/

#include "base_cmd.h"
#include "common.h"
#include "config.h"
#include "log.h"
#include "pikiwidb.h"
#include "praft/praft.h"
#include "store.h"

namespace pikiwidb {

Expand All @@ -35,16 +35,17 @@ void BaseCmd::Execute(PClient* client) {
DEBUG("execute command: {}", client->CmdName());

if (g_config.use_raft.load()) {
auto praft = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft();
// 1. If PRAFT is not initialized yet, return an error message to the client for both read and write commands.
if (!PRAFT.IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) {
if (!praft->IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) {
DEBUG("drop command: {}", client->CmdName());
return client->SetRes(CmdRes::kErrOther, "PRAFT is not initialized");
}

// 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write
// commands.
if (HasFlag(kCmdFlagsWrite) && !PRAFT.IsLeader()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", PRAFT.GetLeaderAddress()));
if (HasFlag(kCmdFlagsWrite) && !praft->IsLeader()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", praft->GetLeaderAddress()));
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "base_cmd.h"
#include "config.h"
#include "pikiwidb.h"
#include "store.h"

namespace pikiwidb {

Expand Down Expand Up @@ -269,7 +270,7 @@ int PClient::handlePacket(const char* start, int bytes) {
if (isPeerMaster()) {
if (isClusterCmdTarget()) {
// Proccees the packet at one turn.
int len = PRAFT.ProcessClusterCmdResponse(this, start, bytes); // @todo
int len = PSTORE.GetBackend(dbno_)->GetPRaft()->ProcessClusterCmdResponse(this, start, bytes); // @todo
if (len > 0) {
return len;
}
Expand Down Expand Up @@ -456,7 +457,7 @@ void PClient::OnConnect() {
}

if (isClusterCmdTarget()) {
PRAFT.SendNodeRequest(this);
PSTORE.GetBackend(dbno_)->GetPRaft()->SendNodeRequest(this);
}
} else {
if (g_config.password.empty()) {
Expand Down Expand Up @@ -541,7 +542,8 @@ bool PClient::isPeerMaster() const {
}

bool PClient::isClusterCmdTarget() const {
return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort();
auto praft = PSTORE.GetBackend(dbno_)->GetPRaft();
return praft->GetClusterCmdCtx().GetPeerIp() == PeerIP() && praft->GetClusterCmdCtx().GetPort() == PeerPort();
}

int PClient::uniqueID() const {
Expand Down
2 changes: 2 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ enum class ClientState {
kClosed,
};

const int kChannelTimeoutMS = 200;

class DB;
struct PSlaveInfo;

Expand Down
22 changes: 13 additions & 9 deletions src/cmd_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void FlushallCmd::DoCmd(PClient* client) {
}

SelectCmd::SelectCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {}
: BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {}

bool SelectCmd::DoInitial(PClient* client) { return true; }

Expand Down Expand Up @@ -138,7 +138,10 @@ void PingCmd::DoCmd(PClient* client) { client->SetRes(CmdRes::kPong, "PONG"); }
InfoCmd::InfoCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {}

bool InfoCmd::DoInitial(PClient* client) { return true; }
bool InfoCmd::DoInitial(PClient* client) {
praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft();
return true;
}

// @todo The info raft command is only supported for the time being
void InfoCmd::DoCmd(PClient* client) {
Expand Down Expand Up @@ -175,19 +178,20 @@ void InfoCmd::InfoRaft(PClient* client) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

if (!PRAFT.IsInitialized()) {
assert(praft_);
if (!praft_->IsInitialized()) {
return client->SetRes(CmdRes::kErrOther, "Don't already cluster member");
}

auto node_status = PRAFT.GetNodeStatus();
auto node_status = praft_->GetNodeStatus();
if (node_status.state == braft::State::STATE_END) {
return client->SetRes(CmdRes::kErrOther, "Node is not initialized");
}

std::string message;
message += "raft_group_id:" + PRAFT.GetGroupID() + "\r\n";
message += "raft_node_id:" + PRAFT.GetNodeID() + "\r\n";
message += "raft_peer_id:" + PRAFT.GetPeerID() + "\r\n";
message += "raft_group_id:" + praft_->GetGroupID() + "\r\n";
message += "raft_node_id:" + praft_->GetNodeID() + "\r\n";
message += "raft_peer_id:" + praft_->GetPeerID() + "\r\n";
if (braft::is_active_state(node_status.state)) {
message += "raft_state:up\r\n";
} else {
Expand All @@ -197,9 +201,9 @@ void InfoCmd::InfoRaft(PClient* client) {
message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n";
message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n";

if (PRAFT.IsLeader()) {
if (praft_->IsLeader()) {
std::vector<braft::PeerId> peers;
auto status = PRAFT.GetListPeers(&peers);
auto status = praft_->GetListPeers(&peers);
if (!status.ok()) {
return client->SetRes(CmdRes::kErrOther, status.error_str());
}
Expand Down
3 changes: 3 additions & 0 deletions src/cmd_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ class InfoCmd : public BaseCmd {

void InfoRaft(PClient* client);
void InfoData(PClient* client);

private:
PRaft* praft_ = nullptr;
};

class CmdDebug : public BaseCmdGroup {
Expand Down
Loading
Loading