From 59cab56754f1448c772a9dc1fd8be984c6668c20 Mon Sep 17 00:00:00 2001 From: Sainath Singineedi Date: Tue, 10 Dec 2024 00:46:33 +0530 Subject: [PATCH] example: raft-kv-memstore-grpc --- Cargo.toml | 1 + examples/raft-kv-memstore-grpc/.gitignore | 5 + examples/raft-kv-memstore-grpc/Cargo.toml | 48 ++++ examples/raft-kv-memstore-grpc/README.md | 128 ++++++++++ examples/raft-kv-memstore-grpc/build.rs | 12 + .../proto/api_service.proto | 30 +++ .../proto/internal_service.proto | 24 ++ .../proto/management_service.proto | 51 ++++ .../raft-kv-memstore-grpc/src/bin/main.rs | 78 ++++++ .../src/grpc/api_service.rs | 115 +++++++++ .../src/grpc/internal_service.rs | 147 +++++++++++ .../src/grpc/management_service.rs | 157 ++++++++++++ .../raft-kv-memstore-grpc/src/grpc/mod.rs | 3 + examples/raft-kv-memstore-grpc/src/lib.rs | 57 +++++ .../raft-kv-memstore-grpc/src/network/mod.rs | 102 ++++++++ .../raft-kv-memstore-grpc/src/store/mod.rs | 231 ++++++++++++++++++ examples/raft-kv-memstore-grpc/src/test.rs | 23 ++ .../raft-kv-memstore-grpc/test-cluster.sh | 145 +++++++++++ 18 files changed, 1357 insertions(+) create mode 100644 examples/raft-kv-memstore-grpc/.gitignore create mode 100644 examples/raft-kv-memstore-grpc/Cargo.toml create mode 100644 examples/raft-kv-memstore-grpc/README.md create mode 100644 examples/raft-kv-memstore-grpc/build.rs create mode 100644 examples/raft-kv-memstore-grpc/proto/api_service.proto create mode 100644 examples/raft-kv-memstore-grpc/proto/internal_service.proto create mode 100644 examples/raft-kv-memstore-grpc/proto/management_service.proto create mode 100644 examples/raft-kv-memstore-grpc/src/bin/main.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/api_service.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/management_service.rs create mode 100644 examples/raft-kv-memstore-grpc/src/grpc/mod.rs create mode 100644 
examples/raft-kv-memstore-grpc/src/lib.rs
 create mode 100644 examples/raft-kv-memstore-grpc/src/network/mod.rs
 create mode 100644 examples/raft-kv-memstore-grpc/src/store/mod.rs
 create mode 100644 examples/raft-kv-memstore-grpc/src/test.rs
 create mode 100755 examples/raft-kv-memstore-grpc/test-cluster.sh

