Skip to content

Commit

Permalink
Feature: Abstract Vote
Browse files Browse the repository at this point in the history
impl CompareByKey for RaftVote implementations

RaftTypeConfig: add associated type Vote

impl RaftVote for Vote; add C to CompareByKey

- Part of databendlabs#1278
  • Loading branch information
drmingdrmer committed Jan 2, 2025
1 parent da1eda9 commit a13b95a
Show file tree
Hide file tree
Showing 43 changed files with 345 additions and 135 deletions.
36 changes: 36 additions & 0 deletions openraft/src/base/cmp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/// A trait for types that can be compared via a key.
///
/// Types implementing this trait define how they should be compared by providing a key
/// that implements [`PartialOrd`].
///
/// OpenRaft uses this trait to compare types that may not be [`PartialOrd`] themselves.
///
/// # Type Parameters
/// - `Key<'k>`: The type of the comparison key, which must be partially ordered and must not out
/// live the value.
///
/// # Examples
/// ```
/// # use openraft::base::cmp::CompareByKey;
///
/// struct Person {
/// name: String,
/// age: u32,
/// }
///
/// impl CompareByKey<()> for Person {
/// type Key<'k> = &'k str;
///
/// fn cmp_key(&self) -> Self::Key<'_> {
/// &self.name
/// }
/// }
/// ```
pub(crate) trait CompareByKey<C> {
/// The key type used for comparison.
type Key<'k>: PartialOrd + 'k
where Self: 'k;

/// Returns the key used for comparing this value.
fn cmp_key(&self) -> Self::Key<'_>;
}
2 changes: 2 additions & 0 deletions openraft/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Basic types used in the Raft implementation.
pub(crate) mod cmp;

