From a51c0c3bed348adfc6d1d747cd81072f02176f98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 24 Dec 2024 10:32:19 +0800 Subject: [PATCH] Change: change `Vote` to `Vote` This refactoring moves Vote from a per-NodeId type to a per-TypeConfig type, to make it consistent with `RaftTypeConfig` usage across the codebase. - Part of: #1278 Upgrade tip: Vote is now parameterized by `RaftTypeConfig` instead of `NodeId` - Change `Vote` to `Vote where C: RaftTypeConfig`, for example, change `Vote` to `Vote`. --- cluster_benchmark/tests/benchmark/store.rs | 6 +-- examples/memstore/src/log_store.rs | 10 ++-- examples/raft-kv-memstore-grpc/src/lib.rs | 3 +- .../raft-kv-memstore-grpc/src/network/mod.rs | 2 +- .../raft-kv-memstore-network-v2/src/lib.rs | 4 +- .../src/network.rs | 2 +- .../src/lib.rs | 3 +- .../src/network.rs | 2 +- .../src/store.rs | 6 +-- examples/raft-kv-rocksdb/src/store.rs | 8 +-- openraft/src/core/notification.rs | 2 +- openraft/src/core/raft_msg/mod.rs | 4 +- openraft/src/engine/command.rs | 2 +- openraft/src/engine/engine_impl.rs | 8 +-- .../src/engine/handler/vote_handler/mod.rs | 4 +- openraft/src/error.rs | 10 ++-- openraft/src/metrics/metric.rs | 2 +- openraft/src/metrics/raft_metrics.rs | 4 +- openraft/src/metrics/wait.rs | 2 +- openraft/src/network/snapshot_transport.rs | 4 +- openraft/src/network/v2/adapt_v1.rs | 2 +- openraft/src/network/v2/network.rs | 2 +- openraft/src/proposer/candidate.rs | 6 +-- openraft/src/raft/message/append_entries.rs | 4 +- openraft/src/raft/message/install_snapshot.rs | 8 +-- openraft/src/raft/message/transfer_leader.rs | 6 +-- openraft/src/raft/message/vote.rs | 10 ++-- openraft/src/raft/mod.rs | 2 +- openraft/src/raft_state/io_state.rs | 2 +- openraft/src/raft_state/io_state/io_id.rs | 4 +- openraft/src/raft_state/mod.rs | 8 +-- openraft/src/raft_state/vote_state_reader.rs | 8 +-- openraft/src/replication/mod.rs | 2 +- .../src/replication/replication_session_id.rs | 2 +- openraft/src/storage/v2/raft_log_reader.rs | 2 +- openraft/src/storage/v2/raft_log_storage.rs | 2 +- .../src/storage/v2/raft_log_storage_ext.rs | 2 +- openraft/src/testing/log/suite.rs | 4 +- openraft/src/type_config.rs | 2 +- openraft/src/vote/committed.rs | 7 ++- .../src/vote/leader_id/impl_into_leader_id.rs | 8 +-- openraft/src/vote/non_committed.rs | 7 ++- openraft/src/vote/vote.rs | 54 ++++++++++--------- stores/memstore/src/lib.rs | 6 +-- stores/rocksstore/src/lib.rs | 7 ++- tests/tests/fixtures/mod.rs | 2 +- 46 files changed, 129 insertions(+), 128 deletions(-) diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index 4ba7a7335..a539bf0e2 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -57,7 +57,7 @@ pub struct StateMachine { } pub struct LogStore { - vote: RwLock>>, + vote: RwLock>>, log: RwLock>>, last_purged_log_id: RwLock>>, } @@ -116,7 +116,7 @@ impl RaftLogReader for Arc { Ok(entries) } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { Ok(self.vote.read().await.clone()) } } @@ -196,7 +196,7 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut v = self.vote.write().await; *v = Some(*vote); Ok(()) diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index 8397091ca..cc8e5114a 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -33,7 +33,7 @@ pub struct LogStoreInner { committed: Option>, /// The current granted vote. - vote: Option>, + vote: Option>, } impl Default for LogStoreInner { @@ -84,12 +84,12 @@ impl LogStoreInner { Ok(self.committed.clone()) } - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.vote = Some(vote.clone()); Ok(()) } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { Ok(self.vote.clone()) } @@ -157,7 +157,7 @@ mod impl_log_store { inner.try_get_log_entries(range).await } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { let mut inner = self.inner.lock().await; inner.read_vote().await } @@ -183,7 +183,7 @@ mod impl_log_store { inner.read_committed().await } - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut inner = self.inner.lock().await; inner.save_vote(vote).await } diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index f335440f8..deeae55ae 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -37,10 +37,9 @@ pub mod protobuf { pub mod typ { - use crate::NodeId; use crate::TypeConfig; - pub type Vote = openraft::Vote; + pub type Vote = openraft::Vote; pub type SnapshotMeta = openraft::SnapshotMeta; pub type SnapshotData = ::SnapshotData; pub type Snapshot = openraft::Snapshot; diff --git a/examples/raft-kv-memstore-grpc/src/network/mod.rs b/examples/raft-kv-memstore-grpc/src/network/mod.rs index e92ca9afb..e9ecf1ae8 100644 --- a/examples/raft-kv-memstore-grpc/src/network/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/network/mod.rs @@ -78,7 +78,7 @@ impl RaftNetworkV2 for NetworkConnection { async fn full_snapshot( &mut self, - vote: openraft::Vote<::NodeId>, + vote: openraft::Vote, snapshot: openraft::Snapshot, _cancel: impl std::future::Future + openraft::OptionalSend + 'static, _option: RPCOption, diff --git a/examples/raft-kv-memstore-network-v2/src/lib.rs b/examples/raft-kv-memstore-network-v2/src/lib.rs index 5cf113e95..6bbda6497 100644 --- a/examples/raft-kv-memstore-network-v2/src/lib.rs +++ b/examples/raft-kv-memstore-network-v2/src/lib.rs @@ -34,13 +34,11 @@ pub type LogStore = store::LogStore; pub type StateMachineStore = store::StateMachineStore; pub mod typ { - - use crate::NodeId; use crate::TypeConfig; pub type Raft = openraft::Raft; - pub type Vote = openraft::Vote; + pub type Vote = openraft::Vote; pub type SnapshotMeta = openraft::SnapshotMeta; pub type SnapshotData = ::SnapshotData; pub type Snapshot = openraft::Snapshot; diff --git a/examples/raft-kv-memstore-network-v2/src/network.rs b/examples/raft-kv-memstore-network-v2/src/network.rs index 7173f7151..b85bee8bb 100644 --- a/examples/raft-kv-memstore-network-v2/src/network.rs +++ b/examples/raft-kv-memstore-network-v2/src/network.rs @@ -48,7 +48,7 @@ impl RaftNetworkV2 for Connection { /// A real application should replace this method with customized implementation. async fn full_snapshot( &mut self, - vote: Vote, + vote: Vote, snapshot: Snapshot, _cancel: impl Future + OptionalSend + 'static, _option: RPCOption, diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs index d5178c481..6915e9a6e 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/lib.rs @@ -36,12 +36,11 @@ pub type StateMachineStore = store::StateMachineStore; pub mod typ { - use crate::NodeId; use crate::TypeConfig; pub type Raft = openraft::Raft; - pub type Vote = openraft::Vote; + pub type Vote = openraft::Vote; pub type SnapshotMeta = openraft::SnapshotMeta; pub type SnapshotData = ::SnapshotData; pub type Snapshot = openraft::Snapshot; diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs index 7173f7151..b85bee8bb 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/network.rs @@ -48,7 +48,7 @@ impl RaftNetworkV2 for Connection { /// A real application should replace this method with customized implementation. async fn full_snapshot( &mut self, - vote: Vote, + vote: Vote, snapshot: Snapshot, _cancel: impl Future + OptionalSend + 'static, _option: RPCOption, diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index 6ddb49437..1ed94bdb1 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -116,7 +116,7 @@ pub struct LogStore { committed: RefCell>>, /// The current granted vote. - vote: RefCell>>, + vote: RefCell>>, } impl RaftLogReader for Rc { @@ -129,7 +129,7 @@ impl RaftLogReader for Rc { Ok(response) } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { Ok(*self.vote.borrow()) } } @@ -312,7 +312,7 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut v = self.vote.borrow_mut(); *v = Some(*vote); Ok(()) diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 38eb898ab..b5766f641 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -337,7 +337,7 @@ impl LogStore { .and_then(|v| serde_json::from_slice(&v).ok())) } - fn set_vote_(&self, vote: &Vote) -> StorageResult<()> { + fn set_vote_(&self, vote: &Vote) -> StorageResult<()> { self.db .put_cf(self.store(), b"vote", serde_json::to_vec(vote).unwrap()) .map_err(|e| StorageError::write_vote(&e))?; @@ -346,7 +346,7 @@ impl LogStore { Ok(()) } - fn get_vote_(&self) -> StorageResult>> { + fn get_vote_(&self) -> StorageResult>> { Ok(self .db .get_cf(self.store(), b"vote") @@ -381,7 +381,7 @@ impl RaftLogReader for LogStore { .collect() } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { self.get_vote_() } } @@ -418,7 +418,7 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.set_vote_(vote) } diff --git a/openraft/src/core/notification.rs b/openraft/src/core/notification.rs index 68134d5d9..b3c69fee6 100644 --- a/openraft/src/core/notification.rs +++ b/openraft/src/core/notification.rs @@ -33,7 +33,7 @@ where C: RaftTypeConfig target: C::NodeId, /// The higher vote observed. - higher: Vote, + higher: Vote, /// The Leader that sent replication request. leader_vote: CommittedVote, diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 787f68b1f..e2317bf67 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -52,7 +52,7 @@ where C: RaftTypeConfig }, InstallFullSnapshot { - vote: Vote, + vote: Vote, snapshot: Snapshot, tx: ResultSender>, }, @@ -101,7 +101,7 @@ where C: RaftTypeConfig /// Otherwise, just reset Leader lease so that the node `to` can become Leader. HandleTransferLeader { /// The vote of the Leader that is transferring the leadership. - from: Vote, + from: Vote, /// The assigned node to be the next Leader. to: C::NodeId, }, diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 18b1d7160..08eb0f32c 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -112,7 +112,7 @@ where C: RaftTypeConfig }, /// Save vote to storage - SaveVote { vote: Vote }, + SaveVote { vote: Vote }, /// Send vote to all other members SendVote { vote_req: VoteRequest }, diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 24e3634ee..d01649217 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -111,7 +111,7 @@ where C: RaftTypeConfig /// /// The candidate `last_log_id` is initialized with the attributes of Acceptor part: /// [`RaftState`] - pub(crate) fn new_candidate(&mut self, vote: Vote) -> &mut Candidate> { + pub(crate) fn new_candidate(&mut self, vote: Vote) -> &mut Candidate> { let now = C::now(); let last_log_id = self.state.last_log_id().cloned(); @@ -378,7 +378,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn handle_append_entries( &mut self, - vote: &Vote, + vote: &Vote, prev_log_id: Option>, entries: Vec, tx: Option>, @@ -417,7 +417,7 @@ where C: RaftTypeConfig pub(crate) fn append_entries( &mut self, - vote: &Vote, + vote: &Vote, prev_log_id: Option>, entries: Vec, ) -> Result<(), RejectAppendEntries> { @@ -451,7 +451,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn handle_install_full_snapshot( &mut self, - vote: Vote, + vote: Vote, snapshot: Snapshot, tx: ResultSender>, ) { diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 3eb955a54..bea73a5b0 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -59,7 +59,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn accept_vote( &mut self, - vote: &Vote, + vote: &Vote, tx: ResultSender, f: F, ) -> Option> @@ -98,7 +98,7 @@ where C: RaftTypeConfig /// Note: This method does not check last-log-id. handle-vote-request has to deal with /// last-log-id itself. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_vote(&mut self, vote: &Vote) -> Result<(), RejectVoteRequest> { + pub(crate) fn update_vote(&mut self, vote: &Vote) -> Result<(), RejectVoteRequest> { // Partial ord compare: // Vote does not have to be total ord. // `!(a >= b)` does not imply `a < b`. diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 966ed53ba..b2d8e283d 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -346,8 +346,8 @@ where #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] #[error("seen a higher vote: {higher} GT mine: {sender_vote}")] pub(crate) struct HigherVote { - pub(crate) higher: Vote, - pub(crate) sender_vote: Vote, + pub(crate) higher: Vote, + pub(crate) sender_vote: Vote, } /// Error that indicates a **temporary** network error and when it is returned, Openraft will retry @@ -603,7 +603,7 @@ pub struct LearnerNotFound { #[error("not allowed to initialize due to current raft state: last_log_id: {last_log_id:?} vote: {vote}")] pub struct NotAllowed { pub last_log_id: Option>, - pub vote: Vote, + pub vote: Vote, } #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] @@ -636,7 +636,7 @@ pub enum NoForward {} #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub(crate) enum RejectVoteRequest { #[error("reject vote request by a greater vote: {0}")] - ByVote(Vote), + ByVote(Vote), #[allow(dead_code)] #[error("reject vote request by a greater last-log-id: {0:?}")] @@ -659,7 +659,7 @@ where C: RaftTypeConfig #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] pub(crate) enum RejectAppendEntries { #[error("reject AppendEntries by a greater vote: {0}")] - ByVote(Vote), + ByVote(Vote), #[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")] ByConflictingLogId { diff --git a/openraft/src/metrics/metric.rs b/openraft/src/metrics/metric.rs index 15c876c60..e8aceb187 100644 --- a/openraft/src/metrics/metric.rs +++ b/openraft/src/metrics/metric.rs @@ -15,7 +15,7 @@ pub enum Metric where C: RaftTypeConfig { Term(u64), - Vote(Vote), + Vote(Vote), LastLogIndex(Option), Applied(Option>), AppliedIndex(Option), diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 2dcca9d53..c040913fc 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -32,7 +32,7 @@ pub struct RaftMetrics { pub current_term: u64, /// The last flushed vote. - pub vote: Vote, + pub vote: Vote, /// The last log index has been appended to this Raft node's log. pub last_log_index: Option, @@ -280,7 +280,7 @@ where C: RaftTypeConfig #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct RaftServerMetrics { pub id: C::NodeId, - pub vote: Vote, + pub vote: Vote, pub state: ServerState, pub current_leader: Option, diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 90fffc0ca..405e72f66 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -93,7 +93,7 @@ where C: RaftTypeConfig /// Wait for `vote` to become `want` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn vote(&self, want: Vote, msg: impl ToString) -> Result, WaitError> { + pub async fn vote(&self, want: Vote, msg: impl ToString) -> Result, WaitError> { self.eq(Metric::Vote(want), msg).await } diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index 12e5b8adc..89638fefe 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -45,7 +45,7 @@ mod tokio_rt { { async fn send_snapshot( net: &mut Net, - vote: Vote, + vote: Vote, mut snapshot: Snapshot, mut cancel: impl Future + OptionalSend + 'static, option: RPCOption, @@ -299,7 +299,7 @@ pub trait SnapshotTransport { // TODO: consider removing dependency on RaftNetwork async fn send_snapshot( net: &mut Net, - vote: Vote, + vote: Vote, snapshot: Snapshot, cancel: impl Future + OptionalSend + 'static, option: RPCOption, diff --git a/openraft/src/network/v2/adapt_v1.rs b/openraft/src/network/v2/adapt_v1.rs index 5cb199886..7865ba7a5 100644 --- a/openraft/src/network/v2/adapt_v1.rs +++ b/openraft/src/network/v2/adapt_v1.rs @@ -38,7 +38,7 @@ where async fn full_snapshot( &mut self, - vote: Vote, + vote: Vote, snapshot: Snapshot, cancel: impl Future + OptionalSend + 'static, option: RPCOption, diff --git a/openraft/src/network/v2/network.rs b/openraft/src/network/v2/network.rs index 3a37abeb4..031858031 100644 --- a/openraft/src/network/v2/network.rs +++ b/openraft/src/network/v2/network.rs @@ -75,7 +75,7 @@ where C: RaftTypeConfig /// [`Raft::install_full_snapshot()`]: crate::raft::Raft::install_full_snapshot async fn full_snapshot( &mut self, - vote: Vote, + vote: Vote, snapshot: Snapshot, cancel: impl Future + OptionalSend + 'static, option: RPCOption, diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 37ee0b4a7..45bb89252 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -25,7 +25,7 @@ where starting_time: InstantOf, /// The vote. - vote: Vote, + vote: Vote, last_log_id: Option>, @@ -61,7 +61,7 @@ where { pub(crate) fn new( starting_time: InstantOf, - vote: Vote, + vote: Vote, last_log_id: Option>, quorum_set: QS, learner_ids: impl IntoIterator, @@ -76,7 +76,7 @@ where } } - pub(crate) fn vote_ref(&self) -> &Vote { + pub(crate) fn vote_ref(&self) -> &Vote { &self.vote } diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs index 9d305520d..6f9a0e54d 100644 --- a/openraft/src/raft/message/append_entries.rs +++ b/openraft/src/raft/message/append_entries.rs @@ -16,7 +16,7 @@ use crate::Vote; #[derive(Clone)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct AppendEntriesRequest { - pub vote: Vote, + pub vote: Vote, pub prev_log_id: Option>, @@ -95,7 +95,7 @@ pub enum AppendEntriesResponse { /// Seen a vote `v` that does not hold `mine_vote >= v`. /// And a leader's vote(committed vote) must be total order with other vote. /// Therefore it has to be a higher vote: `mine_vote < v` - HigherVote(Vote), + HigherVote(Vote), } impl AppendEntriesResponse diff --git a/openraft/src/raft/message/install_snapshot.rs b/openraft/src/raft/message/install_snapshot.rs index 15ecc90b5..bb1bc7bac 100644 --- a/openraft/src/raft/message/install_snapshot.rs +++ b/openraft/src/raft/message/install_snapshot.rs @@ -9,7 +9,7 @@ use crate::Vote; #[derive(PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct InstallSnapshotRequest { - pub vote: Vote, + pub vote: Vote, /// Metadata of a snapshot: snapshot_id, last_log_ed membership etc. pub meta: SnapshotMeta, @@ -44,7 +44,7 @@ impl fmt::Display for InstallSnapshotRequest { #[display("{{vote:{}}}", vote)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct InstallSnapshotResponse { - pub vote: Vote, + pub vote: Vote, } /// The response to `Raft::install_full_snapshot` API. @@ -54,11 +54,11 @@ pub struct InstallSnapshotResponse { #[display("SnapshotResponse{{vote:{}}}", vote)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct SnapshotResponse { - pub vote: Vote, + pub vote: Vote, } impl SnapshotResponse { - pub fn new(vote: Vote) -> Self { + pub fn new(vote: Vote) -> Self { Self { vote } } } diff --git a/openraft/src/raft/message/transfer_leader.rs b/openraft/src/raft/message/transfer_leader.rs index 460651f05..3ff450523 100644 --- a/openraft/src/raft/message/transfer_leader.rs +++ b/openraft/src/raft/message/transfer_leader.rs @@ -12,7 +12,7 @@ pub struct TransferLeaderRequest where C: RaftTypeConfig { /// The vote of the Leader that is transferring the leadership. - pub(crate) from_leader: Vote, + pub(crate) from_leader: Vote, /// The assigned node to be the next Leader. pub(crate) to_node_id: C::NodeId, @@ -24,7 +24,7 @@ where C: RaftTypeConfig impl TransferLeaderRequest where C: RaftTypeConfig { - pub fn new(from: Vote, to: C::NodeId, last_log_id: Option>) -> Self { + pub fn new(from: Vote, to: C::NodeId, last_log_id: Option>) -> Self { Self { from_leader: from, to_node_id: to, @@ -33,7 +33,7 @@ where C: RaftTypeConfig } /// From which Leader the leadership is transferred. - pub fn from_leader(&self) -> &Vote { + pub fn from_leader(&self) -> &Vote { &self.from_leader } diff --git a/openraft/src/raft/message/vote.rs b/openraft/src/raft/message/vote.rs index 98ad9891d..556aba718 100644 --- a/openraft/src/raft/message/vote.rs +++ b/openraft/src/raft/message/vote.rs @@ -10,7 +10,7 @@ use crate::Vote; #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct VoteRequest { - pub vote: Vote, + pub vote: Vote, pub last_log_id: Option>, } @@ -25,7 +25,7 @@ where C: RaftTypeConfig impl VoteRequest where C: RaftTypeConfig { - pub fn new(vote: Vote, last_log_id: Option>) -> Self { + pub fn new(vote: Vote, last_log_id: Option>) -> Self { Self { vote, last_log_id } } } @@ -39,7 +39,7 @@ pub struct VoteResponse { /// /// `vote` that equals the candidate.vote does not mean the vote is granted. /// The `vote` may be updated when a previous Leader sees a higher vote. - pub vote: Vote, + pub vote: Vote, /// It is true if a node accepted and saved the VoteRequest. pub vote_granted: bool, @@ -51,7 +51,7 @@ pub struct VoteResponse { impl VoteResponse where C: RaftTypeConfig { - pub fn new(vote: impl Borrow>, last_log_id: Option>, granted: bool) -> Self { + pub fn new(vote: impl Borrow>, last_log_id: Option>, granted: bool) -> Self { Self { vote: vote.borrow().clone(), vote_granted: granted, @@ -61,7 +61,7 @@ where C: RaftTypeConfig /// Returns `true` if the response indicates that the target node has granted a vote to the /// candidate. - pub fn is_granted_to(&self, candidate_vote: &Vote) -> bool { + pub fn is_granted_to(&self, candidate_vote: &Vote) -> bool { &self.vote == candidate_vote } } diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 05ec57607..2a16b23a4 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -418,7 +418,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip_all)] pub async fn install_full_snapshot( &self, - vote: Vote, + vote: Vote, snapshot: Snapshot, ) -> Result, Fatal> { tracing::info!("Raft::install_full_snapshot()"); diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index 519788e77..00cd26807 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -103,7 +103,7 @@ impl IOState where C: RaftTypeConfig { pub(crate) fn new( - vote: &Vote, + vote: &Vote, applied: Option>, snapshot: Option>, purged: Option>, diff --git a/openraft/src/raft_state/io_state/io_id.rs b/openraft/src/raft_state/io_state/io_id.rs index 28481f4e8..c26bfab61 100644 --- a/openraft/src/raft_state/io_state/io_id.rs +++ b/openraft/src/raft_state/io_state/io_id.rs @@ -68,7 +68,7 @@ where C: RaftTypeConfig impl IOId where C: RaftTypeConfig { - pub(crate) fn new(vote: &Vote) -> Self { + pub(crate) fn new(vote: &Vote) -> Self { if vote.is_committed() { Self::new_log_io(vote.clone().into_committed(), None) } else { @@ -87,7 +87,7 @@ where C: RaftTypeConfig /// Returns the vote the io operation is submitted by. #[allow(clippy::wrong_self_convention)] // The above lint is disabled because in future Vote may not be `Copy` - pub(crate) fn to_vote(&self) -> Vote { + pub(crate) fn to_vote(&self) -> Vote { match self { Self::Vote(non_committed_vote) => non_committed_vote.clone().into_vote(), Self::Log(log_io_id) => log_io_id.committed_vote.clone().into_vote(), diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 3b6d60ae9..6cbdc1ebf 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -49,7 +49,7 @@ pub struct RaftState where C: RaftTypeConfig { /// The vote state of this node. - pub(crate) vote: Leased, InstantOf>, + pub(crate) vote: Leased, InstantOf>, /// The LogId of the last log committed(AKA applied) to the state machine. /// @@ -146,10 +146,10 @@ where C: RaftTypeConfig } } -impl VoteStateReader for RaftState +impl VoteStateReader for RaftState where C: RaftTypeConfig { - fn vote_ref(&self) -> &Vote { + fn vote_ref(&self) -> &Vote { self.vote.deref() } } @@ -187,7 +187,7 @@ impl RaftState where C: RaftTypeConfig { /// Get a reference to the current vote. - pub fn vote_ref(&self) -> &Vote { + pub fn vote_ref(&self) -> &Vote { self.vote.deref() } diff --git a/openraft/src/raft_state/vote_state_reader.rs b/openraft/src/raft_state/vote_state_reader.rs index 2781ad273..765b76a8a 100644 --- a/openraft/src/raft_state/vote_state_reader.rs +++ b/openraft/src/raft_state/vote_state_reader.rs @@ -1,10 +1,12 @@ -use crate::NodeId; +use crate::RaftTypeConfig; use crate::Vote; // TODO: remove it? /// APIs to get vote. #[allow(dead_code)] -pub(crate) trait VoteStateReader { +pub(crate) trait VoteStateReader +where C: RaftTypeConfig +{ /// Get a reference to the current vote. - fn vote_ref(&self) -> &Vote; + fn vote_ref(&self) -> &Vote; } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index cc0c9fb0f..c01e31929 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -747,7 +747,7 @@ where async fn send_snapshot( network: Arc>, - vote: Vote, + vote: Vote, snapshot: Snapshot, option: RPCOption, cancel: OneshotReceiverOf, diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index 6ef845cd0..eca020aaa 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -68,7 +68,7 @@ where C: RaftTypeConfig self.leader_vote.clone() } - pub(crate) fn vote(&self) -> Vote { + pub(crate) fn vote(&self) -> Vote { self.leader_vote.clone().into_vote() } } diff --git a/openraft/src/storage/v2/raft_log_reader.rs b/openraft/src/storage/v2/raft_log_reader.rs index b29615914..ea87993b9 100644 --- a/openraft/src/storage/v2/raft_log_reader.rs +++ b/openraft/src/storage/v2/raft_log_reader.rs @@ -50,7 +50,7 @@ where C: RaftTypeConfig /// See: [log-stream](`crate::docs::protocol::replication::log_stream`) /// /// [`RaftLogStorage::save_vote`]: crate::storage::RaftLogStorage::save_vote - async fn read_vote(&mut self) -> Result>, StorageError>; + async fn read_vote(&mut self) -> Result>, StorageError>; /// Returns log entries within range `[start, end)`, `end` is exclusive, /// potentially limited by implementation-defined constraints. diff --git a/openraft/src/storage/v2/raft_log_storage.rs b/openraft/src/storage/v2/raft_log_storage.rs index 6a0f532f6..26af8dd75 100644 --- a/openraft/src/storage/v2/raft_log_storage.rs +++ b/openraft/src/storage/v2/raft_log_storage.rs @@ -52,7 +52,7 @@ where C: RaftTypeConfig /// ### To ensure correctness: /// /// The vote must be persisted on disk before returning. - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError>; + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError>; /// Saves the last committed log id to storage. /// diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index e2192ef35..b2b795213 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -34,7 +34,7 @@ where C: RaftTypeConfig let (tx, mut rx) = C::mpsc_unbounded(); - let io_id = IOId::::new_log_io(Vote::::default().into_committed(), Some(last_log_id)); + let io_id = IOId::::new_log_io(Vote::::default().into_committed(), Some(last_log_id)); let notify = Notification::LocalIO { io_id }; let callback = IOFlushed::::new(notify, tx.downgrade()); diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index fcfee9de1..882f6c37e 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -71,7 +71,7 @@ where C: RaftTypeConfig } /// Proxy method to invoke [`RaftLogReader::read_vote`]. - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { self.get_log_reader().await.read_vote().await } @@ -1440,7 +1440,7 @@ where let (tx, mut rx) = C::mpsc_unbounded(); // Dummy log io id for blocking append - let io_id = IOId::::new_log_io(Vote::::default().into_committed(), Some(last_log_id)); + let io_id = IOId::::new_log_io(Vote::::default().into_committed(), Some(last_log_id)); let notify = Notification::LocalIO { io_id }; let cb = IOFlushed::new(notify, tx.downgrade()); diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 1a1eb17b9..94298d9ea 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -152,7 +152,7 @@ pub mod alias { // Usually used types pub type LogIdOf = crate::LogId>; - pub type VoteOf = crate::Vote>; + pub type VoteOf = crate::Vote; pub type LeaderIdOf = crate::LeaderId>; pub type CommittedLeaderIdOf = crate::CommittedLeaderId>; pub type SerdeInstantOf = crate::metrics::SerdeInstant>; diff --git a/openraft/src/vote/committed.rs b/openraft/src/vote/committed.rs index 49ca34eb4..0331aa9d6 100644 --- a/openraft/src/vote/committed.rs +++ b/openraft/src/vote/committed.rs @@ -1,7 +1,6 @@ use std::cmp::Ordering; use std::fmt; -use crate::type_config::alias::NodeIdOf; use crate::vote::ref_vote::RefVote; use crate::CommittedLeaderId; use crate::RaftTypeConfig; @@ -16,7 +15,7 @@ use crate::Vote; pub(crate) struct CommittedVote where C: RaftTypeConfig { - vote: Vote>, + vote: Vote, } /// The `CommittedVote` is totally ordered. @@ -37,7 +36,7 @@ where C: RaftTypeConfig impl CommittedVote where C: RaftTypeConfig { - pub(crate) fn new(mut vote: Vote>) -> Self { + pub(crate) fn new(mut vote: Vote) -> Self { vote.committed = true; Self { vote } } @@ -46,7 +45,7 @@ where C: RaftTypeConfig self.vote.leader_id().to_committed() } - pub(crate) fn into_vote(self) -> Vote> { + pub(crate) fn into_vote(self) -> Vote { self.vote } diff --git a/openraft/src/vote/leader_id/impl_into_leader_id.rs b/openraft/src/vote/leader_id/impl_into_leader_id.rs index ac1409bfa..25b30a7bb 100644 --- a/openraft/src/vote/leader_id/impl_into_leader_id.rs +++ b/openraft/src/vote/leader_id/impl_into_leader_id.rs @@ -1,9 +1,11 @@ -use crate::node::NodeId; use crate::vote::leader_id::CommittedLeaderId; use crate::vote::Vote; +use crate::RaftTypeConfig; -impl From> for CommittedLeaderId { - fn from(vote: Vote) -> Self { +impl From> for CommittedLeaderId +where C: RaftTypeConfig +{ + fn from(vote: Vote) -> Self { vote.leader_id.to_committed() } } diff --git a/openraft/src/vote/non_committed.rs b/openraft/src/vote/non_committed.rs index 13beeb560..86f30cd2f 100644 --- a/openraft/src/vote/non_committed.rs +++ b/openraft/src/vote/non_committed.rs @@ -1,7 +1,6 @@ use std::fmt; use crate::type_config::alias::LeaderIdOf; -use crate::type_config::alias::NodeIdOf; use crate::vote::ref_vote::RefVote; use crate::RaftTypeConfig; use crate::Vote; @@ -15,13 +14,13 @@ use crate::Vote; pub(crate) struct NonCommittedVote where C: RaftTypeConfig { - vote: Vote>, + vote: Vote, } impl NonCommittedVote where C: RaftTypeConfig { - pub(crate) fn new(vote: Vote>) -> Self { + pub(crate) fn new(vote: Vote) -> Self { debug_assert!(!vote.committed); Self { vote } } @@ -30,7 +29,7 @@ where C: RaftTypeConfig &self.vote.leader_id } - pub(crate) fn into_vote(self) -> Vote> { + pub(crate) fn into_vote(self) -> Vote { self.vote } diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index 67a290ce2..05fbf47e2 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -7,27 +7,30 @@ use crate::vote::ref_vote::RefVote; use crate::vote::vote_status::VoteStatus; use crate::vote::NonCommittedVote; use crate::LeaderId; -use crate::NodeId; use crate::RaftTypeConfig; /// `Vote` represent the privilege of a node. #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct Vote { +pub struct Vote { /// The id of the node that tries to become the leader. - pub leader_id: LeaderId, + pub leader_id: LeaderId, pub committed: bool, } -impl PartialOrd for Vote { +impl PartialOrd for Vote +where C: RaftTypeConfig +{ #[inline] - fn partial_cmp(&self, other: &Vote) -> Option { + fn partial_cmp(&self, other: &Vote) -> Option { PartialOrd::partial_cmp(&self.as_ref_vote(), &other.as_ref_vote()) } } -impl std::fmt::Display for Vote { +impl std::fmt::Display for Vote +where C: RaftTypeConfig +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, @@ -38,15 +41,17 @@ impl std::fmt::Display for Vote { } } -impl Vote { - pub fn new(term: u64, node_id: NID) -> Self { +impl Vote +where C: RaftTypeConfig +{ + pub fn new(term: u64, node_id: C::NodeId) -> Self { Self { leader_id: LeaderId::new(term, node_id), committed: false, } } - pub fn new_committed(term: u64, node_id: NID) -> Self { + pub fn new_committed(term: u64, node_id: C::NodeId) -> Self { Self { leader_id: LeaderId::new(term, node_id), committed: true, @@ -58,23 +63,20 @@ impl Vote { self.committed = true } - pub(crate) fn as_ref_vote(&self) -> RefVote<'_, NID> { + pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C::NodeId> { RefVote::new(&self.leader_id, self.committed) } /// Convert this vote into a `CommittedVote` - pub(crate) fn into_committed(self) -> CommittedVote - where C: RaftTypeConfig { + pub(crate) fn into_committed(self) -> CommittedVote { CommittedVote::new(self) } - pub(crate) fn into_non_committed(self) -> NonCommittedVote - where C: RaftTypeConfig { + pub(crate) fn into_non_committed(self) -> NonCommittedVote { NonCommittedVote::new(self) } - pub(crate) fn into_vote_status(self) -> VoteStatus - where C: RaftTypeConfig { + pub(crate) fn into_vote_status(self) -> VoteStatus { if self.committed { VoteStatus::Committed(self.into_committed()) } else { @@ -89,11 +91,11 @@ impl Vote { /// Return the [`LeaderId`] this vote represents for. /// /// The leader may or may not be granted by a quorum. - pub fn leader_id(&self) -> &LeaderId { + pub fn leader_id(&self) -> &LeaderId { &self.leader_id } - pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderId) -> bool { + pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderId) -> bool { self.leader_id().is_same_as_committed(leader_id) } } @@ -103,6 +105,7 @@ impl Vote { mod tests { #[cfg(not(feature = "single-term-leader"))] mod feature_no_single_term_leader { + use crate::engine::testing::UTConfig; use crate::Vote; #[cfg(feature = "serde")] @@ -112,7 +115,7 @@ mod tests { let s = serde_json::to_string(&v)?; assert_eq!(r#"{"leader_id":{"term":1,"node_id":2},"committed":false}"#, s); - let v2: Vote = serde_json::from_str(&s)?; + let v2: Vote = serde_json::from_str(&s)?; assert_eq!(v, v2); Ok(()) @@ -121,10 +124,10 @@ mod tests { #[test] fn test_vote_total_order() -> anyhow::Result<()> { #[allow(clippy::redundant_closure)] - let vote = |term, node_id| Vote::::new(term, node_id); + let vote = |term, node_id| Vote::::new(term, node_id); #[allow(clippy::redundant_closure)] - let committed = |term, node_id| Vote::::new_committed(term, node_id); + let committed = |term, node_id| Vote::::new_committed(term, node_id); // Compare term first assert!(vote(2, 2) > vote(1, 2)); @@ -149,6 +152,7 @@ mod tests { mod feature_single_term_leader { use std::panic::UnwindSafe; + use crate::engine::testing::UTConfig; use crate::LeaderId; use crate::Vote; @@ -159,7 +163,7 @@ mod tests { let s = serde_json::to_string(&v)?; assert_eq!(r#"{"leader_id":{"term":1,"voted_for":2},"committed":false}"#, s); - let v2: Vote = serde_json::from_str(&s)?; + let v2: Vote = serde_json::from_str(&s)?; assert_eq!(v, v2); Ok(()) @@ -169,15 +173,15 @@ mod tests { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_vote_partial_order() -> anyhow::Result<()> { #[allow(clippy::redundant_closure)] - let vote = |term, node_id| Vote::::new(term, node_id); + let vote = |term, node_id| Vote::::new(term, node_id); - let none = |term| Vote:: { + let none = |term| Vote:: { leader_id: LeaderId { term, voted_for: None }, committed: false, }; #[allow(clippy::redundant_closure)] - let committed = |term, node_id| Vote::::new_committed(term, node_id); + let committed = |term, node_id| Vote::::new_committed(term, node_id); // Compare term first assert!(vote(2, 2) > vote(1, 2)); diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 9de0315cd..ccba9ce7f 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -155,7 +155,7 @@ pub struct MemLogStore { block: BlockConfig, /// The current hard state. - vote: RwLock>>, + vote: RwLock>>, } impl MemLogStore { @@ -245,7 +245,7 @@ impl RaftLogReader for Arc { Ok(entries) } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { Ok(*self.vote.read().await) } } @@ -349,7 +349,7 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { tracing::debug!(?vote, "save_vote"); let mut h = self.vote.write().await; diff --git a/stores/rocksstore/src/lib.rs b/stores/rocksstore/src/lib.rs index 4da939294..976739e52 100644 --- a/stores/rocksstore/src/lib.rs +++ b/stores/rocksstore/src/lib.rs @@ -154,7 +154,6 @@ mod meta { use openraft::ErrorSubject; use openraft::LogId; - use crate::RocksNodeId; use crate::TypeConfig; /// Defines metadata key and value @@ -182,7 +181,7 @@ mod meta { } impl StoreMeta for Vote { const KEY: &'static str = "vote"; - type Value = openraft::Vote; + type Value = openraft::Vote; fn subject(_v: Option<&Self::Value>) -> ErrorSubject { ErrorSubject::Vote @@ -262,7 +261,7 @@ impl RaftLogReader for RocksLogStore { Ok(res) } - async fn read_vote(&mut self) -> Result>, StorageError> { + async fn read_vote(&mut self) -> Result>, StorageError> { self.get_meta::() } } @@ -338,7 +337,7 @@ impl RaftLogStorage for RocksLogStore { }) } - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.put_meta::(vote)?; self.db.flush_wal(true).map_err(|e| StorageError::write_vote(&e))?; Ok(()) diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 077232f0b..7e96055f3 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -1085,7 +1085,7 @@ impl RaftNetworkV2 for RaftRouterNetwork { async fn full_snapshot( &mut self, - vote: Vote, + vote: Vote, snapshot: Snapshot, _cancel: impl Future + OptionalSend + 'static, _option: RPCOption,