diff --git a/Cargo.toml b/Cargo.toml
index 7074e73db..d02520949 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -67,6 +67,7 @@ exclude = [
     "cluster_benchmark",
     "examples/memstore",
     "examples/raft-kv-memstore",
+    "examples/raft-kv-memstore-grpc",
     "examples/raft-kv-memstore-singlethreaded",
     "examples/raft-kv-memstore-network-v2",
     "examples/raft-kv-memstore-opendal-snapshot-data",
diff --git a/examples/raft-kv-memstore-grpc/.gitignore b/examples/raft-kv-memstore-grpc/.gitignore
new file mode 100644
index 000000000..cb4025390
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/.gitignore
@@ -0,0 +1,5 @@
+target
+vendor
+.idea
+
+/*.log
diff --git a/examples/raft-kv-memstore-grpc/Cargo.toml b/examples/raft-kv-memstore-grpc/Cargo.toml
new file mode 100644
index 000000000..d7df913af
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/Cargo.toml
@@ -0,0 +1,48 @@
+[package]
+name = "raft-kv-memstore-grpc"
+version = "0.1.0"
+readme = "README.md"
+
+edition = "2021"
+authors = [
+    "Sainath Singineedi ",
+]
+categories = ["algorithms", "asynchronous", "data-structures"]
+description = "An example distributed key-value store built upon `openraft`."
+homepage = "https://github.com/databendlabs/openraft"
+keywords = ["raft", "consensus"]
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/databendlabs/openraft"
+
+[[bin]]
+name = "raft-key-value"
+path = "src/bin/main.rs"
+
+[dependencies]
+memstore = { path = "../memstore", features = [] }
+openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
+
+clap = { version = "4.1.11", features = ["derive", "env"] }
+serde = { version = "1.0.114", features = ["derive"] }
+serde_json = "1.0.57"
+tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "sync"] }
+tracing = "0.1.29"
+tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
+tonic = "0.12.3"
+tonic-build = "0.12.3"
+bincode = "1.3.3"
+dashmap = "6.1.0"
+prost = "0.13.4"
+
+[dev-dependencies]
+anyhow = "1.0.63"
+maplit = "1.0.2"
+
+[features]
+
+[build-dependencies]
+prost-build = "0.13.4"
+tonic-build = "0.12.3"
+
+[package.metadata.docs.rs]
+all-features = true
diff --git a/examples/raft-kv-memstore-grpc/README.md b/examples/raft-kv-memstore-grpc/README.md
new file mode 100644
index 000000000..cb4e4d1f7
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/README.md
@@ -0,0 +1,128 @@
+# Example distributed key-value store built upon openraft.
+
+It is an example of how to build a real-world key-value store with `openraft`.
+Includes:
+- An in-memory `RaftLogStorage` and `RaftStateMachine` implementation [store](./src/store/mod.rs).
+
+- A gRPC server based on [tonic](https://docs.rs/tonic).
+  Includes:
+  - raft-internal network APIs for replication and voting.
+  - Admin APIs to add nodes, change-membership etc.
+  - Application APIs to write a value by key or read a value by key.
+
+- Client and `RaftNetwork`([rpc](./src/network/mod.rs)) are built upon [tonic](https://docs.rs/tonic).
+
+  [ExampleClient](./src/client.rs) is a minimal raft client in rust to talk to a raft cluster.
+  - It includes application API `write()` and `read()`, and administrative API `init()`, `add_learner()`, `change_membership()`, `metrics()`.
+  - This client tracks the last known leader id; a write operation (such as `write()` or `change_membership()`) will be redirected to the leader on the client side.
+
+## Run it
+
+There is an example in a bash script and an example in rust:
+
+- [test-cluster.sh](./test-cluster.sh) shows a simulation of 3 nodes running and sharing data.
+  It only uses `curl` and shows the communication between a client and the cluster in plain HTTP messages.
+  You can run the cluster demo with:
+
+  ```shell
+  ./test-cluster.sh
+  ```
+
+- [test_cluster.rs](./tests/cluster/test_cluster.rs) does almost the same as `test-cluster.sh` but in rust
+  with the `ExampleClient`.
+
+  Run it with `cargo test`.
+
+
+If you want to compile the application, run:
+
+```shell
+cargo build
+```
+
+(You can append `--release` to build in release mode, but we don't recommend using
+this project in production yet.)
+
+## What the test script does
+
+To run it, get the binary `raft-key-value` inside `target/debug` and run:
+
+```shell
+./raft-key-value --id 1 --addr 127.0.0.1:21001
+```
+
+It will start a node.
+
+To start the following nodes:
+
+```shell
+./raft-key-value --id 2 --addr 127.0.0.1:21002
+```
+
+You can continue replicating the nodes by changing the `id` and `addr`.
+
+After that, call the first node created:
+
+```
+POST - 127.0.0.1:21001/init
+```
+
+It will define the first node created as the leader.
+
+Then you need to inform the leader that these nodes are learners:
+
+```
+POST - 127.0.0.1:21001/add-learner '[2, "127.0.0.1:21002"]'
+POST - 127.0.0.1:21001/add-learner '[3, "127.0.0.1:21003"]'
+```
+
+Now you need to tell the leader to add all learners as members of the cluster:
+
+```
+POST - 127.0.0.1:21001/change-membership "[1, 2, 3]"
+```
+
+Write some data in any of the nodes:
+
+```
+POST - 127.0.0.1:21001/write '{"Set":{"key":"foo","value":"bar"}}'
+```
+
+Read the data from any node:
+
+```
+POST - 127.0.0.1:21002/read "foo"
+```
+
+You should be able to read that on another instance even if you did not sync any data!
+
+
+## How it's structured.
+
+The application is separated in 4 modules:
+
+ - `bin`: You can find the `main()` function in [main](./src/bin/main.rs), the file where the setup for the server happens.
+ - `grpc` and `network`: You can find the [api](./src/grpc/api_service.rs) that implements the endpoints used by the public API and [rpc](./src/network/mod.rs) where all the raft communication from the node happens. [management](./src/grpc/management_service.rs) is where all the administration endpoints are present, those are used to add or remove nodes, promote and more. [raft](./src/grpc/internal_service.rs) is where all the communication is received from other nodes.
+ - `store`: You can find the file [store](./src/store/mod.rs) where all the key-value implementation is done. Here is where your application data will be managed.
+
+## Where is my data?
+
+The data is stored inside state machines; each state machine represents a point of data and
+raft enforces that all nodes have the same data in synchronization. You can have a look at
+the struct [ExampleStateMachine](./src/store/mod.rs)
+
+## Cluster management
+
+The raft itself does not store node addresses.
+But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses.
+
+Thus, in this example application:
+
+- The storage layer has to store nodes' information.
+- The network layer keeps a reference to the store so that it is able to get the address of a target node to send RPC to.
+
+To add a node to a cluster, it includes 3 steps:
+
+- Write a `node` through raft protocol to the storage.
+- Add the node as a `Learner` to let it start receiving replication data from the leader.
+- Invoke `change-membership` to change the learner node to a member.
diff --git a/examples/raft-kv-memstore-grpc/build.rs b/examples/raft-kv-memstore-grpc/build.rs
new file mode 100644
index 000000000..e1cd6302a
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/build.rs
@@ -0,0 +1,12 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    println!("cargo:rerun-if-changed=proto");
+    tonic_build::configure().compile_protos(
+        &[
+            "proto/internal_service.proto",
+            "proto/management_service.proto",
+            "proto/api_service.proto",
+        ],
+        &["proto"],
+    )?;
+    Ok(())
+}
diff --git a/examples/raft-kv-memstore-grpc/proto/api_service.proto b/examples/raft-kv-memstore-grpc/proto/api_service.proto
new file mode 100644
index 000000000..4d46c00c3
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/proto/api_service.proto
@@ -0,0 +1,30 @@
+syntax = "proto3";
+package openraftpb;
+
+// ApiService provides the key-value store API operations
+service ApiService {
+  // Get retrieves the value associated with a given key
+  rpc Get(GetRequest) returns (GetResponse) {}
+
+  // Set stores a key-value pair in the distributed store
+  rpc Set(SetRequest) returns (SetResponse) {}
+}
+
+// GetRequest represents a key lookup request
+message GetRequest {
+  string key = 1; // Key to look up
+}
+
+// GetResponse contains the value associated with the requested key
+message GetResponse {
+  string value = 1; // Retrieved value
+}
+
+// SetRequest represents a key-value pair to be stored
+message SetRequest {
+  string key = 1; // Key to store
+  string value = 2; // Value to associate with the key
+}
+
+// SetResponse indicates the result of a Set operation
+message SetResponse {}
diff --git a/examples/raft-kv-memstore-grpc/proto/internal_service.proto b/examples/raft-kv-memstore-grpc/proto/internal_service.proto
new file mode 100644
index 000000000..1dd382157
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/proto/internal_service.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+package openraftpb;
+
+// InternalService handles internal Raft cluster communication
+service InternalService {
+  // Vote handles vote requests between Raft nodes during leader election
+  rpc Vote(RaftRequestBytes) returns (RaftReplyBytes) {}
+
+  // Append handles call related to append entries RPC
+  rpc Append(RaftRequestBytes) returns (RaftReplyBytes) {}
+
+  // Snapshot handles install snapshot RPC
+  rpc Snapshot(RaftRequestBytes) returns (RaftReplyBytes) {}
+}
+
+// RaftRequestBytes encapsulates binary Raft request data
+message RaftRequestBytes {
+  bytes value = 1; // Serialized Raft request data
+}
+
+// RaftReplyBytes encapsulates binary Raft response data
+message RaftReplyBytes {
+  bytes value = 1; // Serialized Raft response data
+}
diff --git a/examples/raft-kv-memstore-grpc/proto/management_service.proto b/examples/raft-kv-memstore-grpc/proto/management_service.proto
new file mode 100644
index 000000000..265887e7c
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/proto/management_service.proto
@@ -0,0 +1,51 @@
+syntax = "proto3";
+package openraftpb;
+
+// ManagementService handles Raft cluster management operations
+service ManagementService {
+  // Init initializes a new Raft cluster with the given nodes
+  rpc Init(InitRequest) returns (RaftReplyString) {}
+
+  // AddLearner adds a new learner node to the Raft cluster
+  rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {}
+
+  // ChangeMembership modifies the cluster membership configuration
+  rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {}
+
+  // Metrics retrieves cluster metrics and status information
+  rpc Metrics(RaftRequestString) returns (RaftReplyString) {}
+}
+
+// InitRequest contains the initial set of nodes for cluster initialization
+message InitRequest {
+  repeated Node nodes = 1; // List of initial cluster nodes
+}
+
+// Node represents a single node in the Raft cluster
+message Node {
+  string rpc_addr = 1; // RPC address for node communication
+  uint64 node_id = 2; // Unique identifier for the node
+}
+
+// AddLearnerRequest specifies parameters for adding a learner node
+message AddLearnerRequest {
+  Node node = 1; // Node to be added as a learner
+  bool blocking = 2; // Whether to wait for the operation to complete
+}
+
+// RaftRequestString represents a string-based Raft request
+message RaftRequestString {
+  string data = 1; // Request data in string format
+}
+
+// RaftReplyString represents a string-based Raft response
+message RaftReplyString {
+  string data = 1; // Response data
+  string error = 2; // Error message, if any
+}
+
+// ChangeMembershipRequest specifies parameters for modifying cluster membership
+message ChangeMembershipRequest {
+  repeated uint64 members = 1; // New set of member node IDs
+  bool retain = 2; // Whether to retain existing configuration
+}
diff --git a/examples/raft-kv-memstore-grpc/src/bin/main.rs b/examples/raft-kv-memstore-grpc/src/bin/main.rs
new file mode 100644
index 000000000..4aeca2df3
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/bin/main.rs
@@ -0,0 +1,78 @@
+use std::sync::Arc;
+
+use clap::Parser;
+use openraft::Config;
+use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl;
+use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl;
+use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl;
+use raft_kv_memstore_grpc::network::Network;
+use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer;
+use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer;
+use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
+use raft_kv_memstore_grpc::LogStore;
+use raft_kv_memstore_grpc::Raft;
+use raft_kv_memstore_grpc::StateMachineStore;
+use tonic::transport::Server;
+use tracing::info;
+
+#[derive(Parser, Clone, Debug)]
+#[clap(author, version, about, long_about = None)]
+pub struct Opt {
+    #[clap(long)]
+    pub id: u64,
+
+    #[clap(long)]
+    /// Network address to bind the server to (e.g., "127.0.0.1:50051")
+    pub addr: String,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Initialize tracing first, before any logging happens
+    tracing_subscriber::fmt()
+        .with_max_level(tracing::Level::INFO)
+        .with_file(true)
+        .with_line_number(true)
+        .init();
+
+    // Parse the parameters passed by arguments.
+    let options = Opt::parse();
+    let node_id = options.id;
+    let addr = options.addr;
+
+    // Create a configuration for the raft instance.
+    let config = Arc::new(
+        Config {
+            heartbeat_interval: 500,
+            election_timeout_min: 1500,
+            election_timeout_max: 3000,
+            ..Default::default()
+        }
+        .validate()?,
+    );
+
+    // Create stores and network
+    let log_store = LogStore::default();
+    let state_machine_store = Arc::new(StateMachineStore::default());
+    let network = Network {};
+
+    // Create Raft instance
+    let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;
+
+    // Create the management service with raft instance
+    let management_service = ManagementServiceImpl::new(raft.clone());
+    let internal_service = InternalServiceImpl::new(raft.clone());
+    let api_service = ApiServiceImpl::new(raft, state_machine_store);
+
+    // Start server
+    let server_future = Server::builder()
+        .add_service(ManagementServiceServer::new(management_service))
+        .add_service(InternalServiceServer::new(internal_service))
+        .add_service(ApiServiceServer::new(api_service))
+        .serve(addr.parse()?);
+
+    info!("Node {node_id} starting server at {addr}");
+    server_future.await?;
+
+    Ok(())
+}
diff --git a/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs
new file mode 100644
index 000000000..35ad5931a
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/grpc/api_service.rs
@@ -0,0 +1,115 @@
+use std::sync::Arc;
+
+use openraft::Raft;
+use tonic::Request;
+use tonic::Response;
+use tonic::Status;
+use tracing::debug;
+
+use crate::protobuf::api_service_server::ApiService;
+use crate::protobuf::GetRequest;
+use crate::protobuf::GetResponse;
+use crate::protobuf::SetRequest;
+use crate::protobuf::SetResponse;
+use crate::store::Request as StoreRequest;
+use crate::store::StateMachineStore;
+use crate::TypeConfig;
+
+/// External API service implementation providing key-value store operations.
+/// This service handles client requests for getting and setting values in the distributed store.
+///
+/// # Responsibilities
+/// - Handle key-value get operations
+/// - Handle key-value set operations
+/// - Ensure consistency through Raft consensus
+///
+/// # Protocol Safety
+/// This service implements the client-facing API and should validate all inputs
+/// before processing them through the Raft consensus protocol.
+pub struct ApiServiceImpl {
+    /// The Raft node instance for consensus operations
+    raft_node: Raft<TypeConfig>,
+    /// The state machine store for direct reads
+    state_machine_store: Arc<StateMachineStore>,
+}
+
+impl ApiServiceImpl {
+    /// Creates a new instance of the API service
+    ///
+    /// # Arguments
+    /// * `raft_node` - The Raft node instance this service will use
+    /// * `state_machine_store` - The state machine store for reading data
+    pub fn new(raft_node: Raft<TypeConfig>, state_machine_store: Arc<StateMachineStore>) -> Self {
+        ApiServiceImpl {
+            raft_node,
+            state_machine_store,
+        }
+    }
+
+    /// Validates a key-value request; rejects empty keys/values with `INVALID_ARGUMENT`
+    fn validate_request(&self, key: &str, value: Option<&str>) -> Result<(), Status> {
+        if key.is_empty() {
+            return Err(Status::invalid_argument("Key cannot be empty"));
+        }
+        if let Some(val) = value {
+            if val.is_empty() {
+                return Err(Status::invalid_argument("Value cannot be empty"));
+            }
+        }
+        Ok(())
+    }
+}
+
+#[tonic::async_trait]
+impl ApiService for ApiServiceImpl {
+    /// Sets a value for a given key in the distributed store
+    ///
+    /// # Arguments
+    /// * `request` - Contains the key and value to set
+    ///
+    /// # Returns
+    /// * `Ok(Response<SetResponse>)` - Success response after the value is set
+    /// * `Err(Status)` - Error status if the set operation fails
+    async fn set(&self, request: Request<SetRequest>) -> Result<Response<SetResponse>, Status> {
+        let req = request.into_inner();
+        debug!("Processing set request for key: {}", req.key);
+
+        self.validate_request(&req.key, Some(&req.value))?;
+
+        self.raft_node
+            .client_write(StoreRequest::Set {
+                key: req.key.clone(),
+                value: req.value.clone(),
+            })
+            .await
+            .map_err(|e| Status::internal(format!("Failed to write to store: {}", e)))?;
+
+        debug!("Successfully set value for key: {}", req.key);
+        Ok(Response::new(SetResponse {}))
+    }
+
+    /// Gets a value for a given key from the local state machine store
+    ///
+    /// # Arguments
+    /// * `request` - Contains the key to retrieve
+    ///
+    /// # Returns
+    /// * `Ok(Response<GetResponse>)` - Success response containing the value
+    /// * `Err(Status)` - `NOT_FOUND` if the key is absent, `INVALID_ARGUMENT` if it is empty
+    async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
+        let req = request.into_inner();
+        debug!("Processing get request for key: {}", req.key);
+
+        self.validate_request(&req.key, None)?;
+
+        let sm = self.state_machine_store.state_machine.read().await;
+        let value = sm
+            .data
+            .get(&req.key)
+            .ok_or_else(|| Status::not_found(format!("Key not found: {}", req.key)))?
+            .to_string();
+
+        debug!("Successfully retrieved value for key: {}", req.key);
+        Ok(Response::new(GetResponse { value }))
+    }
+}
diff --git a/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs
new file mode 100644
index 000000000..638ecf479
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs
@@ -0,0 +1,147 @@
+use bincode::deserialize;
+use bincode::serialize;
+use openraft::Raft;
+use tonic::Request;
+use tonic::Response;
+use tonic::Status;
+use tracing::debug;
+
+use crate::protobuf::internal_service_server::InternalService;
+use crate::protobuf::RaftReplyBytes;
+use crate::protobuf::RaftRequestBytes;
+use crate::TypeConfig;
+
+/// Internal gRPC service implementation for Raft protocol communications.
+/// This service handles the core Raft consensus protocol operations between cluster nodes.
+///
+/// # Responsibilities
+/// - Vote requests/responses during leader election
+/// - Log replication between nodes
+/// - Snapshot installation for state synchronization
+///
+/// # Protocol Safety
+/// This service implements critical consensus protocol operations and should only be
+/// exposed to other trusted Raft cluster nodes, never to external clients.
+pub struct InternalServiceImpl {
+    /// The local Raft node instance that this service operates on
+    raft_node: Raft<TypeConfig>,
+}
+
+impl InternalServiceImpl {
+    /// Creates a new instance of the internal service
+    ///
+    /// # Arguments
+    /// * `raft_node` - The Raft node instance this service will operate on
+    pub fn new(raft_node: Raft<TypeConfig>) -> Self {
+        InternalServiceImpl { raft_node }
+    }
+
+    /// Helper function to deserialize request bytes
+    fn deserialize_request<T: for<'a> serde::Deserialize<'a>>(value: &[u8]) -> Result<T, Status> {
+        deserialize(value).map_err(|e| Status::internal(format!("Failed to deserialize request: {}", e)))
+    }
+
+    /// Helper function to serialize response
+    fn serialize_response<T: serde::Serialize>(value: T) -> Result<Vec<u8>, Status> {
+        serialize(&value).map_err(|e| Status::internal(format!("Failed to serialize response: {}", e)))
+    }
+
+    /// Helper function to create a standard response
+    fn create_response<T: serde::Serialize>(value: T) -> Result<Response<RaftReplyBytes>, Status> {
+        let value = Self::serialize_response(value)?;
+        Ok(Response::new(RaftReplyBytes { value }))
+    }
+}
+
+#[tonic::async_trait]
+impl InternalService for InternalServiceImpl {
+    /// Handles vote requests during leader election.
+    ///
+    /// # Arguments
+    /// * `request` - The vote request containing candidate information
+    ///
+    /// # Returns
+    /// * `Ok(Response<RaftReplyBytes>)` - Vote response indicating whether the vote was granted
+    /// * `Err(Status)` - Error status if the vote operation fails
+    ///
+    /// # Protocol Details
+    /// This implements the RequestVote RPC from the Raft protocol.
+    /// Nodes vote for candidates based on log completeness and term numbers.
+    async fn vote(&self, request: Request<RaftRequestBytes>) -> Result<Response<RaftReplyBytes>, Status> {
+        debug!("Processing vote request");
+        let req = request.into_inner();
+
+        // Deserialize the vote request
+        let vote_req = Self::deserialize_request(&req.value)?;
+
+        // Process the vote request
+        let vote_resp = self
+            .raft_node
+            .vote(vote_req)
+            .await
+            .map_err(|e| Status::internal(format!("Vote operation failed: {}", e)))?;
+
+        debug!("Vote request processed successfully");
+        Self::create_response(vote_resp)
+    }
+
+    /// Handles append entries requests for log replication.
+    ///
+    /// # Arguments
+    /// * `request` - The append entries request containing log entries to replicate
+    ///
+    /// # Returns
+    /// * `Ok(Response<RaftReplyBytes>)` - Response indicating success/failure of the append operation
+    /// * `Err(Status)` - Error status if the append operation fails
+    ///
+    /// # Protocol Details
+    /// This implements the AppendEntries RPC from the Raft protocol.
+    /// Used for both log replication and as heartbeat mechanism.
+    async fn append(&self, request: Request<RaftRequestBytes>) -> Result<Response<RaftReplyBytes>, Status> {
+        debug!("Processing append entries request");
+        let req = request.into_inner();
+
+        // Deserialize the append request
+        let append_req = Self::deserialize_request(&req.value)?;
+
+        // Process the append request
+        let append_resp = self
+            .raft_node
+            .append_entries(append_req)
+            .await
+            .map_err(|e| Status::internal(format!("Append entries operation failed: {}", e)))?;
+
+        debug!("Append entries request processed successfully");
+        Self::create_response(append_resp)
+    }
+
+    /// Handles snapshot installation requests for state transfer.
+    ///
+    /// # Arguments
+    /// * `request` - The snapshot installation request containing state data
+    ///
+    /// # Returns
+    /// * `Ok(Response<RaftReplyBytes>)` - Response indicating success/failure of snapshot installation
+    /// * `Err(Status)` - Error status if the snapshot operation fails
+    ///
+    /// # Protocol Details
+    /// This implements the InstallSnapshot RPC from the Raft protocol.
+    /// Used to bring lagging followers up to date more efficiently than regular log replication.
+    async fn snapshot(&self, request: Request<RaftRequestBytes>) -> Result<Response<RaftReplyBytes>, Status> {
+        debug!("Processing snapshot installation request");
+        let req = request.into_inner();
+
+        // Deserialize the snapshot request
+        let snapshot_req = Self::deserialize_request(&req.value)?;
+
+        // Process the snapshot request
+        let snapshot_resp = self
+            .raft_node
+            .install_snapshot(snapshot_req)
+            .await
+            .map_err(|e| Status::internal(format!("Snapshot installation failed: {}", e)))?;
+
+        debug!("Snapshot installation request processed successfully");
+        Self::create_response(snapshot_resp)
+    }
+}
diff --git a/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs
new file mode 100644
index 000000000..4bc773b35
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/grpc/management_service.rs
@@ -0,0 +1,157 @@
+use std::collections::BTreeMap;
+
+use openraft::Raft;
+use tonic::Request;
+use tonic::Response;
+use tonic::Status;
+use tracing::debug;
+
+use crate::protobuf::management_service_server::ManagementService;
+use crate::protobuf::AddLearnerRequest;
+use crate::protobuf::ChangeMembershipRequest;
+use crate::protobuf::InitRequest;
+use crate::protobuf::RaftReplyString;
+use crate::protobuf::RaftRequestString;
+use crate::Node;
+use crate::TypeConfig;
+
+/// Management service implementation for Raft cluster administration.
+/// Handles cluster initialization, membership changes, and metrics collection.
+///
+/// # Responsibilities
+/// - Cluster initialization
+/// - Adding learner nodes
+/// - Changing cluster membership
+/// - Collecting metrics
+pub struct ManagementServiceImpl {
+    raft_node: Raft<TypeConfig>,
+}
+
+impl ManagementServiceImpl {
+    /// Creates a new instance of the management service
+    ///
+    /// # Arguments
+    /// * `raft_node` - The Raft node instance this service will manage
+    pub fn new(raft_node: Raft<TypeConfig>) -> Self {
+        ManagementServiceImpl { raft_node }
+    }
+
+    /// Helper function to create a standard response (payload is JSON-encoded into `data`)
+    fn create_response<T: serde::Serialize>(data: T) -> Result<Response<RaftReplyString>, Status> {
+        let data = serde_json::to_string(&data)
+            .map_err(|e| Status::internal(format!("Failed to serialize response: {}", e)))?;
+
+        Ok(Response::new(RaftReplyString {
+            data,
+            error: Default::default(),
+        }))
+    }
+}
+
+#[tonic::async_trait]
+impl ManagementService for ManagementServiceImpl {
+    /// Initializes a new Raft cluster with the specified nodes
+    ///
+    /// # Arguments
+    /// * `request` - Contains the initial set of nodes for the cluster
+    ///
+    /// # Returns
+    /// * Success response with initialization details
+    /// * Error if initialization fails
+    async fn init(&self, request: Request<InitRequest>) -> Result<Response<RaftReplyString>, Status> {
+        debug!("Initializing Raft cluster");
+        let req = request.into_inner();
+
+        // Convert nodes into required format
+        let nodes_map: BTreeMap<u64, Node> = req
+            .nodes
+            .into_iter()
+            .map(|node| {
+                (node.node_id, Node {
+                    rpc_addr: node.rpc_addr,
+                    node_id: node.node_id,
+                })
+            })
+            .collect();
+
+        // Initialize the cluster
+        let result = self
+            .raft_node
+            .initialize(nodes_map)
+            .await
+            .map_err(|e| Status::internal(format!("Failed to initialize cluster: {}", e)))?;
+
+        debug!("Cluster initialization successful");
+        Self::create_response(result)
+    }
+
+    /// Adds a learner node to the Raft cluster
+    ///
+    /// # Arguments
+    /// * `request` - Contains the node information and blocking preference
+    ///
+    /// # Returns
+    /// * Success response with learner addition details
+    /// * Error if the operation fails (`INVALID_ARGUMENT` when `node` is missing)
+    async fn add_learner(&self, request: Request<AddLearnerRequest>) -> Result<Response<RaftReplyString>, Status> {
+        let req = request.into_inner();
+
+        let node = req.node.ok_or_else(|| Status::invalid_argument("Node information is required"))?;
+
+        debug!("Adding learner node {}", node.node_id);
+
+        let raft_node = Node {
+            rpc_addr: node.rpc_addr.clone(),
+            node_id: node.node_id,
+        };
+
+        let result = self
+            .raft_node
+            .add_learner(node.node_id, raft_node, req.blocking)
+            .await
+            .map_err(|e| Status::internal(format!("Failed to add learner node: {}", e)))?;
+
+        debug!("Successfully added learner node {}", node.node_id);
+        Self::create_response(result)
+    }
+
+    /// Changes the membership of the Raft cluster
+    ///
+    /// # Arguments
+    /// * `request` - Contains the new member set and retention policy
+    ///
+    /// # Returns
+    /// * Success response with membership change details
+    /// * Error if the operation fails
+    async fn change_membership(
+        &self,
+        request: Request<ChangeMembershipRequest>,
+    ) -> Result<Response<RaftReplyString>, Status> {
+        let req = request.into_inner();
+
+        debug!(
+            "Changing membership. Members: {:?}, Retain: {}",
+            req.members, req.retain
+        );
+
+        let result = self
+            .raft_node
+            .change_membership(req.members, req.retain)
+            .await
+            .map_err(|e| Status::internal(format!("Failed to change membership: {}", e)))?;
+
+        debug!("Successfully changed cluster membership");
+        Self::create_response(result)
+    }
+
+    /// Retrieves metrics about the Raft node
+    ///
+    /// # Returns
+    /// * Success response with metrics data
+    /// * Error if the metrics cannot be serialized
+    async fn metrics(&self, _request: Request<RaftRequestString>) -> Result<Response<RaftReplyString>, Status> {
+        debug!("Collecting metrics");
+        let metrics = self.raft_node.metrics().borrow().clone();
+        Self::create_response(metrics)
+    }
+}
diff --git a/examples/raft-kv-memstore-grpc/src/grpc/mod.rs b/examples/raft-kv-memstore-grpc/src/grpc/mod.rs
new file mode 100644
index 000000000..5f903ab1f
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/grpc/mod.rs
@@ -0,0 +1,3 @@
+pub mod management_service;
+pub mod internal_service;
+pub mod api_service;
diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs
new file mode 100644
index 000000000..08a12e8d5
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/lib.rs
@@ -0,0 +1,57 @@
+#![allow(clippy::uninlined_format_args)]
+
+use std::fmt::Display;
+
+use crate::store::Request;
+use crate::store::Response;
+
+pub mod grpc;
+pub mod network;
+pub mod store;
+#[cfg(test)]
+mod test;
+
+pub type NodeId = u64;
+
+openraft::declare_raft_types!(
+    /// Declare the type configuration for example K/V store.
+    pub TypeConfig:
+        D = Request,
+        R = Response,
+        Node = Node,
+);
+
+pub type LogStore = store::LogStore;
+pub type StateMachineStore = store::StateMachineStore;
+pub type Raft = openraft::Raft<TypeConfig>;
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default)]
+pub struct Node {
+    pub node_id: u64,
+    pub rpc_addr: String,
+}
+
+impl Display for Node {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Node {{ rpc_addr: {}, node_id: {} }}", self.rpc_addr, self.node_id)
+    }
+}
+
+pub mod protobuf {
+    tonic::include_proto!("openraftpb");
+}
+
+pub mod typ {
+
+    use crate::TypeConfig;
+
+    pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<TypeConfig, E>;
+    pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<TypeConfig, RaftError<E>>;
+
+    pub type ClientWriteError = openraft::error::ClientWriteError<TypeConfig>;
+    pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<TypeConfig>;
+    pub type ForwardToLeader = openraft::error::ForwardToLeader<TypeConfig>;
+    pub type InitializeError = openraft::error::InitializeError<TypeConfig>;
+
+    pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
+}
diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs
new file mode 100644
index 000000000..ffcc55e38
--- /dev/null
+++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs
@@ -0,0 +1,102 @@
+use bincode::deserialize;
+use bincode::serialize;
+use openraft::error::InstallSnapshotError;
+use openraft::error::NetworkError;
+use openraft::error::RPCError;
+use openraft::error::RaftError;
+use openraft::network::RPCOption;
+use openraft::raft::AppendEntriesRequest;
+use openraft::raft::AppendEntriesResponse;
+use openraft::raft::InstallSnapshotRequest;
+use openraft::raft::InstallSnapshotResponse;
+use openraft::raft::VoteRequest;
+use openraft::raft::VoteResponse;
+use openraft::RaftNetwork;
+use openraft::RaftNetworkFactory;
+use tonic::transport::Channel;
+
+use crate::protobuf::internal_service_client::InternalServiceClient;
+use crate::protobuf::RaftRequestBytes;
+use crate::Node;
+use crate::NodeId;
+use crate::TypeConfig;
+
+/// Network implementation for gRPC-based Raft communication.
+/// Provides the networking layer for Raft nodes to communicate with each other.
+pub struct Network {}
+
+type RaftServiceClient = InternalServiceClient<Channel>;
+
+impl Network {}
+
+/// Implementation of the RaftNetworkFactory trait for creating new network connections.
+/// Channels connect lazily, so an unreachable peer yields an RPC error instead of a panic here.
+impl RaftNetworkFactory<TypeConfig> for Network {
+    type Network = NetworkConnection;
+
+    #[tracing::instrument(level = "debug", skip_all)]
+    async fn new_client(&mut self, _: NodeId, node: &Node) -> Self::Network {
+        let channel = Channel::builder(
+            format!("http://{}", node.rpc_addr).parse().expect("node.rpc_addr must be a valid host:port"),
+        )
+        .connect_lazy();
+        NetworkConnection::new(InternalServiceClient::new(channel))
+    }
+}
+
+/// Represents an active network connection to a remote Raft node.
+/// Handles serialization and deserialization of Raft messages over gRPC.
+pub struct NetworkConnection {
+    client: RaftServiceClient,
+}
+
+impl NetworkConnection {
+    /// Creates a new NetworkConnection with the provided gRPC client.
+    pub fn new(client: RaftServiceClient) -> Self {
+        NetworkConnection { client }
+    }
+}
+
+/// Implementation of RaftNetwork trait for handling Raft protocol communications.
+#[allow(clippy::blocks_in_conditions)] +impl RaftNetwork for NetworkConnection { + async fn append_entries( + &mut self, + req: AppendEntriesRequest, + _option: RPCOption, + ) -> Result, RPCError>> { + let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let request = RaftRequestBytes { value }; + let response = self.client.append(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let message = response.into_inner(); + let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + Ok(result) + } + + async fn install_snapshot( + &mut self, + req: InstallSnapshotRequest, + _option: RPCOption, + ) -> Result, RPCError>> + { + let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let request = RaftRequestBytes { value }; + let response = self.client.snapshot(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let message = response.into_inner(); + let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + Ok(result) + } + + async fn vote( + &mut self, + req: VoteRequest, + _option: RPCOption, + ) -> Result, RPCError>> { + let value = serialize(&req).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let request = RaftRequestBytes { value }; + let response = self.client.vote(request).await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + let message = response.into_inner(); + let result = deserialize(&message.value).map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + Ok(result) + } +} diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs new file mode 100644 index 000000000..24c6adbdd --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -0,0 +1,231 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::io::Cursor; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; 
+use std::sync::Arc; + +use openraft::alias::SnapshotDataOf; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::LogId; +use openraft::RaftSnapshotBuilder; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +use crate::NodeId; +use crate::TypeConfig; + +pub type LogStore = memstore::LogStore; + +/** + * Here you will set the types of request that will interact with the raft nodes. + * For example the `Set` will be used to write data (key and value) to the raft database. + * A variant such as `AddNode` (not defined in this example) would append a new node to the existing shared list of nodes. + * You will want to add any request that can write data in all nodes here. + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Set { key: String, value: String }, +} + +/** + * Here you will define what type of answer you expect from reading the data of a node. + * In this example it will return an optional value from a given key in + * the `Request.Set`. + * + * TODO: Should we explain how to create multiple `AppDataResponse`? + * + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + pub value: Option>, +} + +#[derive(Debug)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Vec, +} + +/// Data contained in the Raft state machine. +/// +/// Note that we are using `serde` to serialize the +/// `data`, which has an implementation to be serialized. Note that for this test we set both the key +/// and value as String, but you could set any type of value that has the serialization impl. +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct StateMachineData { + pub last_applied_log: Option>, + + pub last_membership: StoredMembership, + + /// Application data.
+ pub data: BTreeMap, +} + +/// Defines a state machine for the Raft cluster. This state machine represents a copy of the +/// data for this node. Additionally, it is responsible for storing the last snapshot of the data. +#[derive(Debug, Default)] +pub struct StateMachineStore { + /// The Raft state machine. + pub state_machine: RwLock, + + /// Used in identifier for snapshot. + /// + /// Note that concurrently created snapshots and snapshots created on different nodes + /// are not guaranteed to have sequential `snapshot_idx` values, but this does not matter for + /// correctness. + snapshot_idx: AtomicU64, + + /// The last received snapshot. + current_snapshot: RwLock>, +} + +impl RaftSnapshotBuilder for Arc { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&mut self) -> Result, StorageError> { + // Serialize the data of the state machine. + let state_machine = self.state_machine.read().await; + let data = serde_json::to_vec(&state_machine.data).map_err(|e| StorageError::read_state_machine(&e))?; + + let last_applied_log = state_machine.last_applied_log; + let last_membership = state_machine.last_membership.clone(); + + // Lock the current snapshot before releasing the lock on the state machine, to avoid a race + // condition on the written snapshot + let mut current_snapshot = self.current_snapshot.write().await; + drop(state_machine); + + let snapshot_idx = self.snapshot_idx.fetch_add(1, Ordering::Relaxed) + 1; + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: data.clone(), + }; + + *current_snapshot = Some(snapshot); + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(data)), + }) + } +} + +impl RaftStateMachine for Arc { + type 
SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + let state_machine = self.state_machine.read().await; + Ok((state_machine.last_applied_log, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> + Send { + let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator + + let mut sm = self.state_machine.write().await; + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied_log = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => {} + EntryPayload::Normal(ref req) => match req { + Request::Set { key, value } => { + sm.data.insert(key.clone(), value.clone()); + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + } + }; + res.push(Response { value: None }) + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot(&mut self) -> Result>, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box>, + ) -> Result<(), StorageError> { + tracing::info!( + { snapshot_size = snapshot.get_ref().len() }, + "decoding snapshot for installation" + ); + + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + // Update the state machine. 
+ let updated_state_machine_data = serde_json::from_slice(&new_snapshot.data) + .map_err(|e| StorageError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?; + let updated_state_machine = StateMachineData { + last_applied_log: meta.last_log_id, + last_membership: meta.last_membership.clone(), + data: updated_state_machine_data, + }; + let mut state_machine = self.state_machine.write().await; + *state_machine = updated_state_machine; + + // Lock the current snapshot before releasing the lock on the state machine, to avoid a race + // condition on the written snapshot + let mut current_snapshot = self.current_snapshot.write().await; + drop(state_machine); + + // Update current snapshot. + *current_snapshot = Some(new_snapshot); + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&mut self) -> Result>, StorageError> { + match &*self.current_snapshot.read().await { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: Box::new(Cursor::new(data)), + })) + } + None => Ok(None), + } + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } +} diff --git a/examples/raft-kv-memstore-grpc/src/test.rs b/examples/raft-kv-memstore-grpc/src/test.rs new file mode 100644 index 000000000..dff18a3ec --- /dev/null +++ b/examples/raft-kv-memstore-grpc/src/test.rs @@ -0,0 +1,23 @@ +use std::sync::Arc; + +use openraft::testing::log::StoreBuilder; +use openraft::testing::log::Suite; +use openraft::StorageError; + +use crate::store::LogStore; +use crate::store::StateMachineStore; +use crate::TypeConfig; + +struct MemKVStoreBuilder {} + +impl StoreBuilder, ()> for MemKVStoreBuilder { + async fn build(&self) -> Result<((), LogStore, Arc), StorageError> { + Ok(((), LogStore::default(), Arc::default())) + } +} + +#[tokio::test] +pub async fn test_mem_store() -> Result<(), StorageError> { + Suite::test_all(MemKVStoreBuilder {}).await?; 
+ Ok(()) +} diff --git a/examples/raft-kv-memstore-grpc/test-cluster.sh b/examples/raft-kv-memstore-grpc/test-cluster.sh new file mode 100755 index 000000000..ea2fa5bd3 --- /dev/null +++ b/examples/raft-kv-memstore-grpc/test-cluster.sh @@ -0,0 +1,145 @@ +#!/bin/sh + +set -o errexit + +cargo build + +kill() { + if [ "$(uname)" = "Darwin" ]; then + SERVICE='raft-key-value' + if pgrep -xq -- "${SERVICE}"; then + pkill -f "${SERVICE}" + fi + else + set +e # killall exits with an error if it finds no process to kill + killall raft-key-value + set -e + fi +} + +rpc() { + local port=$1 + local method=$2 + local body="$3" + local isApiService="$4" + local cmd="grpcurl -plaintext -proto ./proto/management_service.proto -d $body -import-path ./proto localhost:$port openraftpb.ManagementService/$method" + if [ "$isApiService" = "true" ]; then + cmd="grpcurl -plaintext -proto ./proto/api_service.proto -d $body -import-path ./proto localhost:$port openraftpb.ApiService/$method" + fi + + echo '---'" rpc(127.0.0.1:$port/$method, $body)" + + { + time $cmd + } | { + if type jq > /dev/null 2>&1; then + jq 'if has("data") then .data |= fromjson else . end' + else + cat + fi + } + + echo + echo +} + +export RUST_LOG=trace +export RUST_BACKTRACE=full + +echo "Killing all running raft-key-value" + +kill + +sleep 1 + +echo "Start 5 uninitialized raft-key-value servers..."
+ +nohup ./target/debug/raft-key-value --id 1 --addr 127.0.0.1:5051 > n1.log & +sleep 1 +echo "Server 1 started" + +nohup ./target/debug/raft-key-value --id 2 --addr 127.0.0.1:5052 > n2.log & +sleep 1 +echo "Server 2 started" + +nohup ./target/debug/raft-key-value --id 3 --addr 127.0.0.1:5053 > n3.log & +sleep 1 +echo "Server 3 started" +sleep 1 + +nohup ./target/debug/raft-key-value --id 4 --addr 127.0.0.1:5054 > n4.log & +sleep 1 +echo "Server 4 started" +sleep 1 + +nohup ./target/debug/raft-key-value --id 5 --addr 127.0.0.1:5055 > n5.log & +sleep 1 +echo "Server 5 started" +sleep 1 + +echo "Initialize servers 1,2,3 as a 3-nodes cluster" +sleep 2 +echo + +rpc 5051 Init '{"nodes":[{"node_id":"1","rpc_addr":"127.0.0.1:5051"},{"node_id":"2","rpc_addr":"127.0.0.1:5052"},{"node_id":"3","rpc_addr":"127.0.0.1:5053"}]}' + +echo "Server 1 is a leader now" + +sleep 2 + +echo "Get metrics from the leader" +sleep 2 +echo +rpc 5051 Metrics '{}' +sleep 1 + + +echo "Adding node 4 and node 5 as learners, to receive log from leader node 1" + +sleep 1 +echo +rpc 5051 AddLearner '{"blocking":true,"node":{"node_id":"4","rpc_addr":"127.0.0.1:5054"}}' +echo "Node 4 added as learner" +sleep 1 +echo +rpc 5051 AddLearner '{"blocking":true,"node":{"node_id":"5","rpc_addr":"127.0.0.1:5055"}}' +echo "Node 5 added as learner" +sleep 1 + +echo "Get metrics from the leader, after adding 2 learners" +sleep 2 +echo +rpc 5051 Metrics '{}' +sleep 1 + +echo "Changing membership from [1, 2, 3] to 5 nodes cluster: [1, 2, 3, 4, 5]" +echo +rpc 5051 ChangeMembership '{"members":["1","2","3","4","5"],"retain":true}' +sleep 1 +echo 'Membership changed to [1, 2, 3, 4, 5]' +sleep 1 + +echo "Get metrics from the leader again" +sleep 1 +echo +rpc 5051 Metrics '{}' +sleep 1 + +echo "Write foo=zoo on node-3" +sleep 1 +echo +rpc 5051 Set '{"key":"foo","value":"zoo"}' true +sleep 1 +echo "Data written" +sleep 1 + +echo "Read foo=zoo from node-3" +sleep 1 +echo "Read from node 2" +echo +rpc 5052 Get 
'{"key":"foo"}' true +echo + + +echo "Killing all nodes..." +kill