diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index a539bf0e2..157454c6c 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -52,14 +52,14 @@ pub struct StoredSnapshot { #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachine { - pub last_applied_log: Option>, + pub last_applied_log: Option>, pub last_membership: StoredMembership, } pub struct LogStore { vote: RwLock>>, log: RwLock>>, - last_purged_log_id: RwLock>>, + last_purged_log_id: RwLock>>, } impl LogStore { @@ -203,7 +203,7 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { let mut log = self.log.write().await; log.split_off(&log_id.index); @@ -211,7 +211,7 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "debug", skip_all)] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { { let mut p = self.last_purged_log_id.write().await; *p = Some(log_id); @@ -244,7 +244,7 @@ impl RaftLogStorage for Arc { impl RaftStateMachine for Arc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let sm = self.sm.read().await; Ok((sm.last_applied_log, sm.last_membership.clone())) } diff --git a/examples/memstore/src/log_store.rs b/examples/memstore/src/log_store.rs index cc8e5114a..4f667453f 100644 --- a/examples/memstore/src/log_store.rs +++ b/examples/memstore/src/log_store.rs @@ -24,13 +24,13 @@ pub struct LogStore { #[derive(Debug)] pub struct LogStoreInner { /// The last purged log id. - last_purged_log_id: Option>, + last_purged_log_id: Option>, /// The Raft log. log: BTreeMap, /// The commit log id. - committed: Option>, + committed: Option>, /// The current granted vote. vote: Option>, @@ -75,12 +75,12 @@ impl LogStoreInner { }) } - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { self.committed = committed; Ok(()) } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result>, StorageError> { Ok(self.committed.clone()) } @@ -104,7 +104,7 @@ impl LogStoreInner { Ok(()) } - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); for key in keys { self.log.remove(&key); @@ -113,7 +113,7 @@ impl LogStoreInner { Ok(()) } - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { { let ld = &mut self.last_purged_log_id; assert!(ld.as_ref() <= Some(&log_id)); @@ -173,12 +173,12 @@ mod impl_log_store { inner.get_log_state().await } - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { let mut inner = self.inner.lock().await; inner.save_committed(committed).await } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result>, StorageError> { let mut inner = self.inner.lock().await; inner.read_committed().await } @@ -194,12 +194,12 @@ mod impl_log_store { inner.append(entries, callback).await } - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { let mut inner = self.inner.lock().await; inner.truncate(log_id).await } - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { let mut inner = self.inner.lock().await; inner.purge(log_id).await } diff --git a/examples/raft-kv-memstore-grpc/src/lib.rs b/examples/raft-kv-memstore-grpc/src/lib.rs index deeae55ae..8fcdd50c0 100644 --- a/examples/raft-kv-memstore-grpc/src/lib.rs +++ b/examples/raft-kv-memstore-grpc/src/lib.rs @@ -73,7 +73,7 @@ impl From for typ::Vote { } } -impl From for LogId { +impl From for LogId { fn from(proto_log_id: protobuf::LogId) -> Self { let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into(); LogId::new(leader_id, proto_log_id.index) @@ -116,8 +116,8 @@ impl From for protobuf::Vote { } } } -impl From> for protobuf::LogId { - fn from(log_id: LogId) -> Self { +impl From> for protobuf::LogId { + fn from(log_id: LogId) -> Self { protobuf::LogId { index: log_id.index, leader_id: Some(log_id.leader_id.into()), diff --git a/examples/raft-kv-memstore-grpc/src/store/mod.rs b/examples/raft-kv-memstore-grpc/src/store/mod.rs index 42f7707af..a6792902d 100644 --- a/examples/raft-kv-memstore-grpc/src/store/mod.rs +++ b/examples/raft-kv-memstore-grpc/src/store/mod.rs @@ -19,7 +19,6 @@ use serde::Serialize; use crate::protobuf::Response; use crate::typ; -use crate::NodeId; use crate::TypeConfig; pub type LogStore = memstore::LogStore; @@ -39,7 +38,7 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option>, pub last_membership: StoredMembership, @@ -126,7 +125,7 @@ impl RaftStateMachine for Arc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let state_machine = self.state_machine.lock().unwrap(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } diff --git a/examples/raft-kv-memstore-network-v2/src/store.rs b/examples/raft-kv-memstore-network-v2/src/store.rs index 089f9b323..53fe37aad 100644 --- a/examples/raft-kv-memstore-network-v2/src/store.rs +++ b/examples/raft-kv-memstore-network-v2/src/store.rs @@ -17,7 +17,6 @@ use serde::Deserialize; use serde::Serialize; use crate::typ; -use crate::NodeId; use crate::TypeConfig; pub type LogStore = memstore::LogStore; @@ -56,7 +55,7 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option>, pub last_membership: StoredMembership, @@ -133,7 +132,7 @@ impl RaftStateMachine for Arc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let state_machine = self.state_machine.lock().unwrap(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } diff --git a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs index 1f6603870..c9f95c391 100644 --- a/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs +++ b/examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs @@ -20,7 +20,6 @@ use serde::Serialize; use crate::decode_buffer; use crate::encode; use crate::typ; -use crate::NodeId; use crate::TypeConfig; pub type LogStore = memstore::LogStore; @@ -59,7 +58,7 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option>, pub last_membership: StoredMembership, @@ -154,7 +153,7 @@ impl RaftStateMachine for Arc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let state_machine = self.state_machine.lock().unwrap(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } diff --git a/examples/raft-kv-memstore-singlethreaded/src/store.rs b/examples/raft-kv-memstore-singlethreaded/src/store.rs index 1ed94bdb1..54f672640 100644 --- a/examples/raft-kv-memstore-singlethreaded/src/store.rs +++ b/examples/raft-kv-memstore-singlethreaded/src/store.rs @@ -24,7 +24,6 @@ use openraft::Vote; use serde::Deserialize; use serde::Serialize; -use crate::NodeId; use crate::TypeConfig; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -85,7 +84,7 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied: Option>, + pub last_applied: Option>, pub last_membership: StoredMembership, @@ -108,12 +107,12 @@ pub struct StateMachineStore { #[derive(Debug, Default)] pub struct LogStore { - last_purged_log_id: RefCell>>, + last_purged_log_id: RefCell>>, /// The Raft log. log: RefCell>>, - committed: RefCell>>, + committed: RefCell>>, /// The current granted vote. vote: RefCell>>, @@ -190,7 +189,7 @@ impl RaftStateMachine for Rc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let state_machine = self.state_machine.borrow(); Ok((state_machine.last_applied, state_machine.last_membership.clone())) } @@ -300,13 +299,13 @@ impl RaftLogStorage for Rc { }) } - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { let mut c = self.committed.borrow_mut(); *c = committed; Ok(()) } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result>, StorageError> { let committed = self.committed.borrow(); Ok(*committed) } @@ -332,7 +331,7 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); let mut log = self.log.borrow_mut(); @@ -345,7 +344,7 @@ impl RaftLogStorage for Rc { } #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: (-oo, {:?}]", log_id); { diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index d98dd855e..b5a83157f 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -19,7 +19,6 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::RwLock; -use crate::NodeId; use crate::TypeConfig; pub type LogStore = memstore::LogStore; @@ -63,7 +62,7 @@ pub struct StoredSnapshot { /// and value as String, but you could set any type of value that has the serialization impl. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct StateMachineData { - pub last_applied_log: Option>, + pub last_applied_log: Option>, pub last_membership: StoredMembership, @@ -136,7 +135,7 @@ impl RaftStateMachine for Arc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let state_machine = self.state_machine.read().await; Ok((state_machine.last_applied_log, state_machine.last_membership.clone())) } diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index b5766f641..e2b3ca931 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -36,7 +36,6 @@ use serde::Serialize; use tokio::sync::RwLock; use crate::typ; -use crate::NodeId; use crate::SnapshotData; use crate::TypeConfig; @@ -88,7 +87,7 @@ pub struct StateMachineStore { #[derive(Debug, Clone)] pub struct StateMachineData { - pub last_applied_log_id: Option>, + pub last_applied_log_id: Option>, pub last_membership: StoredMembership, @@ -195,7 +194,7 @@ impl RaftStateMachine for StateMachineStore { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { Ok((self.data.last_applied_log_id, self.data.last_membership.clone())) } @@ -299,7 +298,7 @@ impl LogStore { Ok(()) } - fn get_last_purged_(&self) -> StorageResult>> { + fn get_last_purged_(&self) -> StorageResult>> { Ok(self .db .get_cf(self.store(), b"last_purged_log_id") @@ -307,7 +306,7 @@ impl LogStore { .and_then(|v| serde_json::from_slice(&v).ok())) } - fn set_last_purged_(&self, log_id: LogId) -> StorageResult<()> { + fn set_last_purged_(&self, log_id: LogId) -> StorageResult<()> { self.db .put_cf( self.store(), @@ -320,7 +319,7 @@ impl LogStore { Ok(()) } - fn set_committed_(&self, committed: &Option>) -> Result<(), StorageError> { + fn set_committed_(&self, committed: &Option>) -> Result<(), StorageError> { let json = serde_json::to_vec(committed).unwrap(); self.db.put_cf(self.store(), b"committed", json).map_err(|e| StorageError::write(&e))?; @@ -329,7 +328,7 @@ impl LogStore { Ok(()) } - fn get_committed_(&self) -> StorageResult>> { + fn get_committed_(&self) -> StorageResult>> { Ok(self .db .get_cf(self.store(), b"committed") @@ -407,12 +406,12 @@ impl RaftLogStorage for LogStore { }) } - async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { self.set_committed_(&_committed)?; Ok(()) } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result>, StorageError> { let c = self.get_committed_()?; Ok(c) } @@ -446,7 +445,7 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> StorageResult<()> { + async fn truncate(&mut self, log_id: LogId) -> StorageResult<()> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); let from = id_to_bin(log_id.index); @@ -455,7 +454,7 @@ impl RaftLogStorage for LogStore { } #[tracing::instrument(level = "debug", skip(self))] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [0, {:?}]", log_id); self.set_last_purged_(log_id)?; diff --git a/openraft/src/core/heartbeat/event.rs b/openraft/src/core/heartbeat/event.rs index 54c70a658..423bcfa7f 100644 --- a/openraft/src/core/heartbeat/event.rs +++ b/openraft/src/core/heartbeat/event.rs @@ -30,17 +30,13 @@ where C: RaftTypeConfig /// /// When there are no new logs to replicate, the Leader sends a heartbeat to replicate committed /// log id to followers to update their committed log id. - pub(crate) committed: Option>, + pub(crate) committed: Option>, } impl HeartbeatEvent where C: RaftTypeConfig { - pub(crate) fn new( - time: InstantOf, - session_id: ReplicationSessionId, - committed: Option>, - ) -> Self { + pub(crate) fn new(time: InstantOf, session_id: ReplicationSessionId, committed: Option>) -> Self { Self { time, session_id, diff --git a/openraft/src/core/notification.rs b/openraft/src/core/notification.rs index b3c69fee6..553882c45 100644 --- a/openraft/src/core/notification.rs +++ b/openraft/src/core/notification.rs @@ -39,7 +39,7 @@ where C: RaftTypeConfig leader_vote: CommittedVote, // TODO: need this? // /// The cluster this replication works for. - // membership_log_id: Option>, + // membership_log_id: Option>, }, /// [`StorageError`] error has taken place locally(not on remote node), diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index cd6c7410e..9345a1012 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -107,7 +107,7 @@ use crate::StorageError; /// A temp struct to hold the data for a node that is being applied. #[derive(Debug)] pub(crate) struct ApplyingEntry { - log_id: LogId, + log_id: LogId, membership: Option>, } @@ -124,7 +124,7 @@ where C: RaftTypeConfig } impl ApplyingEntry { - pub(crate) fn new(log_id: LogId, membership: Option>) -> Self { + pub(crate) fn new(log_id: LogId, membership: Option>) -> Self { Self { log_id, membership } } } @@ -133,7 +133,7 @@ impl ApplyingEntry { pub(crate) struct ApplyResult { pub(crate) since: u64, pub(crate) end: u64, - pub(crate) last_applied: LogId, + pub(crate) last_applied: LogId, pub(crate) applying_entries: Vec>, pub(crate) apply_results: Vec, } @@ -545,7 +545,7 @@ where .map(|(id, p)| { ( id.clone(), - as Borrow>>>::borrow(p).clone(), + as Borrow>>>::borrow(p).clone(), ) }) .collect(), @@ -761,8 +761,8 @@ where #[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn apply_to_state_machine( &mut self, - first: LogId, - last: LogId, + first: LogId, + last: LogId, ) -> Result<(), StorageError> { tracing::debug!("{}: {}..={}", func_name!(), first, last); diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 08eb0f32c..a9a57be27 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -65,12 +65,12 @@ where C: RaftTypeConfig }, /// Replicate the committed log id to other nodes - ReplicateCommitted { committed: Option> }, + ReplicateCommitted { committed: Option> }, /// Broadcast heartbeat to all other nodes. BroadcastHeartbeat { session_id: ReplicationSessionId, - committed: Option>, + committed: Option>, }, /// Save the committed log id to [`RaftLogStorage`]. @@ -79,7 +79,7 @@ where C: RaftTypeConfig /// latest state. /// /// [`RaftLogStorage`]: crate::storage::RaftLogStorage - SaveCommitted { committed: LogId }, + SaveCommitted { committed: LogId }, /// Commit log entries that are already persisted in the store, upto `upto`, inclusive. /// @@ -91,8 +91,8 @@ where C: RaftTypeConfig /// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed /// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply Apply { - already_committed: Option>, - upto: LogId, + already_committed: Option>, + upto: LogId, }, /// Replicate log entries or snapshot to a target. @@ -118,11 +118,11 @@ where C: RaftTypeConfig SendVote { vote_req: VoteRequest }, /// Purge log from the beginning to `upto`, inclusive. - PurgeLog { upto: LogId }, + PurgeLog { upto: LogId }, /// Delete logs that conflict with the leader from a follower/learner since log id `since`, /// inclusive. - TruncateLog { since: LogId }, + TruncateLog { since: LogId }, /// A command send to state machine worker [`sm::worker::Worker`]. /// @@ -296,14 +296,14 @@ where C: RaftTypeConfig /// This is only used by [`Raft::initialize()`], because when initializing there is no leader. /// /// [`Raft::initialize()`]: `crate::Raft::initialize()` - LogFlushed { log_id: Option> }, + LogFlushed { log_id: Option> }, /// Wait until the log is applied to the state machine. #[allow(dead_code)] - Applied { log_id: Option> }, + Applied { log_id: Option> }, /// Wait until snapshot is built and includes the log id. - Snapshot { log_id: Option> }, + Snapshot { log_id: Option> }, } impl fmt::Display for Condition diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index d01649217..c762e5e1e 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -379,7 +379,7 @@ where C: RaftTypeConfig pub(crate) fn handle_append_entries( &mut self, vote: &Vote, - prev_log_id: Option>, + prev_log_id: Option>, entries: Vec, tx: Option>, ) -> bool { @@ -418,7 +418,7 @@ where C: RaftTypeConfig pub(crate) fn append_entries( &mut self, vote: &Vote, - prev_log_id: Option>, + prev_log_id: Option>, entries: Vec, ) -> Result<(), RejectAppendEntries> { self.vote_handler().update_vote(vote)?; @@ -434,7 +434,7 @@ where C: RaftTypeConfig /// Commit entries for follower/learner. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn handle_commit_entries(&mut self, leader_committed: Option>) { + pub(crate) fn handle_commit_entries(&mut self, leader_committed: Option>) { tracing::debug!( leader_committed = display(leader_committed.display()), my_accepted = display(self.state.accepted_io().display()), @@ -654,7 +654,7 @@ where C: RaftTypeConfig self.leader_handler() .unwrap() - .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); + .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); } /// Check if a raft node is in a state that allows to initialize. diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index bf9078bff..1881d328e 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -59,7 +59,7 @@ where C: RaftTypeConfig /// /// Also clean conflicting entries and update membership state. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn append_entries(&mut self, prev_log_id: Option>, mut entries: Vec) { + pub(crate) fn append_entries(&mut self, prev_log_id: Option>, mut entries: Vec) { tracing::debug!( "{}: local last_log_id: {}, request: prev_log_id: {}, entries: {}", func_name!(), @@ -113,7 +113,7 @@ where C: RaftTypeConfig /// If not, truncate the local log and return an error. pub(crate) fn ensure_log_consecutive( &mut self, - prev_log_id: Option<&LogId>, + prev_log_id: Option<&LogId>, ) -> Result<(), RejectAppendEntries> { if let Some(prev) = prev_log_id { if !self.state.has_log_id(prev) { @@ -161,7 +161,7 @@ where C: RaftTypeConfig /// Commit entries that are already committed by the leader. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn commit_entries(&mut self, leader_committed: Option>) { + pub(crate) fn commit_entries(&mut self, leader_committed: Option>) { let accepted = self.state.accepted_io().cloned(); let accepted = accepted.and_then(|x| x.last_log_id().cloned()); let committed = std::cmp::min(accepted.clone(), leader_committed.clone()); diff --git a/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs b/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs index f1d3e88b3..fb03e2c44 100644 --- a/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs +++ b/openraft/src/engine/handler/log_handler/calc_purge_upto_test.rs @@ -4,8 +4,8 @@ use crate::engine::LogIdList; use crate::CommittedLeaderId; use crate::LogId; -fn log_id(term: u64, index: u64) -> LogId { - LogId:: { +fn log_id(term: u64, index: u64) -> LogId { + LogId { leader_id: CommittedLeaderId::new(term, 0), index, } diff --git a/openraft/src/engine/handler/log_handler/mod.rs b/openraft/src/engine/handler/log_handler/mod.rs index d66e3d72d..f72101a64 100644 --- a/openraft/src/engine/handler/log_handler/mod.rs +++ b/openraft/src/engine/handler/log_handler/mod.rs @@ -61,7 +61,7 @@ where C: RaftTypeConfig /// Update the log id it expect to purge up to. It won't trigger purge immediately. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_purge_upto(&mut self, purge_upto: LogId) { + pub(crate) fn update_purge_upto(&mut self, purge_upto: LogId) { debug_assert!(self.state.purge_upto() <= Some(&purge_upto)); self.state.purge_upto = Some(purge_upto); } @@ -74,7 +74,7 @@ where C: RaftTypeConfig /// `max_keep` specifies the number of applied logs to keep. /// `max_keep==0` means every applied log can be purged. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn calc_purge_upto(&self) -> Option> { + pub(crate) fn calc_purge_upto(&self) -> Option> { let st = &self.state; let max_keep = self.config.max_in_snapshot_log_to_keep; let batch_size = self.config.purge_batch_size; diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 7193c934d..c417d46f7 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -56,7 +56,7 @@ where C: RaftTypeConfig /// /// It is called by the leader when a new membership log is appended to log store. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn append_membership(&mut self, log_id: &LogId, m: &Membership) { + pub(crate) fn append_membership(&mut self, log_id: &LogId, m: &Membership) { tracing::debug!("update effective membership: log_id:{} {}", log_id, m); debug_assert!( @@ -149,7 +149,7 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_matching(&mut self, node_id: C::NodeId, log_id: Option>) { + pub(crate) fn update_matching(&mut self, node_id: C::NodeId, log_id: Option>) { tracing::debug!( node_id = display(&node_id), log_id = display(log_id.display()), @@ -182,7 +182,7 @@ where C: RaftTypeConfig /// /// In raft a log that is granted and in the leader term is committed. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option>) { + pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option>) { // Only when the log id is proposed by current leader, it is committed. if let Some(ref c) = granted { if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) { @@ -213,7 +213,7 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) does not match follower/learner state /// and is rejected. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogId) { + pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogId) { // TODO(2): test it? let prog_entry = self.leader.progress.get_mut(&target).unwrap(); @@ -390,7 +390,7 @@ where C: RaftTypeConfig /// /// Writing to local log store does not have to wait for a replication response from remote /// node. Thus it can just be done in a fast-path. - pub(crate) fn update_local_progress(&mut self, upto: Option>) { + pub(crate) fn update_local_progress(&mut self, upto: Option>) { tracing::debug!(upto = display(upto.display()), "{}", func_name!()); if upto.is_none() { diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index bea73a5b0..7da6c0b51 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -209,8 +209,7 @@ where C: RaftTypeConfig // If the leader has not yet proposed any log, propose a blank log and initiate replication; // Otherwise, just initiate replication. if last_log_id < noop_log_id { - self.leader_handler() - .leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); + self.leader_handler().leader_append_entries(vec![C::Entry::new_blank(LogId::::default())]); } else { self.replication_handler().initiate_replication(); } diff --git a/openraft/src/engine/log_id_list.rs b/openraft/src/engine/log_id_list.rs index 268ce45a9..87076ea85 100644 --- a/openraft/src/engine/log_id_list.rs +++ b/openraft/src/engine/log_id_list.rs @@ -22,7 +22,7 @@ use crate::StorageError; pub struct LogIdList where C: RaftTypeConfig { - key_log_ids: Vec>, + key_log_ids: Vec>, } impl LogIdList @@ -49,7 +49,7 @@ where C: RaftTypeConfig /// A-------C-------C : find(A,C) /// ``` pub(crate) async fn get_key_log_ids( - range: RangeInclusive>, + range: RangeInclusive>, sto: &mut LR, ) -> Result>, StorageError> where @@ -122,7 +122,7 @@ where C: RaftTypeConfig /// Create a new `LogIdList`. /// /// It stores the last purged log id, and a series of key log ids. - pub fn new(key_log_ids: impl IntoIterator>) -> Self { + pub fn new(key_log_ids: impl IntoIterator>) -> Self { Self { key_log_ids: key_log_ids.into_iter().collect(), } @@ -131,7 +131,7 @@ where C: RaftTypeConfig /// Extends a list of `log_id` that are proposed by a same leader. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { + pub(crate) fn extend_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { if let Some(first) = new_ids.first() { let first_id = first.get_log_id(); self.append(first_id.clone()); @@ -149,7 +149,7 @@ where C: RaftTypeConfig /// Extends a list of `log_id`. #[allow(dead_code)] - pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { + pub(crate) fn extend<'a, LID: RaftLogId + 'a>(&mut self, new_ids: &[LID]) { let mut prev = self.last().map(|x| x.leader_id.clone()); for x in new_ids.iter() { @@ -179,7 +179,7 @@ where C: RaftTypeConfig /// /// NOTE: The last two in `key_log_ids` may be with the same `leader_id`, because `last_log_id` /// always present in `log_ids`. - pub(crate) fn append(&mut self, new_log_id: LogId) { + pub(crate) fn append(&mut self, new_log_id: LogId) { let l = self.key_log_ids.len(); if l == 0 { self.key_log_ids.push(new_log_id); @@ -245,7 +245,7 @@ where C: RaftTypeConfig /// Purge log ids upto the log with index `upto_index`, inclusive. #[allow(dead_code)] - pub(crate) fn purge(&mut self, upto: &LogId) { + pub(crate) fn purge(&mut self, upto: &LogId) { let last = self.last().cloned(); // When installing snapshot it may need to purge across the `last_log_id`. @@ -277,7 +277,7 @@ where C: RaftTypeConfig /// Get the log id at the specified index. /// /// It will return `last_purged_log_id` if index is at the last purged index. - pub(crate) fn get(&self, index: u64) -> Option> { + pub(crate) fn get(&self, index: u64) -> Option> { let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index)); match res { @@ -292,17 +292,17 @@ where C: RaftTypeConfig } } - pub(crate) fn first(&self) -> Option<&LogId> { + pub(crate) fn first(&self) -> Option<&LogId> { self.key_log_ids.first() } - pub(crate) fn last(&self) -> Option<&LogId> { + pub(crate) fn last(&self) -> Option<&LogId> { self.key_log_ids.last() } // This method will only be used under feature tokio-rt #[cfg_attr(not(feature = "tokio-rt"), allow(dead_code))] - pub(crate) fn key_log_ids(&self) -> &[LogId] { + pub(crate) fn key_log_ids(&self) -> &[LogId] { &self.key_log_ids } diff --git a/openraft/src/entry/mod.rs b/openraft/src/entry/mod.rs index 60ab5c39f..7cd013b55 100644 --- a/openraft/src/entry/mod.rs +++ b/openraft/src/entry/mod.rs @@ -21,7 +21,7 @@ pub use traits::RaftPayload; pub struct Entry where C: RaftTypeConfig { - pub log_id: LogId, + pub log_id: LogId, /// This entry's payload. pub payload: EntryPayload, @@ -97,14 +97,14 @@ where C: RaftTypeConfig } } -impl RaftLogId for Entry +impl RaftLogId for Entry where C: RaftTypeConfig { - fn get_log_id(&self) -> &LogId { + fn get_log_id(&self) -> &LogId { &self.log_id } - fn set_log_id(&mut self, log_id: &LogId) { + fn set_log_id(&mut self, log_id: &LogId) { self.log_id = log_id.clone(); } } @@ -112,14 +112,14 @@ where C: RaftTypeConfig impl RaftEntry for Entry where C: RaftTypeConfig { - fn new_blank(log_id: LogId) -> Self { + fn new_blank(log_id: LogId) -> Self { Self { log_id, payload: EntryPayload::Blank, } } - fn new_membership(log_id: LogId, m: Membership) -> Self { + fn new_membership(log_id: LogId, m: Membership) -> Self { Self { log_id, payload: EntryPayload::Membership(m), diff --git a/openraft/src/entry/traits.rs b/openraft/src/entry/traits.rs index 59d24f469..53c80bb39 100644 --- a/openraft/src/entry/traits.rs +++ b/openraft/src/entry/traits.rs @@ -21,7 +21,7 @@ where C: RaftTypeConfig } /// Defines operations on an entry. -pub trait RaftEntry: RaftPayload + RaftLogId +pub trait RaftEntry: RaftPayload + RaftLogId where C: RaftTypeConfig, Self: OptionalSerde + Debug + Display + OptionalSend + OptionalSync, @@ -29,12 +29,12 @@ where /// Create a new blank log entry. /// /// The returned instance must return `true` for `Self::is_blank()`. - fn new_blank(log_id: LogId) -> Self; + fn new_blank(log_id: LogId) -> Self; /// Create a new membership log entry. /// /// The returned instance must return `Some()` for `Self::get_membership()`. - fn new_membership(log_id: LogId, m: Membership) -> Self; + fn new_membership(log_id: LogId, m: Membership) -> Self; } /// Build a raft log entry from app data. diff --git a/openraft/src/error.rs b/openraft/src/error.rs index b2d8e283d..18265319a 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -587,8 +587,8 @@ pub struct QuorumNotEnough { #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] #[error("the cluster is already undergoing a configuration change at log {membership_log_id:?}, last committed membership log id: {committed:?}")] pub struct InProgress { - pub committed: Option>, - pub membership_log_id: Option>, + pub committed: Option>, + pub membership_log_id: Option>, } #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] @@ -602,7 +602,7 @@ pub struct LearnerNotFound { #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] #[error("not allowed to initialize due to current raft state: last_log_id: {last_log_id:?} vote: {vote}")] pub struct NotAllowed { - pub last_log_id: Option>, + pub last_log_id: Option>, pub vote: Vote, } @@ -640,7 +640,7 @@ pub(crate) enum RejectVoteRequest { #[allow(dead_code)] #[error("reject vote request by a greater last-log-id: {0:?}")] - ByLastLogId(Option>), + ByLastLogId(Option>), } impl From> for AppendEntriesResponse @@ -662,10 +662,7 @@ pub(crate) enum RejectAppendEntries { ByVote(Vote), #[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")] - ByConflictingLogId { - expect: LogId, - local: Option>, - }, + ByConflictingLogId { expect: LogId, local: Option> }, } impl From> for RejectAppendEntries diff --git a/openraft/src/log_id/log_id_option_ext.rs b/openraft/src/log_id/log_id_option_ext.rs index 39a2fc35b..c8f412d56 100644 --- a/openraft/src/log_id/log_id_option_ext.rs +++ b/openraft/src/log_id/log_id_option_ext.rs @@ -1,5 +1,5 @@ use crate::LogId; -use crate::NodeId; +use crate::RaftTypeConfig; /// This helper trait extracts information from an `Option`. pub trait LogIdOptionExt { @@ -12,7 +12,9 @@ pub trait LogIdOptionExt { fn next_index(&self) -> u64; } -impl LogIdOptionExt for Option> { +impl LogIdOptionExt for Option> +where C: RaftTypeConfig +{ fn index(&self) -> Option { self.as_ref().map(|x| x.index) } @@ -25,7 +27,9 @@ impl LogIdOptionExt for Option> { } } -impl LogIdOptionExt for Option<&LogId> { +impl LogIdOptionExt for Option<&LogId> +where C: RaftTypeConfig +{ fn index(&self) -> Option { self.map(|x| x.index) } diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index 6d1f12384..0f799ac5d 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -13,7 +13,7 @@ pub use log_index_option_ext::LogIndexOptionExt; pub use raft_log_id::RaftLogId; use crate::CommittedLeaderId; -use crate::NodeId; +use crate::RaftTypeConfig; /// The identity of a raft log. /// @@ -21,41 +21,54 @@ use crate::NodeId; /// parts: a leader id, which refers to the leader that proposed this log, and an integer index. #[derive(Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -pub struct LogId { +pub struct LogId +where C: RaftTypeConfig +{ /// The id of the leader that proposed this log - pub leader_id: CommittedLeaderId, + pub leader_id: CommittedLeaderId, /// The index of a log in the storage. /// /// Log index is a consecutive integer. pub index: u64, } -impl Copy for LogId where NID: NodeId + Copy {} +impl Copy for LogId +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ +} -impl RaftLogId for LogId { - fn get_log_id(&self) -> &LogId { +impl RaftLogId for LogId +where C: RaftTypeConfig +{ + fn get_log_id(&self) -> &LogId { self } - fn set_log_id(&mut self, log_id: &LogId) { + fn set_log_id(&mut self, log_id: &LogId) { *self = log_id.clone() } } -impl Display for LogId { +impl Display for LogId +where C: RaftTypeConfig +{ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}.{}", self.leader_id, self.index) } } -impl LogId { +impl LogId +where C: RaftTypeConfig +{ /// Creates a log id proposed by a committed leader with `leader_id` at the given index. - pub fn new(leader_id: CommittedLeaderId, index: u64) -> Self { + pub fn new(leader_id: CommittedLeaderId, index: u64) -> Self { LogId { leader_id, index } } /// Returns the leader id that proposed this log. - pub fn committed_leader_id(&self) -> &CommittedLeaderId { + pub fn committed_leader_id(&self) -> &CommittedLeaderId { &self.leader_id } } diff --git a/openraft/src/log_id/raft_log_id.rs b/openraft/src/log_id/raft_log_id.rs index 2b7839c75..c5f2a79f3 100644 --- a/openraft/src/log_id/raft_log_id.rs +++ b/openraft/src/log_id/raft_log_id.rs @@ -1,22 +1,24 @@ use crate::CommittedLeaderId; use crate::LogId; -use crate::NodeId; +use crate::RaftTypeConfig; /// Defines API to operate an object that contains a log-id, such as a log entry or a log id. -pub trait RaftLogId { +pub trait RaftLogId +where C: RaftTypeConfig +{ /// Returns a reference to the leader id that proposed this log id. /// /// When a `LeaderId` is committed, some of its data can be discarded. /// For example, a leader id in standard raft is `(term, node_id)`, but a log id does not have /// to store the `node_id`, because in standard raft there is at most one leader that can be /// established. - fn leader_id(&self) -> &CommittedLeaderId { + fn leader_id(&self) -> &CommittedLeaderId { self.get_log_id().committed_leader_id() } /// Return a reference to the log-id it stores. - fn get_log_id(&self) -> &LogId; + fn get_log_id(&self) -> &LogId; /// Update the log id it contains. - fn set_log_id(&mut self, log_id: &LogId); + fn set_log_id(&mut self, log_id: &LogId); } diff --git a/openraft/src/log_id_range.rs b/openraft/src/log_id_range.rs index 0be3fc60b..5a1f6ed0a 100644 --- a/openraft/src/log_id_range.rs +++ b/openraft/src/log_id_range.rs @@ -14,16 +14,23 @@ use crate::RaftTypeConfig; /// A log id range of continuous series of log entries. /// /// The range of log to send is left open right close: `(prev, last]`. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] #[derive(PartialEq, Eq)] pub(crate) struct LogIdRange where C: RaftTypeConfig { /// The prev log id before the first to send, exclusive. - pub(crate) prev: Option>, + pub(crate) prev: Option>, /// The last log id to send, inclusive. - pub(crate) last: Option>, + pub(crate) last: Option>, +} + +impl Copy for LogIdRange +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ } impl Display for LogIdRange @@ -46,7 +53,7 @@ where C: RaftTypeConfig impl LogIdRange where C: RaftTypeConfig { - pub(crate) fn new(prev: Option>, last: Option>) -> Self { + pub(crate) fn new(prev: Option>, last: Option>) -> Self { Self { prev, last } } @@ -65,7 +72,7 @@ mod tests { use crate::CommittedLeaderId; use crate::LogId; - fn log_id(index: u64) -> LogId { + fn log_id(index: u64) -> LogId { LogId { leader_id: CommittedLeaderId::new(1, 1), index, diff --git a/openraft/src/membership/effective_membership.rs b/openraft/src/membership/effective_membership.rs index 2805c590d..7afc17e4e 100644 --- a/openraft/src/membership/effective_membership.rs +++ b/openraft/src/membership/effective_membership.rs @@ -55,7 +55,7 @@ where C: RaftTypeConfig impl From<(&LID, Membership)> for EffectiveMembership where C: RaftTypeConfig, - LID: RaftLogId, + LID: RaftLogId, { fn from(v: (&LID, Membership)) -> Self { EffectiveMembership::new(Some(v.0.get_log_id().clone()), v.1) @@ -65,11 +65,11 @@ where impl EffectiveMembership where C: RaftTypeConfig { - pub(crate) fn new_arc(log_id: Option>, membership: Membership) -> Arc { + pub(crate) fn new_arc(log_id: Option>, membership: Membership) -> Arc { Arc::new(Self::new(log_id, membership)) } - pub fn new(log_id: Option>, membership: Membership) -> Self { + pub fn new(log_id: Option>, membership: Membership) -> Self { let voter_ids = membership.voter_ids().collect(); let configs = membership.get_joint_config(); @@ -95,7 +95,7 @@ where C: RaftTypeConfig &self.stored_membership } - pub fn log_id(&self) -> &Option> { + pub fn log_id(&self) -> &Option> { self.stored_membership.log_id() } diff --git a/openraft/src/membership/stored_membership.rs b/openraft/src/membership/stored_membership.rs index 004714e25..78dd5ed61 100644 --- a/openraft/src/membership/stored_membership.rs +++ b/openraft/src/membership/stored_membership.rs @@ -21,7 +21,7 @@ pub struct StoredMembership where C: RaftTypeConfig { /// The id of the log that stores this membership config - log_id: Option>, + log_id: Option>, /// Membership config membership: Membership, @@ -30,11 +30,11 @@ where C: RaftTypeConfig impl StoredMembership where C: RaftTypeConfig { - pub fn new(log_id: Option>, membership: Membership) -> Self { + pub fn new(log_id: Option>, membership: Membership) -> Self { Self { log_id, membership } } - pub fn log_id(&self) -> &Option> { + pub fn log_id(&self) -> &Option> { &self.log_id } diff --git a/openraft/src/metrics/metric.rs b/openraft/src/metrics/metric.rs index e8aceb187..afba2ce7e 100644 --- a/openraft/src/metrics/metric.rs +++ b/openraft/src/metrics/metric.rs @@ -17,10 +17,10 @@ where C: RaftTypeConfig Term(u64), Vote(Vote), LastLogIndex(Option), - Applied(Option>), + Applied(Option>), AppliedIndex(Option), - Snapshot(Option>), - Purged(Option>), + Snapshot(Option>), + Purged(Option>), } impl Metric diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index c040913fc..584cb5ebc 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -38,17 +38,17 @@ pub struct RaftMetrics { pub last_log_index: Option, /// The last log index has been applied to this Raft node's state machine. - pub last_applied: Option>, + pub last_applied: Option>, /// The id of the last log included in snapshot. /// If there is no snapshot, it is (0,0). - pub snapshot: Option>, + pub snapshot: Option>, /// The last log id that has purged from storage, inclusive. /// /// `purged` is also the first log id Openraft knows, although the corresponding log entry has /// already been deleted. - pub purged: Option>, + pub purged: Option>, // --- // --- cluster --- @@ -189,10 +189,10 @@ where C: RaftTypeConfig #[derive(Clone, Debug, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct RaftDataMetrics { - pub last_log: Option>, - pub last_applied: Option>, - pub snapshot: Option>, - pub purged: Option>, + pub last_log: Option>, + pub last_applied: Option>, + pub snapshot: Option>, + pub purged: Option>, /// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged /// timestamp by a quorum. diff --git a/openraft/src/metrics/wait.rs b/openraft/src/metrics/wait.rs index 405e72f66..2428411a4 100644 --- a/openraft/src/metrics/wait.rs +++ b/openraft/src/metrics/wait.rs @@ -212,7 +212,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] pub async fn snapshot( &self, - snapshot_last_log_id: LogId, + snapshot_last_log_id: LogId, msg: impl ToString, ) -> Result, WaitError> { self.eq(Metric::Snapshot(Some(snapshot_last_log_id)), msg).await @@ -220,11 +220,7 @@ where C: RaftTypeConfig /// Wait for `purged` to become `want` or timeout. #[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))] - pub async fn purged( - &self, - want: Option>, - msg: impl ToString, - ) -> Result, WaitError> { + pub async fn purged(&self, want: Option>, msg: impl ToString) -> Result, WaitError> { self.eq(Metric::Purged(want), msg).await } diff --git a/openraft/src/progress/entry/mod.rs b/openraft/src/progress/entry/mod.rs index 2ae91de4d..bdefd983c 100644 --- a/openraft/src/progress/entry/mod.rs +++ b/openraft/src/progress/entry/mod.rs @@ -24,7 +24,7 @@ pub(crate) struct ProgressEntry where C: RaftTypeConfig { /// The id of the last matching log on the target following node. - pub(crate) matching: Option>, + pub(crate) matching: Option>, /// The data being transmitted in flight. /// @@ -48,7 +48,7 @@ impl ProgressEntry where C: RaftTypeConfig { #[allow(dead_code)] - pub(crate) fn new(matching: Option>) -> Self { + pub(crate) fn new(matching: Option>) -> Self { Self { matching: matching.clone(), inflight: Inflight::None, @@ -85,7 +85,7 @@ where C: RaftTypeConfig /// Return if a range of log id `..=log_id` is inflight sending. /// /// `prev_log_id` is never inflight. - pub(crate) fn is_log_range_inflight(&self, upto: &LogId) -> bool { + pub(crate) fn is_log_range_inflight(&self, upto: &LogId) -> bool { match &self.inflight { Inflight::None => false, Inflight::Logs { log_id_range, .. } => { @@ -175,10 +175,10 @@ where C: RaftTypeConfig } } -impl Borrow>> for ProgressEntry +impl Borrow>> for ProgressEntry where C: RaftTypeConfig { - fn borrow(&self) -> &Option> { + fn borrow(&self) -> &Option> { &self.matching } } diff --git a/openraft/src/progress/entry/tests.rs b/openraft/src/progress/entry/tests.rs index 466e339db..9ab6ae960 100644 --- a/openraft/src/progress/entry/tests.rs +++ b/openraft/src/progress/entry/tests.rs @@ -8,7 +8,7 @@ use crate::raft_state::LogStateReader; use crate::CommittedLeaderId; use crate::LogId; -fn log_id(index: u64) -> LogId { +fn log_id(index: u64) -> LogId { LogId { leader_id: CommittedLeaderId::new(1, 1), index, @@ -87,10 +87,10 @@ fn test_update_conflicting() -> anyhow::Result<()> { /// LogStateReader impl for testing struct LogState { - last: Option>, - snap_last: Option>, - purge_upto: Option>, - purged: Option>, + last: Option>, + snap_last: Option>, + purge_upto: Option>, + purged: Option>, } impl LogState { @@ -107,7 +107,7 @@ impl LogState { } impl LogStateReader for LogState { - fn get_log_id(&self, index: u64) -> Option> { + fn get_log_id(&self, index: u64) -> Option> { let x = Some(log_id(index)); if x >= self.purged && x <= self.last { x @@ -116,35 +116,35 @@ impl LogStateReader for LogState { } } - fn last_log_id(&self) -> Option<&LogId> { + fn last_log_id(&self) -> Option<&LogId> { self.last.as_ref() } - fn committed(&self) -> Option<&LogId> { + fn committed(&self) -> Option<&LogId> { unimplemented!("testing") } - fn io_applied(&self) -> Option<&LogId> { + fn io_applied(&self) -> Option<&LogId> { todo!() } - fn io_snapshot_last_log_id(&self) -> Option<&LogId> { + fn io_snapshot_last_log_id(&self) -> Option<&LogId> { todo!() } - fn io_purged(&self) -> Option<&LogId> { + fn io_purged(&self) -> Option<&LogId> { todo!() } - fn snapshot_last_log_id(&self) -> Option<&LogId> { + fn snapshot_last_log_id(&self) -> Option<&LogId> { self.snap_last.as_ref() } - fn purge_upto(&self) -> Option<&LogId> { + fn purge_upto(&self) -> Option<&LogId> { self.purge_upto.as_ref() } - fn last_purged_log_id(&self) -> Option<&LogId> { + fn last_purged_log_id(&self) -> Option<&LogId> { self.purged.as_ref() } } diff --git a/openraft/src/progress/entry/update.rs b/openraft/src/progress/entry/update.rs index 04f211699..2ea98b618 100644 --- a/openraft/src/progress/entry/update.rs +++ b/openraft/src/progress/entry/update.rs @@ -76,7 +76,7 @@ where C: RaftTypeConfig } } - pub(crate) fn update_matching(&mut self, matching: Option>) { + pub(crate) fn update_matching(&mut self, matching: Option>) { tracing::debug!( "update_matching: current progress_entry: {}; matching: {}", self.entry, diff --git a/openraft/src/progress/inflight/mod.rs b/openraft/src/progress/inflight/mod.rs index 82bdf5db3..73112e0b0 100644 --- a/openraft/src/progress/inflight/mod.rs +++ b/openraft/src/progress/inflight/mod.rs @@ -17,7 +17,7 @@ use crate::RaftTypeConfig; /// /// If inflight data is non-None, it's waiting for responses from a follower/learner. /// The follower/learner respond with `ack()` or `conflict()` to update the state of inflight data. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] #[derive(PartialEq, Eq)] pub(crate) enum Inflight where C: RaftTypeConfig @@ -34,10 +34,17 @@ where C: RaftTypeConfig /// The last log id snapshot includes. /// /// It is None, if the snapshot is empty. - last_log_id: Option>, + last_log_id: Option>, }, } +impl Copy for Inflight +where + C: RaftTypeConfig, + C::NodeId: Copy, +{ +} + impl Validate for Inflight where C: RaftTypeConfig { @@ -68,7 +75,7 @@ impl Inflight where C: RaftTypeConfig { /// Create inflight state for sending logs. - pub(crate) fn logs(prev: Option>, last: Option>) -> Self { + pub(crate) fn logs(prev: Option>, last: Option>) -> Self { #![allow(clippy::nonminimal_bool)] if !(prev < last) { Self::None @@ -80,7 +87,7 @@ where C: RaftTypeConfig } /// Create inflight state for sending snapshot. - pub(crate) fn snapshot(snapshot_last_log_id: Option>) -> Self { + pub(crate) fn snapshot(snapshot_last_log_id: Option>) -> Self { Self::Snapshot { last_log_id: snapshot_last_log_id, } @@ -103,7 +110,7 @@ where C: RaftTypeConfig } /// Update inflight state when log upto `upto` is acknowledged by a follower/learner. - pub(crate) fn ack(&mut self, upto: Option>) { + pub(crate) fn ack(&mut self, upto: Option>) { match self { Inflight::None => { unreachable!("no inflight data") diff --git a/openraft/src/progress/inflight/tests.rs b/openraft/src/progress/inflight/tests.rs index b0d4e04ec..6bf7bd1fc 100644 --- a/openraft/src/progress/inflight/tests.rs +++ b/openraft/src/progress/inflight/tests.rs @@ -6,7 +6,7 @@ use crate::progress::Inflight; use crate::CommittedLeaderId; use crate::LogId; -fn log_id(index: u64) -> LogId { +fn log_id(index: u64) -> LogId { LogId { leader_id: CommittedLeaderId::new(1, 1), index, diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 45bb89252..21f802a63 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -80,7 +80,7 @@ where &self.vote } - pub(crate) fn last_log_id(&self) -> Option<&LogId> { + pub(crate) fn last_log_id(&self) -> Option<&LogId> { self.last_log_id.as_ref() } diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index 14c09b1c1..742fb7a4e 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -160,7 +160,7 @@ where /// Assign log ids to the entries. /// /// This method update the `self.last_log_id`. - pub(crate) fn assign_log_ids<'a, LID: RaftLogId + 'a>( + pub(crate) fn assign_log_ids<'a, LID: RaftLogId + 'a>( &mut self, entries: impl IntoIterator, ) { diff --git a/openraft/src/raft/message/append_entries.rs b/openraft/src/raft/message/append_entries.rs index 6f9a0e54d..484d56559 100644 --- a/openraft/src/raft/message/append_entries.rs +++ b/openraft/src/raft/message/append_entries.rs @@ -18,7 +18,7 @@ use crate::Vote; pub struct AppendEntriesRequest { pub vote: Vote, - pub prev_log_id: Option>, + pub prev_log_id: Option>, /// The new log entries to store. /// @@ -27,7 +27,7 @@ pub struct AppendEntriesRequest { pub entries: Vec, /// The leader's committed log id. - pub leader_commit: Option>, + pub leader_commit: Option>, } impl fmt::Debug for AppendEntriesRequest { @@ -86,7 +86,7 @@ pub enum AppendEntriesResponse { /// /// [`RPCError`]: crate::error::RPCError /// [`RaftNetwork::append_entries`]: crate::network::RaftNetwork::append_entries - PartialSuccess(Option>), + PartialSuccess(Option>), /// The first log id([`AppendEntriesRequest::prev_log_id`]) of the entries to send does not /// match on the remote target node. diff --git a/openraft/src/raft/message/client_write.rs b/openraft/src/raft/message/client_write.rs index 32cff5bbd..8ff2adad6 100644 --- a/openraft/src/raft/message/client_write.rs +++ b/openraft/src/raft/message/client_write.rs @@ -20,7 +20,7 @@ pub type ClientWriteResult = Result, ClientWriteError< )] pub struct ClientWriteResponse { /// The id of the log that is applied. - pub log_id: LogId, + pub log_id: LogId, /// Application specific response data. pub data: C::R, @@ -35,7 +35,7 @@ where C: RaftTypeConfig /// Create a new instance of `ClientWriteResponse`. #[allow(dead_code)] #[since(version = "0.9.5")] - pub(crate) fn new_app_response(log_id: LogId, data: C::R) -> Self { + pub(crate) fn new_app_response(log_id: LogId, data: C::R) -> Self { Self { log_id, data, @@ -44,7 +44,7 @@ where C: RaftTypeConfig } #[since(version = "0.9.5")] - pub fn log_id(&self) -> &LogId { + pub fn log_id(&self) -> &LogId { &self.log_id } diff --git a/openraft/src/raft/message/transfer_leader.rs b/openraft/src/raft/message/transfer_leader.rs index 3ff450523..f5f7ed474 100644 --- a/openraft/src/raft/message/transfer_leader.rs +++ b/openraft/src/raft/message/transfer_leader.rs @@ -18,13 +18,13 @@ where C: RaftTypeConfig pub(crate) to_node_id: C::NodeId, /// The last log id the `to_node_id` node should at least have to become Leader. - pub(crate) last_log_id: Option>, + pub(crate) last_log_id: Option>, } impl TransferLeaderRequest where C: RaftTypeConfig { - pub fn new(from: Vote, to: C::NodeId, last_log_id: Option>) -> Self { + pub fn new(from: Vote, to: C::NodeId, last_log_id: Option>) -> Self { Self { from_leader: from, to_node_id: to, @@ -45,7 +45,7 @@ where C: RaftTypeConfig /// The last log id on the `to_node_id` node should at least have to become Leader. /// /// This is the last log id on the Leader when the leadership is transferred. - pub fn last_log_id(&self) -> Option<&LogId> { + pub fn last_log_id(&self) -> Option<&LogId> { self.last_log_id.as_ref() } } diff --git a/openraft/src/raft/message/vote.rs b/openraft/src/raft/message/vote.rs index 556aba718..7737ebf0c 100644 --- a/openraft/src/raft/message/vote.rs +++ b/openraft/src/raft/message/vote.rs @@ -11,7 +11,7 @@ use crate::Vote; #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct VoteRequest { pub vote: Vote, - pub last_log_id: Option>, + pub last_log_id: Option>, } impl fmt::Display for VoteRequest @@ -25,7 +25,7 @@ where C: RaftTypeConfig impl VoteRequest where C: RaftTypeConfig { - pub fn new(vote: Vote, last_log_id: Option>) -> Self { + pub fn new(vote: Vote, last_log_id: Option>) -> Self { Self { vote, last_log_id } } } @@ -45,13 +45,13 @@ pub struct VoteResponse { pub vote_granted: bool, /// The last log id stored on the remote voter. - pub last_log_id: Option>, + pub last_log_id: Option>, } impl VoteResponse where C: RaftTypeConfig { - pub fn new(vote: impl Borrow>, last_log_id: Option>, granted: bool) -> Self { + pub fn new(vote: impl Borrow>, last_log_id: Option>, granted: bool) -> Self { Self { vote: vote.borrow().clone(), vote_granted: granted, diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 2a16b23a4..d2bad2d7e 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -532,7 +532,7 @@ where C: RaftTypeConfig /// ``` /// Read more about how it works: [Read Operation](crate::docs::protocol::read) #[tracing::instrument(level = "debug", skip(self))] - pub async fn ensure_linearizable(&self) -> Result>, RaftError>> { + pub async fn ensure_linearizable(&self) -> Result>, RaftError>> { let (read_log_id, applied) = self.get_read_log_id().await?; if read_log_id.index() > applied.index() { @@ -581,7 +581,7 @@ where C: RaftTypeConfig #[tracing::instrument(level = "debug", skip(self))] pub async fn get_read_log_id( &self, - ) -> Result<(Option>, Option>), RaftError>> { + ) -> Result<(Option>, Option>), RaftError>> { let (tx, rx) = C::oneshot(); let (read_log_id, applied) = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?; Ok((read_log_id, applied)) @@ -772,8 +772,8 @@ where C: RaftTypeConfig &self, metrics: &RaftMetrics, node_id: &C::NodeId, - membership_log_id: Option<&LogId>, - ) -> Result>, ()> { + membership_log_id: Option<&LogId>, + ) -> Result>, ()> { if metrics.membership_config.log_id().as_ref() < membership_log_id { // Waiting for the latest metrics to report. return Err(()); diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index 00cd26807..9ad261663 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -69,17 +69,17 @@ where C: RaftTypeConfig pub(crate) io_progress: Valid>>, /// The last log id that has been applied to state machine. - pub(crate) applied: Option>, + pub(crate) applied: Option>, /// The last log id in the currently persisted snapshot. - pub(crate) snapshot: Option>, + pub(crate) snapshot: Option>, /// The last log id that has been purged from storage. /// /// `RaftState::last_purged_log_id()` /// is just the log id that is going to be purged, i.e., there is a `PurgeLog` command queued to /// be executed, and it may not be the actually purged log id. - pub(crate) purged: Option>, + pub(crate) purged: Option>, } impl Validate for IOState @@ -104,9 +104,9 @@ where C: RaftTypeConfig { pub(crate) fn new( vote: &Vote, - applied: Option>, - snapshot: Option>, - purged: Option>, + applied: Option>, + snapshot: Option>, + purged: Option>, ) -> Self { let mut io_progress = Valid::new(IOProgress::default()); @@ -123,7 +123,7 @@ where C: RaftTypeConfig } } - pub(crate) fn update_applied(&mut self, log_id: Option>) { + pub(crate) fn update_applied(&mut self, log_id: Option>) { tracing::debug!(applied = display(DisplayOption(&log_id)), "{}", func_name!()); // TODO: should we update flushed if applied is newer? @@ -137,11 +137,11 @@ where C: RaftTypeConfig self.applied = log_id; } - pub(crate) fn applied(&self) -> Option<&LogId> { + pub(crate) fn applied(&self) -> Option<&LogId> { self.applied.as_ref() } - pub(crate) fn update_snapshot(&mut self, log_id: Option>) { + pub(crate) fn update_snapshot(&mut self, log_id: Option>) { tracing::debug!(snapshot = display(DisplayOption(&log_id)), "{}", func_name!()); debug_assert!( @@ -154,7 +154,7 @@ where C: RaftTypeConfig self.snapshot = log_id; } - pub(crate) fn snapshot(&self) -> Option<&LogId> { + pub(crate) fn snapshot(&self) -> Option<&LogId> { self.snapshot.as_ref() } @@ -166,11 +166,11 @@ where C: RaftTypeConfig self.building_snapshot } - pub(crate) fn update_purged(&mut self, log_id: Option>) { + pub(crate) fn update_purged(&mut self, log_id: Option>) { self.purged = log_id; } - pub(crate) fn purged(&self) -> Option<&LogId> { + pub(crate) fn purged(&self) -> Option<&LogId> { self.purged.as_ref() } } diff --git a/openraft/src/raft_state/io_state/io_id.rs b/openraft/src/raft_state/io_state/io_id.rs index 2d3aa3b21..0e59d03d4 100644 --- a/openraft/src/raft_state/io_state/io_id.rs +++ b/openraft/src/raft_state/io_state/io_id.rs @@ -80,7 +80,7 @@ where C: RaftTypeConfig Self::Vote(vote) } - pub(crate) fn new_log_io(committed_vote: CommittedVote, last_log_id: Option>) -> Self { + pub(crate) fn new_log_io(committed_vote: CommittedVote, last_log_id: Option>) -> Self { Self::Log(LogIOId::new(committed_vote, last_log_id)) } @@ -101,7 +101,7 @@ where C: RaftTypeConfig } } - pub(crate) fn last_log_id(&self) -> Option<&LogId> { + pub(crate) fn last_log_id(&self) -> Option<&LogId> { match self { Self::Vote(_) => None, Self::Log(log_io_id) => log_io_id.log_id.as_ref(), diff --git a/openraft/src/raft_state/io_state/log_io_id.rs b/openraft/src/raft_state/io_state/log_io_id.rs index 2fc67249e..45e9f7679 100644 --- a/openraft/src/raft_state/io_state/log_io_id.rs +++ b/openraft/src/raft_state/io_state/log_io_id.rs @@ -30,7 +30,7 @@ where C: RaftTypeConfig pub(crate) committed_vote: CommittedVote, /// The last log id that has been flushed to storage. - pub(crate) log_id: Option>, + pub(crate) log_id: Option>, } impl fmt::Display for LogIOId @@ -44,7 +44,7 @@ where C: RaftTypeConfig impl LogIOId where C: RaftTypeConfig { - pub(crate) fn new(committed_vote: CommittedVote, log_id: Option>) -> Self { + pub(crate) fn new(committed_vote: CommittedVote, log_id: Option>) -> Self { Self { committed_vote, log_id } } } diff --git a/openraft/src/raft_state/log_state_reader.rs b/openraft/src/raft_state/log_state_reader.rs index b6fa931dd..ec9e28558 100644 --- a/openraft/src/raft_state/log_state_reader.rs +++ b/openraft/src/raft_state/log_state_reader.rs @@ -9,7 +9,7 @@ pub(crate) trait LogStateReader where C: RaftTypeConfig { /// Get previous log id, i.e., the log id at index - 1 - fn prev_log_id(&self, index: u64) -> Option> { + fn prev_log_id(&self, index: u64) -> Option> { if index == 0 { None } else { @@ -20,7 +20,7 @@ where C: RaftTypeConfig /// Return if a log id exists. /// /// It assumes a committed log will always get positive return value, according to raft spec. - fn has_log_id(&self, log_id: &LogId) -> bool { + fn has_log_id(&self, log_id: &LogId) -> bool { if log_id.index < self.committed().next_index() { debug_assert!(Some(log_id) <= self.committed()); return true; @@ -39,40 +39,40 @@ where C: RaftTypeConfig /// It will return `last_purged_log_id` if index is at the last purged index. /// If the log at the specified index is smaller than `last_purged_log_id`, or greater than /// `last_log_id`, it returns None. - fn get_log_id(&self, index: u64) -> Option>; + fn get_log_id(&self, index: u64) -> Option>; /// The last known log id in the store. /// /// The range of all stored log ids are `(last_purged_log_id(), last_log_id()]`, left open right /// close. - fn last_log_id(&self) -> Option<&LogId>; + fn last_log_id(&self) -> Option<&LogId>; /// The last known committed log id, i.e., the id of the log that is accepted by a quorum of /// voters. - fn committed(&self) -> Option<&LogId>; + fn committed(&self) -> Option<&LogId>; /// The last known applied log id, i.e., the id of the log that is applied to state machine. /// /// This is actually happened io-state which might fall behind committed log id. - fn io_applied(&self) -> Option<&LogId>; + fn io_applied(&self) -> Option<&LogId>; /// The last log id in the last persisted snapshot. /// /// This is actually happened io-state which might fall behind `Self::snapshot_last_log_id()`. - fn io_snapshot_last_log_id(&self) -> Option<&LogId>; + fn io_snapshot_last_log_id(&self) -> Option<&LogId>; /// The last known purged log id, inclusive. /// /// This is actually purged log id from storage. - fn io_purged(&self) -> Option<&LogId>; + fn io_purged(&self) -> Option<&LogId>; /// Return the last log id the snapshot includes. - fn snapshot_last_log_id(&self) -> Option<&LogId>; + fn snapshot_last_log_id(&self) -> Option<&LogId>; /// Return the log id it wants to purge up to. /// /// Logs may not be able to be purged at once because they are in use by replication tasks. - fn purge_upto(&self) -> Option<&LogId>; + fn purge_upto(&self) -> Option<&LogId>; /// The greatest log id that has been purged after being applied to state machine, i.e., the /// oldest known log id. @@ -81,5 +81,5 @@ where C: RaftTypeConfig /// left open and right close. /// /// `last_purged_log_id == last_log_id` means there is no log entry in the storage. - fn last_purged_log_id(&self) -> Option<&LogId>; + fn last_purged_log_id(&self) -> Option<&LogId>; } diff --git a/openraft/src/raft_state/membership_state/mod.rs b/openraft/src/raft_state/membership_state/mod.rs index ba2eeba9e..e0e0050b7 100644 --- a/openraft/src/raft_state/membership_state/mod.rs +++ b/openraft/src/raft_state/membership_state/mod.rs @@ -80,7 +80,7 @@ where C: RaftTypeConfig } /// Update membership state if the specified committed_log_id is greater than `self.effective` - pub(crate) fn commit(&mut self, committed_log_id: &Option>) { + pub(crate) fn commit(&mut self, committed_log_id: &Option>) { if committed_log_id >= self.effective().log_id() { debug_assert!(committed_log_id.index() >= self.effective().log_id().index()); self.committed = self.effective.clone(); diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 6cbdc1ebf..c7a3d47a0 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -57,7 +57,7 @@ where C: RaftTypeConfig /// of the leader. /// /// - A quorum could be a uniform quorum or joint quorum. - pub committed: Option>, + pub committed: Option>, pub(crate) purged_next: u64, @@ -82,7 +82,7 @@ where C: RaftTypeConfig /// /// If a log is in use by a replication task, the purge is postponed and is stored in this /// field. - pub(crate) purge_upto: Option>, + pub(crate) purge_upto: Option>, } impl Default for RaftState @@ -106,39 +106,39 @@ where C: RaftTypeConfig impl LogStateReader for RaftState where C: RaftTypeConfig { - fn get_log_id(&self, index: u64) -> Option> { + fn get_log_id(&self, index: u64) -> Option> { self.log_ids.get(index) } - fn last_log_id(&self) -> Option<&LogId> { + fn last_log_id(&self) -> Option<&LogId> { self.log_ids.last() } - fn committed(&self) -> Option<&LogId> { + fn committed(&self) -> Option<&LogId> { self.committed.as_ref() } - fn io_applied(&self) -> Option<&LogId> { + fn io_applied(&self) -> Option<&LogId> { self.io_state.applied() } - fn io_snapshot_last_log_id(&self) -> Option<&LogId> { + fn io_snapshot_last_log_id(&self) -> Option<&LogId> { self.io_state.snapshot() } - fn io_purged(&self) -> Option<&LogId> { + fn io_purged(&self) -> Option<&LogId> { self.io_state.purged() } - fn snapshot_last_log_id(&self) -> Option<&LogId> { + fn snapshot_last_log_id(&self) -> Option<&LogId> { self.snapshot_meta.last_log_id.as_ref() } - fn purge_upto(&self) -> Option<&LogId> { + fn purge_upto(&self) -> Option<&LogId> { self.purge_upto.as_ref() } - fn last_purged_log_id(&self) -> Option<&LogId> { + fn last_purged_log_id(&self) -> Option<&LogId> { if self.purged_next == 0 { return None; } @@ -252,11 +252,11 @@ where C: RaftTypeConfig /// Append a list of `log_id`. /// /// The log ids in the input has to be continuous. - pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_log_ids: &[LID]) { + pub(crate) fn extend_log_ids_from_same_leader<'a, LID: RaftLogId + 'a>(&mut self, new_log_ids: &[LID]) { self.log_ids.extend_from_same_leader(new_log_ids) } - pub(crate) fn extend_log_ids<'a, LID: RaftLogId + 'a>(&mut self, new_log_id: &[LID]) { + pub(crate) fn extend_log_ids<'a, LID: RaftLogId + 'a>(&mut self, new_log_id: &[LID]) { self.log_ids.extend(new_log_id) } @@ -297,7 +297,7 @@ where C: RaftTypeConfig /// Find the first entry in the input that does not exist on local raft-log, /// by comparing the log id. pub(crate) fn first_conflicting_index(&self, entries: &[Ent]) -> usize - where Ent: RaftLogId { + where Ent: RaftLogId { let l = entries.len(); for (i, ent) in entries.iter().enumerate() { diff --git a/openraft/src/raft_state/tests/forward_to_leader_test.rs b/openraft/src/raft_state/tests/forward_to_leader_test.rs index 775170bc6..cf5d76b47 100644 --- a/openraft/src/raft_state/tests/forward_to_leader_test.rs +++ b/openraft/src/raft_state/tests/forward_to_leader_test.rs @@ -6,23 +6,15 @@ use maplit::btreeset; use crate::engine::testing::UTConfig; use crate::error::ForwardToLeader; +use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; -use crate::CommittedLeaderId; use crate::EffectiveMembership; -use crate::LogId; use crate::Membership; use crate::MembershipState; use crate::RaftState; use crate::Vote; -fn log_id(term: u64, index: u64) -> LogId { - LogId:: { - leader_id: CommittedLeaderId::new(term, 0), - index, - } -} - fn m12() -> Membership { Membership::new_with_defaults(vec![btreeset! {1,2}], []) } @@ -32,8 +24,8 @@ fn test_forward_to_leader_vote_not_committed() { let rs = RaftState:: { vote: Leased::new(UTConfig::<()>::now(), Duration::from_millis(500), Vote::new(1, 2)), membership_state: MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m12())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m12())), ), ..Default::default() }; @@ -50,8 +42,8 @@ fn test_forward_to_leader_not_a_member() { Vote::new_committed(1, 3), ), membership_state: MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m12())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m12())), ), ..Default::default() }; @@ -70,8 +62,8 @@ fn test_forward_to_leader_has_leader() { Vote::new_committed(1, 3), ), membership_state: MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m123())), - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m123())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m123())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 0, 1)), m123())), ), ..Default::default() }; diff --git a/openraft/src/raft_state/tests/log_state_reader_test.rs b/openraft/src/raft_state/tests/log_state_reader_test.rs index d817a5747..27ed5c9fd 100644 --- a/openraft/src/raft_state/tests/log_state_reader_test.rs +++ b/openraft/src/raft_state/tests/log_state_reader_test.rs @@ -5,8 +5,8 @@ use crate::CommittedLeaderId; use crate::LogId; use crate::RaftState; -fn log_id(term: u64, index: u64) -> LogId { - LogId:: { +fn log_id(term: u64, index: u64) -> LogId { + LogId { leader_id: CommittedLeaderId::new(term, 0), index, } diff --git a/openraft/src/raft_state/tests/validate_test.rs b/openraft/src/raft_state/tests/validate_test.rs index c2e176be2..7587da25a 100644 --- a/openraft/src/raft_state/tests/validate_test.rs +++ b/openraft/src/raft_state/tests/validate_test.rs @@ -7,8 +7,8 @@ use crate::CommittedLeaderId; use crate::LogId; use crate::RaftState; -fn log_id(term: u64, index: u64) -> LogId { - LogId:: { +fn log_id(term: u64, index: u64) -> LogId { + LogId { leader_id: CommittedLeaderId::new(term, 0), index, } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index c01e31929..bded915bb 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -134,10 +134,10 @@ where config: Arc, /// The log id of the highest log entry which is known to be committed in the cluster. - committed: Option>, + committed: Option>, /// Last matching log id on a follower/learner - matching: Option>, + matching: Option>, /// Next replication action to run. next_action: Option>, @@ -161,8 +161,8 @@ where target: C::NodeId, session_id: ReplicationSessionId, config: Arc, - committed: Option>, - matching: Option>, + committed: Option>, + matching: Option>, network: N::Network, snapshot_network: N::Network, log_reader: LS::LogReader, @@ -816,7 +816,7 @@ where } /// If there are more logs to send, it returns a new `Some(Data::Logs)` to send. - fn next_action_to_send(&mut self, matching: Option>, log_ids: LogIdRange) -> Option> { + fn next_action_to_send(&mut self, matching: Option>, log_ids: LogIdRange) -> Option> { if matching < log_ids.last { Some(Data::new_logs(LogIdRange::new(matching, log_ids.last))) } else { @@ -825,7 +825,7 @@ where } /// Check if partial success result(`matching`) is valid for a given log range to send. - fn debug_assert_partial_success(to_send: &LogIdRange, matching: &Option>) { + fn debug_assert_partial_success(to_send: &LogIdRange, matching: &Option>) { debug_assert!( matching <= &to_send.last, "matching ({}) should be <= last_log_id ({})", diff --git a/openraft/src/replication/replication_session_id.rs b/openraft/src/replication/replication_session_id.rs index eca020aaa..355a5030c 100644 --- a/openraft/src/replication/replication_session_id.rs +++ b/openraft/src/replication/replication_session_id.rs @@ -38,7 +38,7 @@ where C: RaftTypeConfig pub(crate) leader_vote: CommittedVote, /// The log id of the membership log this replication works for. - pub(crate) membership_log_id: Option>, + pub(crate) membership_log_id: Option>, } impl Display for ReplicationSessionId @@ -57,7 +57,7 @@ where C: RaftTypeConfig impl ReplicationSessionId where C: RaftTypeConfig { - pub(crate) fn new(vote: CommittedVote, membership_log_id: Option>) -> Self { + pub(crate) fn new(vote: CommittedVote, membership_log_id: Option>) -> Self { Self { leader_vote: vote, membership_log_id, diff --git a/openraft/src/replication/request.rs b/openraft/src/replication/request.rs index a2db2733a..6212381b9 100644 --- a/openraft/src/replication/request.rs +++ b/openraft/src/replication/request.rs @@ -9,7 +9,7 @@ pub(crate) enum Replicate where C: RaftTypeConfig { /// Inform replication stream to forward the committed log id to followers/learners. - Committed(Option>), + Committed(Option>), /// Send a chunk of data, e.g., logs or snapshot. Data(Data), diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 991950005..106457486 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -79,11 +79,11 @@ mod tests { // NOTE that with single-term-leader, log id is `1-3` let result = ReplicationResult::(Ok(Some(log_id(1, 2, 3)))); - let want = format!("(Match:{})", log_id(1, 2, 3)); + let want = format!("(Match:{})", log_id::(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); let result = ReplicationResult::(Err(log_id(1, 2, 3))); - let want = format!("(Conflict:{})", log_id(1, 2, 3)); + let want = format!("(Conflict:{})", log_id::(1, 2, 3)); assert!(result.to_string().ends_with(&want), "{}", result.to_string()); } } diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 833a1dae1..793bb7b15 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -101,8 +101,8 @@ where C: RaftTypeConfig pub struct LogApplied where C: RaftTypeConfig { - last_log_id: LogId, - tx: OneshotSenderOf, Vec), StorageError>>, + last_log_id: LogId, + tx: OneshotSenderOf, Vec), StorageError>>, } impl LogApplied @@ -110,8 +110,8 @@ where C: RaftTypeConfig { #[allow(dead_code)] pub(crate) fn new( - last_log_id: LogId, - tx: OneshotSenderOf, Vec), StorageError>>, + last_log_id: LogId, + tx: OneshotSenderOf, Vec), StorageError>>, ) -> Self { Self { last_log_id, tx } } diff --git a/openraft/src/storage/log_reader_ext.rs b/openraft/src/storage/log_reader_ext.rs index 55509b71b..f355591fa 100644 --- a/openraft/src/storage/log_reader_ext.rs +++ b/openraft/src/storage/log_reader_ext.rs @@ -20,7 +20,7 @@ where C: RaftTypeConfig } /// Get the log id of the entry at `index`. - async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { + async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { let entries = self.try_get_log_entries(log_index..=log_index).await?; if entries.is_empty() { diff --git a/openraft/src/storage/log_state.rs b/openraft/src/storage/log_state.rs index fd6317af6..9b456e11c 100644 --- a/openraft/src/storage/log_state.rs +++ b/openraft/src/storage/log_state.rs @@ -7,9 +7,9 @@ use crate::RaftTypeConfig; #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct LogState { /// The greatest log id that has been purged after being applied to state machine. - pub last_purged_log_id: Option>, + pub last_purged_log_id: Option>, /// The log id of the last present entry if there are any entries. /// Otherwise the same value as `last_purged_log_id`. - pub last_log_id: Option>, + pub last_log_id: Option>, } diff --git a/openraft/src/storage/snapshot_meta.rs b/openraft/src/storage/snapshot_meta.rs index 4dfdb8e75..6eedf3c58 100644 --- a/openraft/src/storage/snapshot_meta.rs +++ b/openraft/src/storage/snapshot_meta.rs @@ -18,7 +18,7 @@ pub struct SnapshotMeta where C: RaftTypeConfig { /// Log entries upto which this snapshot includes, inclusive. - pub last_log_id: Option>, + pub last_log_id: Option>, /// The last applied membership config. pub last_membership: StoredMembership, @@ -55,7 +55,7 @@ where C: RaftTypeConfig } /// Returns a ref to the id of the last log that is included in this snapshot. - pub fn last_log_id(&self) -> Option<&LogId> { + pub fn last_log_id(&self) -> Option<&LogId> { self.last_log_id.as_ref() } } diff --git a/openraft/src/storage/snapshot_signature.rs b/openraft/src/storage/snapshot_signature.rs index 270e00418..b618136b6 100644 --- a/openraft/src/storage/snapshot_signature.rs +++ b/openraft/src/storage/snapshot_signature.rs @@ -9,10 +9,10 @@ pub struct SnapshotSignature where C: RaftTypeConfig { /// Log entries upto which this snapshot includes, inclusive. - pub last_log_id: Option>, + pub last_log_id: Option>, /// The last applied membership log id. - pub last_membership_log_id: Option>, + pub last_membership_log_id: Option>, /// To identify a snapshot when transferring. pub snapshot_id: SnapshotId, diff --git a/openraft/src/storage/v2/raft_log_reader.rs b/openraft/src/storage/v2/raft_log_reader.rs index ea87993b9..59a60318c 100644 --- a/openraft/src/storage/v2/raft_log_reader.rs +++ b/openraft/src/storage/v2/raft_log_reader.rs @@ -104,10 +104,7 @@ where C: RaftTypeConfig /// /// [`RaftLogStorage`]: crate::storage::RaftLogStorage #[since(version = "0.10.0")] - async fn get_key_log_ids( - &mut self, - range: RangeInclusive>, - ) -> Result>, StorageError> { + async fn get_key_log_ids(&mut self, range: RangeInclusive>) -> Result>, StorageError> { LogIdList::get_key_log_ids(range, self).await } } diff --git a/openraft/src/storage/v2/raft_log_storage.rs b/openraft/src/storage/v2/raft_log_storage.rs index 26af8dd75..8cc77bbe8 100644 --- a/openraft/src/storage/v2/raft_log_storage.rs +++ b/openraft/src/storage/v2/raft_log_storage.rs @@ -67,13 +67,13 @@ where C: RaftTypeConfig /// See: [`docs::data::log_pointers`]. /// /// [`docs::data::log_pointers`]: `crate::docs::data::log_pointers#optionally-persisted-committed` - async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, _committed: Option>) -> Result<(), StorageError> { // By default `committed` log id is not saved Ok(()) } /// Return the last saved committed log id by [`Self::save_committed`]. - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result>, StorageError> { // By default `committed` log id is not saved and this method just return None. Ok(None) } @@ -106,12 +106,12 @@ where C: RaftTypeConfig /// ### To ensure correctness: /// /// - It must not leave a **hole** in logs. - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError>; + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError>; /// Purge logs upto `log_id`, inclusive /// /// ### To ensure correctness: /// /// - It must not leave a **hole** in logs. - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError>; + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError>; } diff --git a/openraft/src/storage/v2/raft_state_machine.rs b/openraft/src/storage/v2/raft_state_machine.rs index a2a675418..9b28f6d46 100644 --- a/openraft/src/storage/v2/raft_state_machine.rs +++ b/openraft/src/storage/v2/raft_state_machine.rs @@ -35,7 +35,7 @@ where C: RaftTypeConfig /// last-applied-log-id. /// Because upon startup, the last membership will be loaded by scanning logs from the /// `last-applied-log-id`. - async fn applied_state(&mut self) -> Result<(Option>, StoredMembership), StorageError>; + async fn applied_state(&mut self) -> Result<(Option>, StoredMembership), StorageError>; /// Apply the given payload of entries to the state machine. /// diff --git a/openraft/src/storage_error.rs b/openraft/src/storage_error.rs index 4814dc173..04e1436c3 100644 --- a/openraft/src/storage_error.rs +++ b/openraft/src/storage_error.rs @@ -48,13 +48,13 @@ where C: RaftTypeConfig Logs, /// Error about a single log entry - Log(LogId), + Log(LogId), /// Error about a single log entry without knowing the log term. LogIndex(u64), /// Error happened when applying a log entry - Apply(LogId), + Apply(LogId), /// Error happened when operating state machine. StateMachine, @@ -137,7 +137,7 @@ where C: RaftTypeConfig } } - pub fn write_log_entry(log_id: LogId, source: impl Into) -> Self { + pub fn write_log_entry(log_id: LogId, source: impl Into) -> Self { Self::new(ErrorSubject::Log(log_id), ErrorVerb::Write, source) } @@ -145,7 +145,7 @@ where C: RaftTypeConfig Self::new(ErrorSubject::LogIndex(log_index), ErrorVerb::Read, source) } - pub fn read_log_entry(log_id: LogId, source: impl Into) -> Self { + pub fn read_log_entry(log_id: LogId, source: impl Into) -> Self { Self::new(ErrorSubject::Log(log_id), ErrorVerb::Read, source) } @@ -165,7 +165,7 @@ where C: RaftTypeConfig Self::new(ErrorSubject::Vote, ErrorVerb::Read, source) } - pub fn apply(log_id: LogId, source: impl Into) -> Self { + pub fn apply(log_id: LogId, source: impl Into) -> Self { Self::new(ErrorSubject::Apply(log_id), ErrorVerb::Write, source) } diff --git a/openraft/src/summary.rs b/openraft/src/summary.rs index c4139b770..28dd9923d 100644 --- a/openraft/src/summary.rs +++ b/openraft/src/summary.rs @@ -83,9 +83,10 @@ mod tests { #[cfg(not(feature = "single-term-leader"))] #[test] fn test_display() { + use crate::engine::testing::UTConfig; use crate::MessageSummary; - let lid = crate::testing::log_id(1, 2, 3); + let lid = crate::testing::log_id::(1, 2, 3); assert_eq!("T1-N2.3", lid.to_string()); assert_eq!("T1-N2.3", lid.summary()); assert_eq!("Some(T1-N2.3)", Some(&lid).summary()); diff --git a/openraft/src/testing/common.rs b/openraft/src/testing/common.rs index 8047b11a5..427c707b1 100644 --- a/openraft/src/testing/common.rs +++ b/openraft/src/testing/common.rs @@ -8,8 +8,9 @@ use crate::LogId; use crate::RaftTypeConfig; /// Builds a log id, for testing purposes. -pub fn log_id(term: u64, node_id: NID, index: u64) -> LogId { - LogId:: { +pub fn log_id(term: u64, node_id: C::NodeId, index: u64) -> LogId +where C: RaftTypeConfig { + LogId:: { leader_id: CommittedLeaderId::new(term, node_id), index, } diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index 882f6c37e..33a908a7d 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -27,7 +27,6 @@ use crate::type_config::TypeConfigExt; use crate::vote::CommittedLeaderId; use crate::LogId; use crate::Membership; -use crate::NodeId; use crate::OptionalSend; use crate::RaftLogReader; use crate::RaftSnapshotBuilder; @@ -53,7 +52,7 @@ trait ReaderExt: RaftLogStorage where C: RaftTypeConfig { /// Proxy method to invoke [`RaftLogReaderExt::get_log_id`]. - async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { + async fn get_log_id(&mut self, log_index: u64) -> Result, StorageError> { self.get_log_reader().await.get_log_id(log_index).await } @@ -633,7 +632,7 @@ where } pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - let log_id = |t, n: u64, i| LogId:: { + let log_id = |t, n: u64, i| LogId:: { leader_id: CommittedLeaderId::new(t, n.into()), index: i, }; @@ -641,7 +640,7 @@ where tracing::info!("--- empty store, expect []"); { let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?; - assert_eq!(Vec::>::new(), initial.log_ids.key_log_ids()); + assert_eq!(Vec::>::new(), initial.log_ids.key_log_ids()); } tracing::info!("--- log terms: [0], last_purged_log_id is None, expect [(0,0)]"); @@ -1376,8 +1375,11 @@ where } /// Create a log id with node id 0 for testing. -fn log_id_0(term: u64, index: u64) -> LogId -where NID: NodeId + From { +fn log_id_0(term: u64, index: u64) -> LogId +where + C: RaftTypeConfig, + C::NodeId: From, +{ LogId { leader_id: CommittedLeaderId::new(term, NODE_ID.into()), index, diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 94298d9ea..69dea33ed 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -151,7 +151,7 @@ pub mod alias { pub type MutexOf = as AsyncRuntime>::Mutex; // Usually used types - pub type LogIdOf = crate::LogId>; + pub type LogIdOf = crate::LogId; pub type VoteOf = crate::Vote; pub type LeaderIdOf = crate::LeaderId>; pub type CommittedLeaderIdOf = crate::CommittedLeaderId>; diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index ccba9ce7f..9ba350af3 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -95,7 +95,7 @@ pub struct MemStoreSnapshot { /// The state machine of the `MemStore`. #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct MemStoreStateMachine { - pub last_applied_log: Option>, + pub last_applied_log: Option>, pub last_membership: StoredMembership, @@ -139,14 +139,14 @@ impl BlockConfig { /// An in-memory log storage implementing the `RaftLogStorage` trait. pub struct MemLogStore { - last_purged_log_id: RwLock>>, + last_purged_log_id: RwLock>>, /// Saving committed log id is optional in Openraft. /// /// This flag switches on the saving for testing purposes. pub enable_saving_committed: AtomicBool, - committed: RwLock>>, + committed: RwLock>>, /// The Raft log. Logs are stored in serialized json. log: RwLock>, @@ -357,7 +357,7 @@ impl RaftLogStorage for Arc { Ok(()) } - async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { + async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { let enabled = self.enable_saving_committed.load(Ordering::Relaxed); tracing::debug!(?committed, "save_committed, enabled: {}", enabled); if !enabled { @@ -368,7 +368,7 @@ impl RaftLogStorage for Arc { Ok(()) } - async fn read_committed(&mut self) -> Result>, StorageError> { + async fn read_committed(&mut self) -> Result>, StorageError> { let enabled = self.enable_saving_committed.load(Ordering::Relaxed); tracing::debug!("read_committed, enabled: {}", enabled); if !enabled { @@ -393,7 +393,7 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "debug", skip(self))] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); { @@ -409,7 +409,7 @@ impl RaftLogStorage for Arc { } #[tracing::instrument(level = "debug", skip_all)] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("purge_log_upto: {:?}", log_id); if let Some(d) = self.block.get_blocking(&BlockOperation::PurgeLog) { @@ -441,7 +441,7 @@ impl RaftStateMachine for Arc { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { let sm = self.sm.read().await; Ok((sm.last_applied_log, sm.last_membership.clone())) } diff --git a/stores/rocksstore/src/lib.rs b/stores/rocksstore/src/lib.rs index 976739e52..d466d6824 100644 --- a/stores/rocksstore/src/lib.rs +++ b/stores/rocksstore/src/lib.rs @@ -92,7 +92,7 @@ pub struct RocksSnapshot { #[derive(Default)] #[derive(Serialize, Deserialize)] pub struct StateMachine { - pub last_applied_log: Option>, + pub last_applied_log: Option>, pub last_membership: StoredMembership, @@ -173,7 +173,7 @@ mod meta { impl StoreMeta for LastPurged { const KEY: &'static str = "last_purged_log_id"; - type Value = LogId; + type Value = LogId; fn subject(_v: Option<&Self::Value>) -> ErrorSubject { ErrorSubject::Store @@ -368,7 +368,7 @@ impl RaftLogStorage for RocksLogStore { Ok(()) } - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("truncate: [{:?}, +oo)", log_id); let from = id_to_bin(log_id.index); @@ -379,7 +379,7 @@ impl RaftLogStorage for RocksLogStore { Ok(()) } - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [0, {:?}]", log_id); // Write the last-purged log id before purging the logs. @@ -401,7 +401,7 @@ impl RaftStateMachine for RocksStateMachine { async fn applied_state( &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { + ) -> Result<(Option>, StoredMembership), StorageError> { Ok((self.sm.last_applied_log, self.sm.last_membership.clone())) } diff --git a/tests/tests/client_api/t16_with_state_machine.rs b/tests/tests/client_api/t16_with_state_machine.rs index d87dac895..0d3a44ce8 100644 --- a/tests/tests/client_api/t16_with_state_machine.rs +++ b/tests/tests/client_api/t16_with_state_machine.rs @@ -16,7 +16,6 @@ use openraft::RaftTypeConfig; use openraft::StorageError; use openraft::StoredMembership; use openraft_memstore::ClientResponse; -use openraft_memstore::MemNodeId; use openraft_memstore::TypeConfig; use crate::fixtures::ut_harness; @@ -99,7 +98,7 @@ async fn with_state_machine_wrong_sm_type() -> Result<()> { impl RaftStateMachine for FooSM { type SnapshotBuilder = Self; - async fn applied_state(&mut self) -> Result<(Option>, StoredMembership), Err> { + async fn applied_state(&mut self) -> Result<(Option>, StoredMembership), Err> { todo!() } diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 7e96055f3..f7610b89b 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -725,7 +725,7 @@ impl TypedRaftRouter { pub async fn wait_for_snapshot( &self, node_ids: &BTreeSet, - want: LogId, + want: LogId, timeout: Option, msg: &str, ) -> anyhow::Result<()> { @@ -863,7 +863,7 @@ impl TypedRaftRouter { expect_term: u64, expect_last_log: u64, expect_voted_for: Option, - expect_sm_last_applied_log: LogId, + expect_sm_last_applied_log: LogId, expect_snapshot: &Option<(ValueTest, u64)>, ) -> anyhow::Result<()> { let last_log_id = storage.get_log_state().await?.last_log_id; @@ -955,7 +955,7 @@ impl TypedRaftRouter { expect_term: u64, expect_last_log: u64, expect_voted_for: Option, - expect_sm_last_applied_log: LogId, + expect_sm_last_applied_log: LogId, expect_snapshot: Option<(ValueTest, u64)>, ) -> anyhow::Result<()> { let node_ids = { diff --git a/tests/tests/membership/t11_add_learner.rs b/tests/tests/membership/t11_add_learner.rs index dc793d219..2db937379 100644 --- a/tests/tests/membership/t11_add_learner.rs +++ b/tests/tests/membership/t11_add_learner.rs @@ -14,6 +14,7 @@ use openraft::LogId; use openraft::Membership; use openraft::RaftLogReader; use openraft::StorageHelper; +use openraft_memstore::TypeConfig; use tokio::time::sleep; use crate::fixtures::ut_harness; @@ -345,8 +346,8 @@ fn timeout() -> Option { Some(Duration::from_millis(3_000)) } -pub fn log_id(term: u64, node_id: u64, index: u64) -> LogId { - LogId:: { +pub fn log_id(term: u64, node_id: u64, index: u64) -> LogId { + LogId { leader_id: CommittedLeaderId::new(term, node_id), index, }