Skip to content

Commit

Permalink
example: raft-kv-memstore-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Dec 9, 2024
1 parent 4aaff4f commit 59cab56
Show file tree
Hide file tree
Showing 18 changed files with 1,357 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ exclude = [
"cluster_benchmark",
"examples/memstore",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-grpc",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-network-v2",
"examples/raft-kv-memstore-opendal-snapshot-data",
Expand Down
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-grpc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
48 changes: 48 additions & 0 deletions examples/raft-kv-memstore-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[package]
name = "raft-kv-memstore-grpc"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"Sainath Singineedi <[email protected]>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/databendlabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[[bin]]
name = "raft-key-value"
path = "src/bin/main.rs"

[dependencies]
memstore = { path = "../memstore", features = [] }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tonic = "0.12.3"
tonic-build = "0.12.3"
bincode = "1.3.3"
dashmap = "6.1.0"
prost = "0.13.4"

[dev-dependencies]
anyhow = "1.0.63"
maplit = "1.0.2"

[features]

[build-dependencies]
prost-build = "0.13.4"
tonic-build = "0.12.3"

[package.metadata.docs.rs]
all-features = true
128 changes: 128 additions & 0 deletions examples/raft-kv-memstore-grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Example distributed key-value store built upon openraft.

It is an example of how to build a real-world key-value store with `openraft`.
Includes:
- An in-memory `RaftLogStorage` and `RaftStateMachine` implementation [store](./src/store/store.rs).

- A server is based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2).
Includes:
- raft-internal network APIs for replication and voting.
- Admin APIs to add nodes, change-membership etc.
- Application APIs to write a value by key or read a value by key.

