Skip to content

Commit

Permalink
feat: add pd and proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
longfar-ncy committed Sep 29, 2024
1 parent bad54e1 commit ff2f9db
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 0 deletions.
20 changes: 20 additions & 0 deletions ppd/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "pd_service.h"

int main(int argc, char* argv[]) {
brpc::Server server;

PlacementDriverServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
fprintf(stderr, "Fail to add service!\n");
return -1;
}

// 启动服务
if (server.Start(8080, nullptr) != 0) {
fprintf(stderr, "Fail to start server!\n");
return -1;
}

server.RunUntilAskedToQuit();
return 0;
}
84 changes: 84 additions & 0 deletions ppd/pd.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
syntax="proto3";
package pikiwidb;
option cc_generic_services = true;

import "store.proto";

message GetClusterInfoRequest {
};

message GetClusterInfoResponse {
bool success = 1;
repeated Store store = 2;
};

message Store {
int64 store_id = 1;
string ip = 2;
int32 port = 3;
StoreState state = 4;
repeated Region region = 5;
};

message Region {
int64 region_id = 1;
optional string start_key = 2;
optional string end_key = 3;
repeated RegionEpoch region_epoch = 4;
repeated Peer peers = 5;
};

message RegionEpoch {
int64 conf_change_ver = 1; // conf change version
int64 region_ver = 2; // region version (split or merge)
};

enum StoreState {
UP = 0;
OFFLINE = 1;
TOMBSTONE = 2;
};

message CreateAllRegionsRequest {
int64 regions_count = 1;
int32 region_peers_count = 2;
repeated RegionOptions regionOptions = 3;
};

message CreateAllRegionsResponse {
bool success = 1;
};

message DeleteAllRegionsRequest {
};

message DeleteAllRegionsResponse {
bool success = 1;
};

message AddStoreRequest {
string ip = 1;
int32 port = 2;
};

message AddStoreResponse {
bool success = 1;
optional int64 store_id = 2;
optional string redirect = 3;
};

message RemoveStoreRequest {
int64 store_id = 1;
};

message RemoveStoreResponse {
bool success = 1;
};

service PlacementDriverService {
rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse);
rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse);
rpc AddStore(AddStoreRequest) returns (AddStoreResponse);
rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
};
52 changes: 52 additions & 0 deletions ppd/pd_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include "pd_service.h"

#include "pd_server.h"

namespace pikiwidb {
void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port());
if (!success) {
response->set_success(false);
return;
}

response->set_success(true);
response->set_store_id(store_id);
}

void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller,
const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
PDSERVER.GetClusterInfo(response);
}

void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response,
::google::protobuf::Closure* done) {}
} // namespace pikiwidb
37 changes: 37 additions & 0 deletions ppd/pd_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include "pd.pb.h"

namespace pikiwidb {

class PlacementDriverServiceImpl : public PlacementDriverService {
public:
PlacementDriverServiceImpl() = default;

void CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override;

void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override;

void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override;

void OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override;

void ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override;
};

} // namespace pikiwidb
20 changes: 20 additions & 0 deletions pproxy/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "proxy_service.h"

int main(int argc, char* argv[]) {
brpc::Server server;

ProxyServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
fprintf(stderr, "Fail to add service!\n");
return -1;
}

// 启动服务
if (server.Start(8080, nullptr) != 0) {
fprintf(stderr, "Fail to start server!\n");
return -1;
}

server.RunUntilAskedToQuit();
return 0;
}
28 changes: 28 additions & 0 deletions pproxy/proxy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";
package pikiwidb.proxy;
option cc_generic_services = true;

// 定义请求和响应
message RunCommandRequest {
string command = 1;
}

message RunCommandResponse {
string output = 1;
}
message GetRouteInfoRequest {
}
message GetRouteInfoResponse {
message RouteInfo {
string group_id = 1;
string endpoint = 2;
int32 role = 3;
}
repeated RouteInfo infos = 1;
}

// 定义服务
service ProxyService {
rpc RunCommand(RunCommandRequest) returns (RunCommandResponse);
rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse);
}
35 changes: 35 additions & 0 deletions pproxy/proxy_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "proxy_service.h"

namespace pikiwidb::proxy {
void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) {
std::string command = request->command();
std::string output = ExecuteCommand(command);

response->set_output(output);

done->Run();
}
void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response,
::google::protobuf::Closure* done) {
}

std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) {
std::array<char, 128> buffer;
std::string result;

// 使用 popen 执行命令
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
if (!pipe) {
return "popen() failed!";
}
while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {
result += buffer.data();
}
return result;
}

} // namespace pikiwidb::proxy
14 changes: 14 additions & 0 deletions pproxy/proxy_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include "proxy.pb.h"

class ProxyServiceImpl : public ProxyService {
public:
void RunCommand(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) override;
void GetRouteInfo(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response, ::google::protobuf::Closure* done) override;

private:
std::string ExecuteCommand(const std::string& command);
};
Loading

0 comments on commit ff2f9db

Please sign in to comment.