Skip to content

Commit

Permalink
checkpoint 4: working core, pre benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Dec 4, 2024
1 parent dd187c3 commit 57037ad
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 10 deletions.
35 changes: 28 additions & 7 deletions examples/raft-kv-memstore-grpc/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use openraft::error::InstallSnapshotError;
use openraft::error::Unreachable;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
Expand Down Expand Up @@ -26,13 +27,10 @@ impl RaftNetworkFactory<TypeConfig> for Network {
type Network = NetworkConnection;

async fn new_client(&mut self, target: NodeId, node: &BasicNode) -> Self::Network {
let channel = Channel::builder(format!("http://{}", node.addr).parse().unwrap()).connect().await.unwrap();
let client = InternalServiceClient::new(channel);
NetworkConnection {
owner: Network {},
target,
target_node: node.clone(),
client,
}
}
}
Expand All @@ -42,7 +40,6 @@ pub struct NetworkConnection {
owner: Network,
target: NodeId,
target_node: BasicNode,
client: InternalServiceClient<Channel>,
}

#[allow(unused_variables)]
Expand All @@ -53,10 +50,18 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
_option: RPCOption,
) -> Result<AppendEntriesResponse<TypeConfig>, typ::RPCError> {
let server_addr = self.target_node.addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Err(e) => {
return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)));
}
};

let mut client = InternalServiceClient::new(channel);
let request = tonic::Request::new(RaftRequest {
data: serde_json::to_string(&req).expect("Failed to convert to string"),
});
let response = self.client.append_entries(request).await.unwrap();
let response = client.append_entries(request).await.unwrap();
Ok(serde_json::from_str(&response.into_inner().data).expect("Failed to deserialize from RaftReply"))
}

Expand All @@ -66,12 +71,20 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
_option: RPCOption,
) -> Result<InstallSnapshotResponse<TypeConfig>, typ::RPCError<InstallSnapshotError>> {
let server_addr = self.target_node.addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Err(e) => {
return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)));
}
};

let mut client = InternalServiceClient::new(channel);
let request = tonic::Request::new(SnapshotChunkRequest {
rpc_meta: String::from_utf8(req.data).unwrap(),
ver: Default::default(),
chunk: Default::default(),
});
let response = self.client.install_snapshot(request).await.unwrap();
let response = client.install_snapshot(request).await.unwrap();
Ok(serde_json::from_str(&response.into_inner().data).expect("Failed to deserialize from RaftReply"))
}

Expand All @@ -81,10 +94,18 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
_option: RPCOption,
) -> Result<VoteResponse<TypeConfig>, typ::RPCError> {
let server_addr = self.target_node.addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Err(e) => {
return Err(openraft::error::RPCError::Unreachable(Unreachable::new(&e)));
}
};

let mut client = InternalServiceClient::new(channel);
let request = tonic::Request::new(RaftRequest {
data: serde_json::to_string(&req).expect("Failed to deserialize to string"),
});
let response = self.client.vote(request).await.unwrap();
let response = client.vote(request).await.unwrap();
Ok(serde_json::from_str(&response.into_inner().data).expect("Failed to deserialize from RaftReply"))
}
}
124 changes: 121 additions & 3 deletions examples/raft-kv-memstore-grpc/test-cluster.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,121 @@
../../target/debug/raft-key-value --id 1 --addr 127.0.0.1:5051 > n1.log &
../../target/debug/raft-key-value --id 2 --addr 127.0.0.1:5052 > n2.log &
../../target/debug/raft-key-value --id 3 --addr 127.0.0.1:5053 > n3.log &
#!/bin/sh

set -o errexit

cargo build

kill() {
if [ "$(uname)" = "Darwin" ]; then
SERVICE='raft-key-value'
if pgrep -xq -- "${SERVICE}"; then
pkill -f "${SERVICE}"
fi
else
set +e # killall will error if finds no process to kill
killall raft-key-value
set -e
fi
}

rpc() {
local port=$1
local method=$2
local body="$3"

echo '---'" rpc(127.0.0.1:$port/$method, $body)"

{
time grpcurl -plaintext -proto ../../openraft/openraft-proto/protos/management_service.proto -d "$body" -import-path ../../openraft/openraft-proto/protos "localhost:$port" "openraftpb.ManagementService/$method"
} | {
if type jq > /dev/null 2>&1; then
jq 'if has("data") then .data |= fromjson else . end'
else
cat
fi
}

echo
echo
}

export RUST_LOG=trace
export RUST_BACKTRACE=full

echo "Killing all running raft-key-value"

kill

sleep 1

echo "Start 5 uninitialized raft-key-value servers..."

nohup ../../target/debug/raft-key-value --id 1 --addr 127.0.0.1:5051 > n1.log &
sleep 1
echo "Server 1 started"

nohup ../../target/debug/raft-key-value --id 2 --addr 127.0.0.1:5052 > n2.log &
sleep 1
echo "Server 2 started"

nohup ../../target/debug/raft-key-value --id 3 --addr 127.0.0.1:5053 > n3.log &
sleep 1
echo "Server 3 started"
sleep 1

nohup ../../target/debug/raft-key-value --id 4 --addr 127.0.0.1:5054 > n4.log &
sleep 1
echo "Server 4 started"
sleep 1

nohup ../../target/debug/raft-key-value --id 5 --addr 127.0.0.1:5055 > n5.log &
sleep 1
echo "Server 5 started"
sleep 1

echo "Initialize servers 1,2,3 as a 3-nodes cluster"
sleep 2
echo

rpc 5051 Init '{"nodes":[{"id":"1","addr":"127.0.0.1:5051"},{"id":"2","addr":"127.0.0.1:5052"},{"id":"3","addr":"127.0.0.1:5053"}]}'

echo "Server 1 is a leader now"

sleep 2

echo "Get metrics from the leader"
sleep 2
echo
rpc 5051 Metrics '{}'
sleep 1


echo "Adding node 4 and node 5 as learners, to receive log from leader node 1"

sleep 1
echo
rpc 5051 AddLearner '{"is_blocking":true,"node":{"addr":"127.0.0.1:5054","id":"4"}}'
echo "Node 4 added as learner"
sleep 1
echo
rpc 5051 AddLearner '{"is_blocking":true,"node":{"addr":"127.0.0.1:5055","id":"5"}}'
echo "Node 5 added as learner"
sleep 1

echo "Get metrics from the leader, after adding 2 learners"
sleep 2
echo
rpc 5051 Metrics '{}'
sleep 1

echo "Changing membership from [1, 2, 3] to 5 nodes cluster: [1, 2, 3, 4, 5]"
echo
rpc 5051 ChangeMembership '{"nodes":[{"id":"1","addr":"127.0.0.1:5051"},{"id":"2","addr":"127.0.0.1:5052"},{"id":"3","addr":"127.0.0.1:5053"},{"id":"4","addr":"127.0.0.1:5054"},{"id":"5","addr":"127.0.0.1:5055"}]}'
sleep 1
echo 'Membership changed to [1, 2, 3, 4, 5]'
sleep 1

echo "Get metrics from the leader again"
sleep 1
echo
rpc 5051 Metrics '{}'
sleep 1

0 comments on commit 57037ad

Please sign in to comment.