From ff2f9db6d78f35683642dfec158efa80407fefc7 Mon Sep 17 00:00:00 2001 From: longfar Date: Sun, 29 Sep 2024 17:04:16 +0800 Subject: [PATCH] feat: add pd and proxy --- ppd/main.cc | 20 +++++ ppd/pd.proto | 84 +++++++++++++++++++++ ppd/pd_service.cc | 52 +++++++++++++ ppd/pd_service.h | 37 +++++++++ pproxy/main.cc | 20 +++++ pproxy/proxy.proto | 28 +++++++ pproxy/proxy_service.cc | 35 +++++++++ pproxy/proxy_service.h | 14 ++++ pproxy/router.h | 161 ++++++++++++++++++++++++++++++++++++++++ pproxy/task_manager.cc | 84 +++++++++++++++++++++ pproxy/task_manager.h | 74 ++++++++++++++++++ 11 files changed, 609 insertions(+) create mode 100644 ppd/main.cc create mode 100644 ppd/pd.proto create mode 100644 ppd/pd_service.cc create mode 100644 ppd/pd_service.h create mode 100644 pproxy/main.cc create mode 100644 pproxy/proxy.proto create mode 100644 pproxy/proxy_service.cc create mode 100644 pproxy/proxy_service.h create mode 100644 pproxy/router.h create mode 100644 pproxy/task_manager.cc create mode 100644 pproxy/task_manager.h diff --git a/ppd/main.cc b/ppd/main.cc new file mode 100644 index 00000000..1b87a6b0 --- /dev/null +++ b/ppd/main.cc @@ -0,0 +1,20 @@ +#include "pd_service.h" + +int main(int argc, char* argv[]) { + brpc::Server server; + + PlacementDriverServiceImpl service; + if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) { + fprintf(stderr, "Fail to add service!\n"); + return -1; + } + + // 启动服务 + if (server.Start(8080, nullptr) != 0) { + fprintf(stderr, "Fail to start server!\n"); + return -1; + } + + server.RunUntilAskedToQuit(); + return 0; +} \ No newline at end of file diff --git a/ppd/pd.proto b/ppd/pd.proto new file mode 100644 index 00000000..7d3ab454 --- /dev/null +++ b/ppd/pd.proto @@ -0,0 +1,84 @@ +syntax="proto3"; +package pikiwidb; +option cc_generic_services = true; + +import "store.proto"; + +message GetClusterInfoRequest { +}; + +message GetClusterInfoResponse { + bool success = 1; + repeated Store store = 2; +}; + +message Store { + int64 store_id = 1; + string ip = 2; + int32 port = 3; + StoreState state = 4; + repeated Region region = 5; +}; + +message Region { + int64 region_id = 1; + optional string start_key = 2; + optional string end_key = 3; + repeated RegionEpoch region_epoch = 4; + repeated Peer peers = 5; +}; + +message RegionEpoch { + int64 conf_change_ver = 1; // conf change version + int64 region_ver = 2; // region version (split or merge) +}; + +enum StoreState { + UP = 0; + OFFLINE = 1; + TOMBSTONE = 2; +}; + +message CreateAllRegionsRequest { + int64 regions_count = 1; + int32 region_peers_count = 2; + repeated RegionOptions regionOptions = 3; +}; + +message CreateAllRegionsResponse { + bool success = 1; +}; + +message DeleteAllRegionsRequest { +}; + +message DeleteAllRegionsResponse { + bool success = 1; +}; + +message AddStoreRequest { + string ip = 1; + int32 port = 2; +}; + +message AddStoreResponse { + bool success = 1; + optional int64 store_id = 2; + optional string redirect = 3; +}; + +message RemoveStoreRequest { + int64 store_id = 1; +}; + +message RemoveStoreResponse { + bool success = 1; +}; + +service PlacementDriverService { + rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse); + rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse); + rpc AddStore(AddStoreRequest) returns (AddStoreResponse); + rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse); + rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); +}; \ No newline at end of file diff --git a/ppd/pd_service.cc b/ppd/pd_service.cc new file mode 100644 index 00000000..3bf016ba --- /dev/null +++ b/ppd/pd_service.cc @@ -0,0 +1,52 @@ +#include "pd_service.h" + +#include "pd_server.h" + +namespace pikiwidb { +void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::CreateAllRegionsRequest* request, + ::pikiwidb::CreateAllRegionsResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::DeleteAllRegionsRequest* request, + ::pikiwidb::DeleteAllRegionsResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller, + const ::pikiwidb::AddStoreRequest* request, + ::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port()); + if (!success) { + response->set_success(false); + return; + } + + response->set_success(true); + response->set_store_id(store_id); +} + +void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller, + const ::pikiwidb::RemoveStoreRequest* request, + ::pikiwidb::RemoveStoreResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller, + const ::pikiwidb::GetClusterInfoRequest* request, + ::pikiwidb::GetClusterInfoResponse* response, + ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + PDSERVER.GetClusterInfo(response); +} + +void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::OpenPDSchedulingRequest* request, + ::pikiwidb::OpenPDSchedulingResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::ClosePDSchedulingRequest* request, + ::pikiwidb::ClosePDSchedulingResponse* response, + ::google::protobuf::Closure* done) {} +} // namespace pikiwidb \ No newline at end of file diff --git a/ppd/pd_service.h b/ppd/pd_service.h new file mode 100644 index 00000000..282ac3bc --- /dev/null +++ b/ppd/pd_service.h @@ -0,0 +1,37 @@ +#pragma once + +#include "pd.pb.h" + +namespace pikiwidb { + +class PlacementDriverServiceImpl : public PlacementDriverService { + public: + PlacementDriverServiceImpl() = default; + + void CreateAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::CreateAllRegionsRequest* request, + ::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override; + + void DeleteAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::DeleteAllRegionsRequest* request, + ::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override; + + void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request, + ::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override; + + void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request, + ::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override; + + void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request, + ::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override; + + void OpenPDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::OpenPDSchedulingRequest* request, + ::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override; + + void ClosePDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::ClosePDSchedulingRequest* request, + ::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override; +}; + +} // namespace pikiwidb \ No newline at end of file diff --git a/pproxy/main.cc b/pproxy/main.cc new file mode 100644 index 00000000..9304fcb5 --- /dev/null +++ b/pproxy/main.cc @@ -0,0 +1,20 @@ +#include "proxy_service.h" + +int main(int argc, char* argv[]) { + brpc::Server server; + + ProxyServiceImpl service; + if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) { + fprintf(stderr, "Fail to add service!\n"); + return -1; + } + + // 启动服务 + if (server.Start(8080, nullptr) != 0) { + fprintf(stderr, "Fail to start server!\n"); + return -1; + } + + server.RunUntilAskedToQuit(); + return 0; +} \ No newline at end of file diff --git a/pproxy/proxy.proto b/pproxy/proxy.proto new file mode 100644 index 00000000..c899cfd8 --- /dev/null +++ b/pproxy/proxy.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; +package pikiwidb.proxy; +option cc_generic_services = true; + +// 定义请求和响应 +message RunCommandRequest { + string command = 1; +} + +message RunCommandResponse { + string output = 1; +} +message GetRouteInfoRequest { +} +message GetRouteInfoResponse { + message RouteInfo { + string group_id = 1; + string endpoint = 2; + int32 role = 3; + } + repeated RouteInfo infos = 1; +} + +// 定义服务 +service ProxyService { + rpc RunCommand(RunCommandRequest) returns (RunCommandResponse); + rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse); +} diff --git a/pproxy/proxy_service.cc b/pproxy/proxy_service.cc new file mode 100644 index 00000000..2bb4c4a3 --- /dev/null +++ b/pproxy/proxy_service.cc @@ -0,0 +1,35 @@ +#include "proxy_service.h" + +namespace pikiwidb::proxy { +void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl, + const pikiwidb::proxy::RunCommandRequest* request, + pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) { + std::string command = request->command(); + std::string output = ExecuteCommand(command); + + response->set_output(output); + + done->Run(); +} +void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl, + const pikiwidb::proxy::GetRouteInfoRequest* request, + pikiwidb::proxy::GetRouteInfoResponse* response, + ::google::protobuf::Closure* done) { +} + +std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) { + std::array buffer; + std::string result; + + // 使用 popen 执行命令 + std::unique_ptr pipe(popen(command.c_str(), "r"), pclose); + if (!pipe) { + return "popen() failed!"; + } + while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) { + result += buffer.data(); + } + return result; +} + +} // namespace pikiwidb::proxy \ No newline at end of file diff --git a/pproxy/proxy_service.h b/pproxy/proxy_service.h new file mode 100644 index 00000000..3b6be94d --- /dev/null +++ b/pproxy/proxy_service.h @@ -0,0 +1,14 @@ +#pragma once + +#include "proxy.pb.h" + +class ProxyServiceImpl : public ProxyService { + public: + void RunCommand(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::RunCommandRequest* request, + pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) override; + void GetRouteInfo(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::GetRouteInfoRequest* request, + pikiwidb::proxy::GetRouteInfoResponse* response, ::google::protobuf::Closure* done) override; + + private: + std::string ExecuteCommand(const std::string& command); +}; diff --git a/pproxy/router.h b/pproxy/router.h new file mode 100644 index 00000000..a9981c88 --- /dev/null +++ b/pproxy/router.h @@ -0,0 +1,161 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +class Router { + public: + Router() { + // maximum 100 parameters + params.reserve(100); + } + + Router *add(const char *method, const char *pattern, std::function &)> handler) { + // step over any initial slash + if (pattern[0] == '/') { + pattern++; + } + + std::vector nodes; + // nodes.push_back(method); + + const char *stop; + const char *start = pattern; + const char *end_ptr = pattern + strlen(pattern); + do { + stop = getNextSegment(start, end_ptr); + + // std::cout << "Segment(" << std::string(start, stop - start) << ")" << std::endl; + + nodes.emplace_back(start, stop - start); + + start = stop + 1; + } while (stop != end_ptr); + + // if pattern starts with / then move 1+ and run inline slash parser + + add(nodes, handlers.size()); + handlers.push_back(handler); + + compile(); + return this; + } + + void compile() { + compiled_tree.clear(); + compile_tree(tree); + } + + void route(const char *method, unsigned int method_length, const char *url, unsigned int url_length, userData) { + handlers[lookup(url, url_length)](userData, params); + params.clear(); + } + + private: + std::vector &)>> handlers; + std::vector params; + + struct Node { + std::string name; + std::map children; + short handler; + }; + + Node *tree = new Node({"GET", {}, -1}); + std::string compiled_tree; + + void add(const std::vector &route, short handler) { + Node *parent = tree; + for (const std::string &node : route) { + if (parent->children.find(node) == parent->children.end()) { + parent->children[node] = new Node({node, {}, handler}); + } + parent = parent->children[node]; + } + } + + unsigned short compile_tree(Node *n) { + unsigned short nodeLength = 6 + n->name.length(); + for (const auto &c : n->children) { + nodeLength += compile_tree(c.second); + } + + unsigned short nodeNameLength = n->name.length(); + + std::string compiledNode; + compiledNode.append(reinterpret_cast(&nodeLength), sizeof(nodeLength)); + compiledNode.append(reinterpret_cast(&nodeNameLength), sizeof(nodeNameLength)); + compiledNode.append(reinterpret_cast(&n->handler), sizeof(n->handler)); + compiledNode.append(n->name.data(), n->name.length()); + + compiled_tree = compiledNode + compiled_tree; + return nodeLength; + } + + inline const char *find_node(const char *parent_node, const char *name, int name_length) { + unsigned short nodeLength = *(unsigned short *)&parent_node[0]; + unsigned short nodeNameLength = *(unsigned short *)&parent_node[2]; + + // std::cout << "Finding node: <" << std::string(name, name_length) << ">" << std::endl; + + const char *stoppp = parent_node + nodeLength; + for (const char *candidate = parent_node + 6 + nodeNameLength; candidate < stoppp;) { + unsigned short nodeLength = *(unsigned short *)&candidate[0]; + unsigned short nodeNameLength = *(unsigned short *)&candidate[2]; + + // whildcard, parameter, equal + if (nodeNameLength == 0) { + return candidate; + } else if (candidate[6] == ':') { + // parameter + + // todo: push this pointer on the stack of args! + params.push_back(std::string_view({name, static_cast(name_length)})); + + return candidate; + } else if (nodeNameLength == name_length && !memcmp(candidate + 6, name, name_length)) { + return candidate; + } + + candidate = candidate + nodeLength; + } + + return nullptr; + } + + // returns next slash from start or end + inline const char *getNextSegment(const char *start, const char *end) { + const char *stop = static_cast(memchr(start, '/', end - start)); + return stop ? stop : end; + } + + // should take method also! + inline int lookup(const char *url, int length) { + // all urls start with / + url++; + length--; + + const char *treeStart = static_cast(compiled_tree.data()); + + const char *stop; + const char *start = url; + const char *end_ptr = url + length; + do { + stop = getNextSegment(start, end_ptr); + + // std::cout << "Matching(" << std::string(start, stop - start) << ")" << std::endl; + + if (nullptr == (treeStart = find_node(treeStart, start, stop - start))) { + return -1; + } + + start = stop + 1; + } while (stop != end_ptr); + + return *(short *)&treeStart[4]; + } +}; \ No newline at end of file diff --git a/pproxy/task_manager.cc b/pproxy/task_manager.cc new file mode 100644 index 00000000..65207950 --- /dev/null +++ b/pproxy/task_manager.cc @@ -0,0 +1,84 @@ +#include "task_manager.h" + +#include +#include + +Task::Status Task::TextToStatus(const std::string& input) { + if (input == "pending") { + return Task::pending; + } else if (input == "completed") { + return Task::completed; + } else if (input == "deleted") { + return Task::deleted; + } else if (input == "recurring") { + return Task::recurring; + } else if (input == "waiting") { + return Task::waiting; + } + + return Task::pending; +} +std::string Task::StatusToText(Status s) { + if (s == Task::pending) { + return "pending"; + } else if (s == Task::completed) { + return "completed"; + } else if (s == Task::deleted) { + return "deleted"; + } else if (s == Task::recurring) { + return "recurring"; + } else if (s == Task::waiting) { + return "waiting"; + } + return "pending"; +} + +TaskManager::TaskManager(std::shared_ptr threadpool, size_t maxWorkers) + : _threadpool(std::move(threadpool)), _maxWorkers(maxWorkers) {} + +std::future TaskManager::stop() { + auto task = std::make_shared>([this] { + std::unique_lock guard(_mutex); + bool isLast = _workerCount == 1; + + // Guarantee that the task finishes last. + while (!isLast) { + guard.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + guard.lock(); + isLast = _workerCount == 1; + } + }); + auto future = task->get_future(); + + // Adding a new task and expecting the future guarantees that the last batch of tasks is being executed. + auto functor = [task = std::move(task)]() mutable { (*task)(); }; + std::lock_guard guard(_mutex); + + _stopped = true; + _tasks.emplace(std::move(functor)); + this->processTasks(); + + return future; +} + +void TaskManager::addTask(std::function functor) { + std::lock_guard guard(_mutex); + + if (_stopped) { + return; + } + _tasks.emplace(std::move(functor), Clock::now()); + this->processTasks(); +} + +void TaskManager::processTasks() { + if (_tasks.empty() || _workerCount == _maxWorkers) { + return; + } + auto task = std::move(_tasks.front()); + _tasks.pop(); + + ++_workerCount; + _threadpool->execute(std::move(task)); +} \ No newline at end of file diff --git a/pproxy/task_manager.h b/pproxy/task_manager.h new file mode 100644 index 00000000..ad1857d2 --- /dev/null +++ b/pproxy/task_manager.h @@ -0,0 +1,74 @@ +#pragma once + +#include +#include +#include +#include +#include + +class Task { + public: + Task(); + Task(const Task&); + Task& operator=(const Task&); + bool operator==(const Task&); + Task(const std::string&); + ~Task(); + + enum Status { pending, completed, deleted, recurring, waiting }; + + static Status TextToStatus(const std::string& input); + static std::string StatusToText(Status s); + + void SetEntry(); + + Status GetStatus() const; + void SetStatus(Status); + + private: + int determineVersion(const std::string&); + void legacyParse(const std::string&); + int id; +}; + +class TaskManager { + public: + TaskManager(std::shared_ptr threadpool, size_t maxWorkers); + + std::future Stop(); + + template + auto Push(F&& function, Args&&... args) // + -> std::future::type> { + using ReturnType = typename std::result_of::type; + + auto task = std::make_shared>( // + std::bind(std::forward(function), std::forward(args)...)); + auto future = task->get_future(); + + auto functor = [this, task = std::move(task)]() mutable { + (*task)(); + { + std::lock_guard guard(_mutex); + + --_workerCount; + this->processTasks(); + } + }; + this->addTask(std::move(functor)); + + return future; + } + + private: + void addTask(std::function functor); + void processTasks(); + + private: + std::shared_ptr<::Threadpool> _threadpool; + std::queue _tasks; + std::mutex _mutex; + size_t _maxWorkers; + size_t _workerCount{0}; + bool _stopped{false}; +}; \ No newline at end of file