- Client and `RaftNetwork`([rpc](./src/network/raft_network_impl)) are built upon [reqwest](https://docs.rs/reqwest).

[ExampleClient](./src/client.rs) is a minimal raft client in rust to talk to a raft cluster.
- It includes application API `write()` and `read()`, and administrative API `init()`, `add_learner()`, `change_membership()`, `metrics()`.
- This client tracks the last known leader id, a write operation(such as `write()` or `change_membership()`) will be redirected to the leader on client side.

## Run it

There is a example in bash script and an example in rust:

- [test-cluster.sh](./test-cluster.sh) shows a simulation of 3 nodes running and sharing data,
It only uses `curl` and shows the communication between a client and the cluster in plain HTTP messages.
You can run the cluster demo with:

```shell
./test-cluster.sh
```

- [test_cluster.rs](./tests/cluster/test_cluster.rs) does almost the same as `test-cluster.sh` but in rust
with the `ExampleClient`.

Run it with `cargo test`.


if you want to compile the application, run:

```shell
cargo build
```

(If you append `--release` to make it compile in production, but we don't recommend to use
this project in production yet.)

## What the test script does

To run it, get the binary `raft-key-value` inside `target/debug` and run:

```shell
./raft-key-value --id 1 --http-addr 127.0.0.1:21001
```

It will start a node.

To start the following nodes:

```shell
./raft-key-value --id 2 --http-addr 127.0.0.1:21002
```

You can continue replicating the nodes by changing the `id` and `http-addr`.

After that, call the first node created:

```
POST - 127.0.0.1:21001/init
```

It will define the first node created as the leader.

Then you need to inform to the leader that these nodes are learners:

```
POST - 127.0.0.1:21001/add-learner '[2, "127.0.0.1:21002"]'
POST - 127.0.0.1:21001/add-learner '[3, "127.0.0.1:21003"]'
```

Now you need to tell the leader to add all learners as members of the cluster:

```
POST - 127.0.0.1:21001/change-membership "[1, 2, 3]"
```

Write some data in any of the nodes:

```
POST - 127.0.0.1:21001/write "{"Set":{"key":"foo","value":"bar"}}"
```

Read the data from any node:

```
POST - 127.0.0.1:21002/read "foo"
```

You should be able to read that on the another instance even if you did not sync any data!


## How it's structured.

The application is separated in 4 modules:

- `bin`: You can find the `main()` function in [main](./src/bin/main.rs) the file where the setup for the server happens.
- `network`: You can find the [api](./src/network/api.rs) that implements the endpoints used by the public API and [rpc](./src/network/raft_network_impl) where all the raft communication from the node happens. [management](./src/network/management.rs) is where all the administration endpoints are present, those are used to add orremove nodes, promote and more. [raft](./src/network/raft.rs) is where all the communication are received from other nodes.
- `store`: You can find the file [store](./src/store/mod.rs) where all the key-value implementation is done. Here is where your data application will be managed.

## Where is my data?

The data is store inside state machines, each state machine represents a point of data and
raft enforces that all nodes have the same data in synchronization. You can have a look of
the struct [ExampleStateMachine](./src/store/mod.rs)

## Cluster management

The raft itself does not store node addresses.
But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses.

Thus, in this example application:

- The storage layer has to store nodes' information.
- The network layer keeps a reference to the store so that it is able to get the address of a target node to send RPC to.

To add a node to a cluster, it includes 3 steps:

- Write a `node` through raft protocol to the storage.
- Add the node as a `Learner` to let it start receiving replication data from the leader.
- Invoke `change-membership` to change the learner node to a member.
12 changes: 12 additions & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=src/*");
tonic_build::configure().compile_protos(
&[
"proto/internal_service.proto",
"proto/management_service.proto",
"proto/api_service.proto",
],
&["proto"],
)?;
Ok(())
}
30 changes: 30 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/api_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
syntax = "proto3";
package openraftpb;

// ApiService provides the key-value store API operations
service ApiService {
// Get retrieves the value associated with a given key
rpc Get(GetRequest) returns (GetResponse) {}

// Set stores a key-value pair in the distributed store
rpc Set(SetRequest) returns (SetResponse) {}
}

// GetRequest represents a key lookup request
message GetRequest {
string key = 1; // Key to look up
}

// GetResponse contains the value associated with the requested key
message GetResponse {
string value = 1; // Retrieved value
}

// SetRequest represents a key-value pair to be stored
message SetRequest {
string key = 1; // Key to store
string value = 2; // Value to associate with the key
}

// SetResponse indicates the result of a Set operation
message SetResponse {}
24 changes: 24 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
syntax = "proto3";
package openraftpb;

// InternalService handles internal Raft cluster communication
service InternalService {
// Vote handles vote requests between Raft nodes during leader election
rpc Vote(RaftRequestBytes) returns (RaftReplyBytes) {}

// Append handles call related to append entries RPC
rpc Append(RaftRequestBytes) returns (RaftReplyBytes) {}

// Snapshot handles install snapshot RPC
rpc Snapshot(RaftRequestBytes) returns (RaftReplyBytes) {}
}

// RaftRequestBytes encapsulates binary Raft request data
message RaftRequestBytes {
bytes value = 1; // Serialized Raft request data
}

// RaftReplyBytes encapsulates binary Raft response data
message RaftReplyBytes {
bytes value = 1; // Serialized Raft response data
}
51 changes: 51 additions & 0 deletions examples/raft-kv-memstore-grpc/proto/management_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
syntax = "proto3";
package openraftpb;

// ManagementService handles Raft cluster management operations
service ManagementService {
// Init initializes a new Raft cluster with the given nodes
rpc Init(InitRequest) returns (RaftReplyString) {}

// AddLearner adds a new learner node to the Raft cluster
rpc AddLearner(AddLearnerRequest) returns (RaftReplyString) {}

// ChangeMembership modifies the cluster membership configuration
rpc ChangeMembership(ChangeMembershipRequest) returns (RaftReplyString) {}

// Metrics retrieves cluster metrics and status information
rpc Metrics(RaftRequestString) returns (RaftReplyString) {}
}

// InitRequest contains the initial set of nodes for cluster initialization
message InitRequest {
repeated Node nodes = 1; // List of initial cluster nodes
}

// Node represents a single node in the Raft cluster
message Node {
string rpc_addr = 1; // RPC address for node communication
uint64 node_id = 2; // Unique identifier for the node
}

// AddLearnerRequest specifies parameters for adding a learner node
message AddLearnerRequest {
Node node = 1; // Node to be added as a learner
bool blocking = 2; // Whether to wait for the operation to complete
}

// RaftRequestString represents a string-based Raft request
message RaftRequestString {
string data = 1; // Request data in string format
}

// RaftReplyString represents a string-based Raft response
message RaftReplyString {
string data = 1; // Response data
string error = 2; // Error message, if any
}

// ChangeMembershipRequest specifies parameters for modifying cluster membership
message ChangeMembershipRequest {
repeated uint64 members = 1; // New set of member node IDs
bool retain = 2; // Whether to retain existing configuration
}
78 changes: 78 additions & 0 deletions examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::sync::Arc;

use clap::Parser;
use openraft::Config;
use raft_kv_memstore_grpc::grpc::api_service::ApiServiceImpl;
use raft_kv_memstore_grpc::grpc::internal_service::InternalServiceImpl;
use raft_kv_memstore_grpc::grpc::management_service::ManagementServiceImpl;
use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer;
use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer;
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc::LogStore;
use raft_kv_memstore_grpc::Raft;
use raft_kv_memstore_grpc::StateMachineStore;
use tonic::transport::Server;
use tracing::info;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
/// Network address to bind the server to (e.g., "127.0.0.1:50051")
pub addr: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing first, before any logging happens
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_file(true)
.with_line_number(true)
.init();

// Parse the parameters passed by arguments.
let options = Opt::parse();
let node_id = options.id;
let addr = options.addr;

// Create a configuration for the raft instance.
let config = Arc::new(
Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
}
.validate()?,
);

// Create stores and network
let log_store = LogStore::default();
let state_machine_store = Arc::new(StateMachineStore::default());
let network = Network {};

// Create Raft instance
let raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;

// Create the management service with raft instance
let management_service = ManagementServiceImpl::new(raft.clone());
let internal_service = InternalServiceImpl::new(raft.clone());
let api_service = ApiServiceImpl::new(raft, state_machine_store);

// Start server
let server_future = Server::builder()
.add_service(ManagementServiceServer::new(management_service))
.add_service(InternalServiceServer::new(internal_service))
.add_service(ApiServiceServer::new(api_service))
.serve(addr.parse()?);

info!("Node {node_id} starting server at {addr}");
server_future.await?;

Ok(())
}
Loading

0 comments on commit 59cab56

Please sign in to comment.