pub use serde_able::OptionalSerde;
pub use threaded::BoxAny;
pub use threaded::BoxAsyncOnceMut;
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ use crate::type_config::async_runtime::MpscUnboundedReceiver;
use crate::type_config::TypeConfigExt;
use crate::vote::committed::CommittedVote;
use crate::vote::non_committed::NonCommittedVote;
use crate::vote::raft_vote::RaftVoteExt;
use crate::vote::vote_status::VoteStatus;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -392,7 +394,7 @@ where
// request.
if let AppendEntriesResponse::HigherVote(vote) = append_res {
debug_assert!(
vote > my_vote,
vote.as_ref_vote() > my_vote.as_ref_vote(),
"committed vote({}) has total order relation with other votes({})",
my_vote,
vote
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ mod tests {
type Node = ();
type Term = u64;
type LeaderId = crate::impls::leader_id_adv::LeaderId<Self>;
type Entry = crate::Entry<TickUTConfig>;
type Vote = crate::impls::Vote<Self>;
type Entry = crate::Entry<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
17 changes: 9 additions & 8 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::time::Duration;

use validit::Valid;

use crate::alias::LeaderIdOf;

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / rt-monoio

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / test-crate-openraft (stable)

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / test-crate-openraft (nightly)

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

unresolved import `crate::alias`

Check failure on line 5 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde,singlethreaded)

unresolved import `crate::alias`
use crate::base::cmp::CompareByKey;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ResultSender;
use crate::core::sm;
Expand Down Expand Up @@ -48,14 +50,15 @@ use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::vote::RaftLeaderId;
use crate::vote::RaftTerm;
use crate::vote::RaftVote;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftLogId;
use crate::RaftTypeConfig;
use crate::Vote;

/// Raft protocol algorithm.
///
Expand Down Expand Up @@ -195,10 +198,7 @@ where C: RaftTypeConfig
self.check_members_contain_me(m)?;

// FollowingHandler requires vote to be committed.
let vote = Vote {
committed: true,
..Default::default()
};
let vote = <VoteOf<C> as RaftVote<C>>::from_leader_id(Default::default(), true);
self.state.vote.update(C::now(), Duration::default(), vote);
self.following_handler().do_append_entries(vec![entry]);

Expand All @@ -211,8 +211,9 @@ where C: RaftTypeConfig
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
let new_term = self.state.vote.leader_id().term().next();
let new_vote = Vote::new(new_term, self.config.id.clone());
let new_term = self.state.vote.leader_id_ref().term().next();
let leader_id = LeaderIdOf::<C>::new(new_term, self.config.id.clone());
let new_vote = VoteOf::<C>::from_leader_id(leader_id, false);

let candidate = self.new_candidate(new_vote.clone());

Expand Down Expand Up @@ -754,7 +755,7 @@ where C: RaftTypeConfig
};

debug_assert!(
leader.committed_vote_ref().clone().into_vote() >= *self.state.vote_ref(),
leader.committed_vote_ref().cmp_key() >= self.state.vote_ref().cmp_key(),

Check failure on line 758 in openraft/src/engine/engine_impl.rs

View workflow job for this annotation

GitHub Actions / test-crate-openraft (stable)

the method `cmp_key` exists for reference `&CommittedVote<C>`, but its trait bounds were not satisfied
"leader.vote({}) >= state.vote({})",
leader.committed_vote_ref(),
self.state.vote_ref()
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/engine/handler/establish_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::base::cmp::CompareByKey;
use crate::engine::EngineConfig;
use crate::proposer::Candidate;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
use crate::proposer::LeaderState;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::RaftTypeConfig;

/// Establish a leader for the Engine, when Candidate finishes voting stage.
Expand All @@ -25,14 +27,14 @@ where C: RaftTypeConfig
let vote = candidate.vote_ref().clone();

debug_assert_eq!(
vote.leader_id().node_id_ref(),
vote.leader_id_ref().node_id_ref(),
Some(&self.config.id),
"it can only commit its own vote"
);

if let Some(l) = self.leader.as_ref() {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
if !(vote > l.committed_vote_ref().clone().into_vote()) {
if !(vote.cmp_key() > l.committed_vote_ref().cmp_key()) {

Check failure on line 37 in openraft/src/engine/handler/establish_handler/mod.rs

View workflow job for this annotation

GitHub Actions / test-crate-openraft (stable)

the method `cmp_key` exists for reference `&CommittedVote<C>`, but its trait bounds were not satisfied
tracing::warn!(
"vote is not greater than current existing leader vote. Do not establish new leader and quit"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;
Expand All @@ -28,15 +30,13 @@ fn m23() -> Membership<UTConfig> {
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
let mut eng: Engine<UTConfig> = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.config.id = 2;
eng.state.vote.update(
UTConfig::<()>::now(),
Duration::from_millis(500),
Vote::new_committed(2, 1),
);
let vote = VoteOf::<UTConfig>::new_committed(2, 1);
let now = UTConfig::<()>::now();
eng.state.vote.update(now, Duration::from_millis(500), vote);
eng.state.log_ids.append(log_id(1, 1, 1));
eng.state.log_ids.append(log_id(2, 1, 3));
eng.state.membership_state = MembershipState::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::raft_state::LogStateReader;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::EntryPayload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use crate::raft_state::LogStateReader;
use crate::storage::Snapshot;
use crate::storage::SnapshotMeta;
use crate::testing::log_id;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::StoredMembership;
Expand All @@ -31,14 +33,12 @@ fn m1234() -> Membership<UTConfig> {
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
let mut eng: Engine<UTConfig> = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.state.vote.update(
UTConfig::<()>::now(),
Duration::from_millis(500),
Vote::new_committed(2, 1),
);
let now = UTConfig::<()>::now();
let vote = VoteOf::<UTConfig>::new_committed(2, 1);
eng.state.vote.update(now, Duration::from_millis(500), vote);
eng.state.committed = Some(log_id(4, 1, 5));
eng.state.log_ids = LogIdList::new(vec![
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::Membership;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::replication::ReplicationSessionId;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::raft_state::LogStateReader;
use crate::replication::request::Replicate;
use crate::replication::response::ReplicationResult;
use crate::type_config::alias::InstantOf;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::LogId;
use crate::LogIdOptionExt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::testing::log_id;
use crate::type_config::alias::EntryOf;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Membership;
use crate::Vote;
Expand Down
11 changes: 7 additions & 4 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;
use std::time::Duration;

use crate::base::cmp::CompareByKey;
use crate::core::raft_msg::ResultSender;
use crate::engine::handler::leader_handler::LeaderHandler;
use crate::engine::handler::replication_handler::ReplicationHandler;
Expand All @@ -19,7 +20,9 @@ use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftState;
Expand Down Expand Up @@ -103,7 +106,7 @@ where C: RaftTypeConfig
// Partial ord compare:
// Vote does not have to be total ord.
// `!(a >= b)` does not imply `a < b`.
if vote >= self.state.vote_ref() {
if vote.cmp_key() >= self.state.vote_ref().cmp_key() {
// Ok
} else {
tracing::info!("vote {} is rejected by local vote: {}", vote, self.state.vote_ref());
Expand All @@ -123,7 +126,7 @@ where C: RaftTypeConfig
Duration::default()
};

if vote > self.state.vote_ref() {
if vote.cmp_key() > self.state.vote_ref().cmp_key() {
tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote);

self.state.vote.update(C::now(), leader_lease, vote.clone());
Expand Down Expand Up @@ -168,7 +171,7 @@ where C: RaftTypeConfig
if let Some(l) = self.leader.as_mut() {
tracing::debug!("leading vote: {}", l.committed_vote,);

if l.committed_vote.clone().into_vote().leader_id() == self.state.vote_ref().leader_id() {
if l.committed_vote.clone().into_vote().leader_id_ref() == self.state.vote_ref().leader_id_ref() {

Check failure on line 174 in openraft/src/engine/handler/vote_handler/mod.rs

View workflow job for this annotation

GitHub Actions / rt-monoio

mismatched types

Check failure on line 174 in openraft/src/engine/handler/vote_handler/mod.rs

View workflow job for this annotation

GitHub Actions / test-crate-openraft (nightly)

mismatched types

Check failure on line 174 in openraft/src/engine/handler/vote_handler/mod.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

mismatched types

Check failure on line 174 in openraft/src/engine/handler/vote_handler/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

mismatched types

Check failure on line 174 in openraft/src/engine/handler/vote_handler/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

mismatched types

Check failure on line 174 in openraft/src/engine/handler/vote_handler/mod.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde,singlethreaded)

mismatched types
tracing::debug!(
"vote still belongs to the same leader. Just updating vote is enough: node-{}, {}",
self.config.id,
Expand All @@ -177,7 +180,7 @@ where C: RaftTypeConfig
// TODO: this is not gonna happen,
// because `self.leader`(previous `internal_server_state`)
// does not include Candidate any more.
l.committed_vote = self.state.vote_ref().clone().into_committed();
l.committed_vote = self.state.vote_ref().to_committed();
self.server_state_handler().update_server_state_if_changed();
return;
}
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ where N: Node + Ord
type R = ();
type NodeId = u64;
type Node = N;
type Entry = crate::Entry<Self>;
type Term = u64;
type LeaderId = crate::impls::leader_id_adv::LeaderId<Self>;
type Vote = crate::impls::Vote<Self>;
type Entry = crate::Entry<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/tests/append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::Membership;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::replication::request::Replicate;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::Membership;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/tests/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::raft_state::LogStateReader;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::Entry;
use crate::LogId;
use crate::Membership;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/tests/install_full_snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::storage::Snapshot;
use crate::storage::SnapshotMeta;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::Membership;
use crate::StoredMembership;
use crate::Vote;
Expand All @@ -30,7 +31,7 @@ fn m1234() -> Membership<UTConfig> {
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
let mut eng: Engine<UTConfig> = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.state.vote.update(
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::replication::request::Replicate;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
use crate::utime::Leased;
use crate::vote::raft_vote::RaftVoteExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::Membership;
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ pub mod leader_id_adv {
pub mod leader_id_std {
pub use crate::vote::leader_id::leader_id_std::LeaderId;
}

/// Default [`RaftVote`] implementation for both standard Raft mode and multi-leader-per-term mode.
///
/// The difference between the two modes is the implementation of [`RaftLeaderId`].
///
/// [`RaftVote`]: crate::vote::raft_vote::RaftVote
/// [`RaftLeaderId`]: crate::vote::RaftLeaderId
pub use crate::vote::Vote;
Loading

0 comments on commit a13b95a

Please sign in to comment.