Skip to content

Commit

Permalink
Change: change LogId<NID:NodeId> to LogId<C:RaftTypeConfig>
Browse files Browse the repository at this point in the history
This refactoring moves LogId from a per-NodeId type to a per-TypeConfig type,
to make it consistent with `RaftTypeConfig` usage across the codebase.

- Part of: databendlabs#1278

Upgrade tip:

LogId is now parameterized by `RaftTypeConfig` instead of `NodeId`

- Change `LogId<NodeId>` to `LogId<C> where C: RaftTypeConfig`, for
  example, change `LogId<u64>` to `LogId<YourTypeConfig>`.

- Change trait `RaftLogId<NID: NodeId>` to `RaftLogId<C: RaftTypeConfig>`.
  • Loading branch information
drmingdrmer committed Dec 25, 2024
1 parent 5714674 commit 4d36290
Show file tree
Hide file tree
Showing 75 changed files with 346 additions and 338 deletions.
10 changes: 5 additions & 5 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ pub struct StoredSnapshot {

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachine {
pub last_applied_log: Option<LogId<NodeId>>,
pub last_applied_log: Option<LogId<TypeConfig>>,
pub last_membership: StoredMembership<TypeConfig>,
}

pub struct LogStore {
vote: RwLock<Option<Vote<TypeConfig>>>,
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,
last_purged_log_id: RwLock<Option<LogId<NodeId>>>,
last_purged_log_id: RwLock<Option<LogId<TypeConfig>>>,
}

impl LogStore {
Expand Down Expand Up @@ -203,15 +203,15 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn truncate(&mut self, log_id: LogId<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
let mut log = self.log.write().await;
log.split_off(&log_id.index);

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn purge(&mut self, log_id: LogId<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
{
let mut p = self.last_purged_log_id.write().await;
*p = Some(log_id);
Expand Down Expand Up @@ -244,7 +244,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
Expand Down
20 changes: 10 additions & 10 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ pub struct LogStore<C: RaftTypeConfig> {
#[derive(Debug)]
pub struct LogStoreInner<C: RaftTypeConfig> {
/// The last purged log id.
last_purged_log_id: Option<LogId<C::NodeId>>,
last_purged_log_id: Option<LogId<C>>,

/// The Raft log.
log: BTreeMap<u64, C::Entry>,

/// The commit log id.
committed: Option<LogId<C::NodeId>>,
committed: Option<LogId<C>>,

/// The current granted vote.
vote: Option<Vote<C>>,
Expand Down Expand Up @@ -75,12 +75,12 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
})
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C>> {
async fn save_committed(&mut self, committed: Option<LogId<C>>) -> Result<(), StorageError<C>> {
self.committed = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C>> {
async fn read_committed(&mut self) -> Result<Option<LogId<C>>, StorageError<C>> {
Ok(self.committed.clone())
}

Expand All @@ -104,7 +104,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(())
}

async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
async fn truncate(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
let keys = self.log.range(log_id.index..).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
self.log.remove(&key);
Expand All @@ -113,7 +113,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(())
}

async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
async fn purge(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
{
let ld = &mut self.last_purged_log_id;
assert!(ld.as_ref() <= Some(&log_id));
Expand Down Expand Up @@ -173,12 +173,12 @@ mod impl_log_store {
inner.get_log_state().await
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C>> {
async fn save_committed(&mut self, committed: Option<LogId<C>>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_committed(committed).await
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C>> {
async fn read_committed(&mut self) -> Result<Option<LogId<C>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_committed().await
}
Expand All @@ -194,12 +194,12 @@ mod impl_log_store {
inner.append(entries, callback).await
}

async fn truncate(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
async fn truncate(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.truncate(log_id).await
}

async fn purge(&mut self, log_id: LogId<C::NodeId>) -> Result<(), StorageError<C>> {
async fn purge(&mut self, log_id: LogId<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.purge(log_id).await
}
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl From<protobuf::Vote> for typ::Vote {
}
}

impl From<protobuf::LogId> for LogId<NodeId> {
impl From<protobuf::LogId> for LogId<TypeConfig> {
fn from(proto_log_id: protobuf::LogId) -> Self {
let leader_id: LeaderId<NodeId> = proto_log_id.leader_id.unwrap().into();
LogId::new(leader_id, proto_log_id.index)
Expand Down Expand Up @@ -116,8 +116,8 @@ impl From<typ::Vote> for protobuf::Vote {
}
}
}
impl From<LogId<NodeId>> for protobuf::LogId {
fn from(log_id: LogId<NodeId>) -> Self {
impl From<LogId<TypeConfig>> for protobuf::LogId {
fn from(log_id: LogId<TypeConfig>) -> Self {
protobuf::LogId {
index: log_id.index,
leader_id: Some(log_id.leader_id.into()),
Expand Down
5 changes: 2 additions & 3 deletions examples/raft-kv-memstore-grpc/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use serde::Serialize;

use crate::protobuf::Response;
use crate::typ;
use crate::NodeId;
use crate::TypeConfig;

pub type LogStore = memstore::LogStore<TypeConfig>;
Expand All @@ -39,7 +38,7 @@ pub struct StoredSnapshot {
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachineData {
pub last_applied: Option<LogId<NodeId>>,
pub last_applied: Option<LogId<TypeConfig>>,

pub last_membership: StoredMembership<TypeConfig>,

Expand Down Expand Up @@ -126,7 +125,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}
Expand Down
5 changes: 2 additions & 3 deletions examples/raft-kv-memstore-network-v2/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use serde::Deserialize;
use serde::Serialize;

use crate::typ;
use crate::NodeId;
use crate::TypeConfig;

pub type LogStore = memstore::LogStore<TypeConfig>;
Expand Down Expand Up @@ -56,7 +55,7 @@ pub struct StoredSnapshot {
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachineData {
pub last_applied: Option<LogId<NodeId>>,
pub last_applied: Option<LogId<TypeConfig>>,

pub last_membership: StoredMembership<TypeConfig>,

Expand Down Expand Up @@ -133,7 +132,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}
Expand Down
5 changes: 2 additions & 3 deletions examples/raft-kv-memstore-opendal-snapshot-data/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use serde::Serialize;
use crate::decode_buffer;
use crate::encode;
use crate::typ;
use crate::NodeId;
use crate::TypeConfig;

pub type LogStore = memstore::LogStore<TypeConfig>;
Expand Down Expand Up @@ -59,7 +58,7 @@ pub struct StoredSnapshot {
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachineData {
pub last_applied: Option<LogId<NodeId>>,
pub last_applied: Option<LogId<TypeConfig>>,

pub last_membership: StoredMembership<TypeConfig>,

Expand Down Expand Up @@ -154,7 +153,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.lock().unwrap();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}
Expand Down
17 changes: 8 additions & 9 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use openraft::Vote;
use serde::Deserialize;
use serde::Serialize;

use crate::NodeId;
use crate::TypeConfig;

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -85,7 +84,7 @@ pub struct StoredSnapshot {
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachineData {
pub last_applied: Option<LogId<NodeId>>,
pub last_applied: Option<LogId<TypeConfig>>,

pub last_membership: StoredMembership<TypeConfig>,

Expand All @@ -108,12 +107,12 @@ pub struct StateMachineStore {

#[derive(Debug, Default)]
pub struct LogStore {
last_purged_log_id: RefCell<Option<LogId<NodeId>>>,
last_purged_log_id: RefCell<Option<LogId<TypeConfig>>>,

/// The Raft log.
log: RefCell<BTreeMap<u64, Entry<TypeConfig>>>,

committed: RefCell<Option<LogId<NodeId>>>,
committed: RefCell<Option<LogId<TypeConfig>>>,

/// The current granted vote.
vote: RefCell<Option<Vote<TypeConfig>>>,
Expand Down Expand Up @@ -190,7 +189,7 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.borrow();
Ok((state_machine.last_applied, state_machine.last_membership.clone()))
}
Expand Down Expand Up @@ -300,13 +299,13 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
})
}

async fn save_committed(&mut self, committed: Option<LogId<NodeId>>) -> Result<(), StorageError<TypeConfig>> {
async fn save_committed(&mut self, committed: Option<LogId<TypeConfig>>) -> Result<(), StorageError<TypeConfig>> {
let mut c = self.committed.borrow_mut();
*c = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<NodeId>>, StorageError<TypeConfig>> {
async fn read_committed(&mut self) -> Result<Option<LogId<TypeConfig>>, StorageError<TypeConfig>> {
let committed = self.committed.borrow();
Ok(*committed)
}
Expand All @@ -332,7 +331,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn truncate(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn truncate(&mut self, log_id: LogId<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

let mut log = self.log.borrow_mut();
Expand All @@ -345,7 +344,7 @@ impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn purge(&mut self, log_id: LogId<NodeId>) -> Result<(), StorageError<TypeConfig>> {
async fn purge(&mut self, log_id: LogId<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
tracing::debug!("delete_log: (-oo, {:?}]", log_id);

{
Expand Down
5 changes: 2 additions & 3 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::sync::RwLock;

use crate::NodeId;
use crate::TypeConfig;

pub type LogStore = memstore::LogStore<TypeConfig>;
Expand Down Expand Up @@ -63,7 +62,7 @@ pub struct StoredSnapshot {
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachineData {
pub last_applied_log: Option<LogId<NodeId>>,
pub last_applied_log: Option<LogId<TypeConfig>>,

pub last_membership: StoredMembership<TypeConfig>,

Expand Down Expand Up @@ -136,7 +135,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

async fn applied_state(
&mut self,
) -> Result<(Option<LogId<NodeId>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
) -> Result<(Option<LogId<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
let state_machine = self.state_machine.read().await;
Ok((state_machine.last_applied_log, state_machine.last_membership.clone()))
}
Expand Down
Loading

0 comments on commit 4d36290

Please sign in to comment.