RFC: Clarification of storage methods and persistence of log #702

Open
schreter opened this issue Mar 8, 2023 · 10 comments
Labels
C-docs Category: documentation

Comments

@schreter
Collaborator

schreter commented Mar 8, 2023

Hi @drmingdrmer,

I'm now finally implementing binding of openraft to our persistency. The documentation of Storage trait is a bit suboptimal, therefore I'd like to clarify it (and it should be then updated).

Our persistency is composed of log, which is asynchronously persisted (but a synchronous flush can be done) and a state machine, which is updated in-memory and occasionally snapshotted (depending on various rules), so we can shorten the log. Note that the snapshot is an incremental one (shadow paging), so it's fairly cheap, it's not a file.

Voting

I think save_vote()/read_vote() are pretty much clear - simply save the vote with immediate persistency to the disk and read it from the persistency upon restart as-is. I.e., the persistency of the vote is independent of the state machine or log. Right?

Log

get_log_state() documents that it should return the last log ID, independent of the state machine. I.e., the log is parsed until the end and the last log ID is returned. Fine.

delete_conflict_logs_since()/purge_logs_upto() are pretty much straightforward. Fine.

append_to_log() is a bit unclear. I take that entries will always be presented in order means that the log index is monotonically increasing and will not be sent into the log twice. Right?
There might be potentially holes, but IIRC there are none.

What is unclear to me is when the call needs to return. Is it strictly necessary to persist these log entries durably until the call returns? That would cause blocking of other operations which can progress in the meantime (handling responses, applying to state machine, ...). We need to clarify this (and potentially improve it).

State machine

apply_to_state_machine() only tells the state machine that a log entry is agreed upon, so the state machine can produce a client reply. This is clear, except for when a membership change is to be applied. I assume, the membership needs to be then applied synchronously to a separate persistent location.

last_applied_state() returns both the log ID of the last snapshot of the state machine and of the membership. Here again, I assume that the persistency of the membership (when applied to the state machine) is to be done separately from snapshot. I.e., if I have the sequence of operations:

  • LogId(1): Membership change 1
  • LogId(2): (some other operation)
  • Snapshot (completed with last contained LogId(2))
  • LogId(3): (some other operations)
  • LogId(4): Membership change 2 (already committed)
  • LogId(5): (some other operations)

then I suppose the restart from this would return LogId(2) for the snapshot and LogId(4) + membership stored there for membership. Right?

If the membership change is NOT committed yet (i.e., it was not applied to the state machine), just written to the log, then it should return the membership from LogId(1), right?

I.e., the membership returned is the last one which was actually applied via apply_to_state_machine(). Right?

Snapshots

So far it seems like snapshot handling is clear enough, though very heavily geared towards having the snapshot in a file. We'll have hard time sending it via an arbitrary seekable interface. I think the interface should be made more strict (though easily mappable to the current trait requirements), but I have no exact proposal yet.

More on log persistence

I was wondering, whether it is possible to at least optimize log persistence to overlap it with other operations/network transfer.

On the leader:

  • Log append would start the I/O, but not wait for the completion. Instead, log writer would track the completed log index.
  • Apply to the state machine would require I/O to be complete by the time apply is called. I.e., only committed and persisted log would be actually applied to the state machine.
  • This can be hidden in the state machine implementation, i.e., it doesn't require any changes in openraft per se.

On the follower:

  • Log append would wait for the I/O completion, before reporting back to the leader.
  • This is still suboptimal, since we could overlap the log I/O with applying data to the state machine. But the load on the follower should be lower, since no read operations are executed there.
  • Again, no changes required in openraft.

In the worst case we can lose part of the committed log on the crashed leader, so when it restarts, it will have shorter log than expected. However, this will not cause any correctness issues, since any committed entry will be already persisted on one of the followers and one of the followers with the "longest" log will be elected as the new leader, later re-replicating the missing log to the former leader (in the worst case it will need to send the snapshot, if the log is gone, but that's anyway the case for lagging replicas).
Or am I missing something?

A "cleaner" alternative would be to overlap log writing with other operations on the state machine. This could be done:

  • By splitting Storage interface into StateMachine and LogWriter (since only one mutable reference is possible, thus we need independent objects) and feeding the LogWriter in a separate task.
  • By making the append_to_log() only initiate the I/O operation and report back the persisted log index by a callback via the central command queue.

The former would keep the async interface, but prevent pipelining of I/O requests. The latter allows easy pipelining, but the implementation is a bit more complex (OTOH, one can build it as a second trait method start_append_to_log(), which calls the original append_to_log() and immediately schedules the follow-up operation on the passed back-channel object).

I personally prefer the latter, since it is probably cleaner, doesn't require extra task and extra queues, and it will not require any changes to existing implementations on top of openraft, unless one wants to optimize the I/O.

BTW, something similar could be done for applying to the state machine (i.e., also just start applying to the state machine, with back-channel for completions), but that would likely make state machine updates more complex. Considering that the state machine update is basically an in-memory operation typically not needing yielding the future (at least in our case), I wouldn't complicate it.

Thoughts?

Thanks & regards,

Ivan

@schreter schreter added the C-docs Category: documentation label Mar 8, 2023

@drmingdrmer
Member

drmingdrmer commented Mar 9, 2023

Voting

I think save_vote()/read_vote() are pretty much clear - simply save the vote with immediate persistency to the disk and read it from the persistency upon restart as-is. I.e., the persistency of the vote is independent of the state machine or log. Right?

Yes :D

Log

append_to_log() is a bit unclear. I take that entries will always be presented in order means that the log index is monotonically increasing and will not be sent into the log twice. Right? There might be potentially holes, but IIRC there are none.

There must not be a hole. If there is, it is a severe bug.
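The no-hole rule can be checked mechanically. A minimal sketch, assuming entries are identified only by a `u64` index and that an empty log expects index 0; the helper name `check_contiguous` is hypothetical and not part of openraft:

```rust
/// Returns Ok(new last index) if `new_indexes` directly continue the log
/// after `last_index` without gaps or repeats, else Err(offending index).
/// Hypothetical helper for illustration; not an openraft API.
pub fn check_contiguous(
    last_index: Option<u64>,
    new_indexes: &[u64],
) -> Result<Option<u64>, u64> {
    let mut last = last_index;
    for &idx in new_indexes {
        // Assumption: an empty log expects index 0, otherwise last + 1.
        let expected = match last {
            Some(l) => l + 1,
            None => 0,
        };
        if idx != expected {
            return Err(idx);
        }
        last = Some(idx);
    }
    Ok(last)
}
```

A storage implementation could run such a check at the top of its append path and treat an `Err` as a fatal bug rather than a recoverable condition.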

What is unclear to me is when the call needs to return. Is it strictly necessary to persist these log entries durably until the call returns? That would cause blocking of other operations which can progress in the meantime (handling responses, applying to state machine, ...). We need to clarify this (and potentially improve it).

Yes, when append_to_log() returns, the input entries must be persisted on disk.

  • TODO: clarify this in doc.
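To illustrate the "persisted on disk before returning" requirement, here is a minimal sketch using only the standard library. It assumes a line-per-entry file format, and `append_durably` is a hypothetical name; this is not the openraft trait method itself:

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Appends each entry as one line and returns only after the data has been
/// synced to the storage device. Hypothetical helper, not an openraft API.
pub fn append_durably(path: &Path, entries: &[&str]) -> io::Result<()> {
    let mut file: File = OpenOptions::new().create(true).append(true).open(path)?;
    for entry in entries {
        file.write_all(entry.as_bytes())?;
        file.write_all(b"\n")?;
    }
    // sync_all() asks the OS to push file data and metadata to disk;
    // without it the write may still sit in the page cache on return.
    file.sync_all()
}
```

The point of the sketch is the ordering: the function does not return `Ok` until `sync_all()` has succeeded, which is what makes it safe to acknowledge the entries afterwards.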

Yes, it blocks RaftCore. In the latest version, 0.8.3, nothing else can be done before it returns.
I've been thinking about and preparing an optimization for this: currently, the Engine outputs a series of IO/network commands (including append_to_log()) for RaftCore to run. These commands could be run in another tokio::task, or in several tasks, so that RaftCore is able to handle other requests in the meantime.

State machine

apply_to_state_machine() only tells the state machine that a log entry is agreed upon, so the state machine can produce a client reply. This is clear, except for when a membership change is to be applied. I assume, the membership needs to be then applied synchronously to a separate persistent location.

In short, membership should be part of the state machine and part of the snapshot, just like a normal log entry.

  • Doc: how should state-machine deal with membership-entry.

last_applied_state() returns both the log ID of the last snapshot of the state machine and of the membership. Here again, I assume that the persistency of the membership (when applied to the state machine) is to be done separately from snapshot. I.e., if I have the sequence of operations:

  • LogId(1): Membership change 1
  • LogId(2): (some other operation)
  • Snapshot (completed with last contained LogId(2))
  • LogId(3): (some other operations)
  • LogId(4): Membership change 2 (already committed)
  • LogId(5): (some other operations)

then I suppose the restart from this would return LogId(2) for the snapshot and LogId(4) + membership stored there for membership. Right?

No.
Storage APIs must consistently provide reliable results.
This means that a snapshot view should always be retrieved by applying logs from index-0 up to a specific position. For instance, if log-4 is visible, all previous logs should also be visible.

In the case where the state machine is not persistent but the snapshot is persistent, last_applied_state() should return last_log_id: log-2 and last_membership: (log-1, MembershipConfig).

On the other hand, if the application has a persistent state machine where the result of applying a log entry is stored on disk, last_applied_state() should return last_log_id: log-5 and last_membership: (log-4, MembershipConfig).

  • Doc: clarify last_applied_state() behavior, for persistent state machine and non-persistent state machine.
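The two cases can be reproduced with a small replay sketch. It assumes a simplified entry type and made-up member sets (the example above does not say which nodes are in each membership); `Payload`, `last_applied_state` and `example_log` here are illustrative stand-ins, not the openraft types:

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Payload {
    Normal,
    Membership(Vec<u64>), // node ids; a stand-in for a real membership config
}

/// Replays `log` (index, payload) up to and including `applied_upto` and
/// returns (last applied index, last membership seen together with its index).
pub fn last_applied_state(
    log: &[(u64, Payload)],
    applied_upto: u64,
) -> (Option<u64>, Option<(u64, Vec<u64>)>) {
    let mut last_applied = None;
    let mut last_membership = None;
    for (index, payload) in log.iter().filter(|(i, _)| *i <= applied_upto) {
        last_applied = Some(*index);
        if let Payload::Membership(nodes) = payload {
            last_membership = Some((*index, nodes.clone()));
        }
    }
    (last_applied, last_membership)
}

/// The log from the example: membership changes at 1 and 4.
/// The member sets are assumptions made for illustration only.
pub fn example_log() -> Vec<(u64, Payload)> {
    vec![
        (1, Payload::Membership(vec![1, 2, 3])),
        (2, Payload::Normal),
        (3, Payload::Normal),
        (4, Payload::Membership(vec![1, 4, 5])),
        (5, Payload::Normal),
    ]
}
```

With `applied_upto = 2` (only the snapshot survives a restart) the result is log-2 with the membership from log-1; with `applied_upto = 5` (every applied entry is on disk) it is log-5 with the membership from log-4. In both cases the membership is derived from the same prefix of the log as the applied index, which is the consistency requirement stated above.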

If the membership change is NOT committed yet (i.e., it was not applied to the state machine), just written to the log, then it should return the membership from LogId(1), right?

Right.

I.e., the membership returned is the last one which was actually applied via apply_to_state_machine(). Right?

The returned last_membership must be consistent with the persistent data. In our case, the persistent data is the snapshot, thus it should be log-1. If the state machine persists data on disk before apply_to_state_machine() returns, the last_membership it returns should be log-4.

Snapshots

So far it seems like snapshot handling is clear enough, though very heavily geared towards having the snapshot in a file. We'll have hard time sending it via an arbitrary seekable interface. I think the interface should be made more strict (though easily mappable to the current trait requirements), but I have no exact proposal yet.

Ok. Let me re-define the install-snapshot API with a stream:

And I'm gonna answer your other question in the next comment:)

@schreter
Collaborator Author

schreter commented Mar 9, 2023

Thanks for the answers.

Maybe a bit of background: Our state machine is a "traditional" database engine with a snapshot/checkpoint/savepoint/whatever you want to name it and a write-ahead log. That means, we can replay the log up to the last log entry persisted to recover all the data in the state machine.

However, we can't really do it like this at recovery, since that would also recover and apply the log which is not yet committed. Therefore, we "trick" a bit here. During the recovery, we determine the log end (i.e., the last log ID which was safely written to the log, but not necessarily committed yet by the quorum) and the last membership information which was actually committed (typically part of the state machine checkpoint, but it can be also somewhere later in the log). I.e., we can recover the state machine to any state between the membership log index (which is definitely committed and typically equal to the checkpoint log index) and the log end (which is potentially not committed).

During the election, if we get elected as a leader, we can ship previously not committed log to other replicas to bring them up-to-date and roll-forward the state machine to the common committed index across quorum of replicas (which is also determined during election and can't be lower than our checkpoint log index). Further log entries are applied as the committed index is moved forward by regular replication. If our replica is a follower, then the leader determines the common portion of the log, which must lie definitely behind the max(LogId(snapshot), LogId(last membership change)), which was committed before. If it lies behind our log, then the new leader will replicate log entries to us. If there was a divergence, the new leader will send a request to purge the log somewhere behind the last-committed index from our PoV and then replicate the log.

Note that we do not want to persist the committed log ID, since that can be re-determined during election (i.e., it is actually volatile, see Figure 2 in the Raft paper https://raft.github.io/raft.pdf).

Some additional questions:

Regarding Log:

There must not be a hole. If there is, it is a severe bug.

OK, perfect.

BTW, I thought you removed the Blank log, so why there is EntryPayload::Blank still? Just not refactored yet?

Regarding Storage:

If we store the committed membership separately of the database snapshot/checkpoint (this implies that the log is persisted at least until the committed membership position and until the snapshot), then we can roll forward until (committed) max(LogId(snapshot), LogId(last membership change)) independent of the outcome of election. This can be made before election is completed.

But I still have a question regarding last_applied_state(). The committed index is not to be persisted in "standard" Raft, since there is no point to persist it. It is just an indication until where we can roll-forward the state machine. If I understand it correctly, the last_applied_state() can return any committed log ID, not necessarily the newest one. Then, why return a committed log ID at all? The committed field of RaftState can be also initialized to zero, in principle. Right?

The RequestVote request offers the last persisted log index, i.e., the end of the local log, so we don't lose any log anyway.

Thanks.

@drmingdrmer
Member

More on log persistence

I was wondering, whether it is possible to at least optimize log persistence to overlap it with other operations/network transfer.

Yes, it is possible. I think the optimal way is to split append_to_log() into two phases: appending the logs and flushing them.
Replicating logs depends on appending them, but does not depend on flushing them.
Committing logs has to wait for replication (to a quorum) to complete.
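A minimal in-memory sketch of the two-phase idea, assuming string entries and a pretend flush. `TwoPhaseLog` and its methods are hypothetical names; openraft 0.8 has a single append_to_log():

```rust
/// In-memory stand-in for a log with separate append and flush phases.
/// All names are hypothetical; this is not an openraft type.
#[derive(Default)]
pub struct TwoPhaseLog {
    entries: Vec<String>, // entries visible to replication
    flushed: usize,       // number of entries known to be durable
}

impl TwoPhaseLog {
    /// Phase 1: make entries readable (e.g. for replication) and return the
    /// number of entries appended so far. Nothing is durable yet.
    pub fn append(&mut self, new_entries: &[&str]) -> usize {
        self.entries.extend(new_entries.iter().map(|e| e.to_string()));
        self.entries.len()
    }

    /// Phase 2: pretend to sync to disk and advance the durable mark.
    pub fn flush(&mut self) -> usize {
        self.flushed = self.entries.len();
        self.flushed
    }

    /// Entries that may be sent to followers: everything appended.
    pub fn replicable(&self) -> &[String] {
        &self.entries
    }

    /// Number of entries this node may report as persisted.
    pub fn durable_len(&self) -> usize {
        self.flushed
    }
}
```

Replication reads from `replicable()` right after `append()`, while anything that counts this node towards a quorum should look at `durable_len()` only.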

On the leader:

  • Log append would start the I/O, but not wait for the completion. Instead, log writer would track the completed log index.
  • Apply to the state machine would require I/O to be complete by the time apply is called. I.e., only committed and persisted log would be actually applied to the state machine.

In fact, the leader does not have to flush a log to its local store to commit it. A log that is replicated and flushed to a quorum(including the leader or not) can be committed by the leader. The quorum does not have to include the leader.
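As a sketch of that rule, the helper below computes the highest index flushed on a majority from the per-node flushed indexes, treating the leader like any other voter. The name `quorum_flushed_index` is hypothetical, and the sketch deliberately ignores the additional Raft condition that the entry must belong to the leader's current term:

```rust
/// Given the highest flushed log index reported by every voter (leader
/// included), returns the highest index that is flushed on a majority.
/// Hypothetical helper; real Raft additionally requires the entry at that
/// index to be from the leader's current term before committing it.
pub fn quorum_flushed_index(flushed: &[u64]) -> Option<u64> {
    if flushed.is_empty() {
        return None;
    }
    let mut sorted = flushed.to_vec();
    sorted.sort_unstable_by(|a, b| b.cmp(a)); // descending
    let quorum = flushed.len() / 2 + 1;
    // The quorum-th largest value is flushed on at least `quorum` nodes.
    Some(sorted[quorum - 1])
}
```

For example, if the leader has flushed up to index 3 but both followers have flushed up to 7, the result is 7: the quorum is formed by the two followers alone.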

On the follower:

  • Log append would wait for the I/O completion, before reporting back to the leader.

Yes this is mandatory.

  • This is still suboptimal, since we could overlap the log I/O with applying data to the state machine. But the load on the follower should be lower, since no read operations are executed there.
  • Again, no changes required in openraft.

Hmm... All right.

In the worst case we can lose part of the committed log on the crashed leader, so when it restarts, it will have shorter log than expected. However, this will not cause any correctness issues, since any committed entry will be already persisted on one of the followers and one of the followers with the "longest" log will be elected as the new leader, later re-replicating the missing log to the former leader (in the worst case it will need to send the snapshot, if the log is gone, but that's anyway the case for lagging replicas). Or am I missing something?

Correct. A log that is replicated to a quorum can not be lost under any circumstance.

A "cleaner" alternative would be to overlap log writing with other operations on the state machine. This could be done:

  • By splitting Storage interface into StateMachine and LogWriter (since only one mutable reference is possible, thus we need independent objects) and feeding the LogWriter in a separate task.
  • By making the append_to_log() only initiate the I/O operation and report back the persisted log index by a callback via the central command queue.

The former would keep the async interface, but prevent pipelining of I/O requests. The latter allows easy pipelining, but the implementation is a bit more complex (OTOH, one can build it as a second trait method start_append_to_log(), which calls the original append_to_log() and immediately schedules the follow-up operation on the passed back-channel object).

I personally prefer the latter, since it is probably cleaner, doesn't require extra task and extra queues, and it will not require any changes to existing implementations on top of openraft, unless one wants to optimize the I/O.

I like the second approach too.

BTW, something similar could be done for applying to the state machine (i.e., also just start applying to the state machine, with back-channel for completions), but that would likely make state machine updates more complex. Considering that the state machine update is basically an in-memory operation typically not needing yielding the future (at least in our case), I wouldn't complicate it.

You are right, the state machine can just be purely in-memory.

Thank you for your thoughtful advice :D

@drmingdrmer
Member

Thanks for the answers.

Maybe a bit of background: Our state machine is a "traditional" database engine with a snapshot/checkpoint/savepoint/whatever you want to name it and a write-ahead log. That means, we can replay the log up to the last log entry persisted to recover all the data in the state machine.

However, we can't really do it like this at recovery, since that would also recover and apply the log which is not yet committed. Therefore, we "trick" a bit here. During the recovery, we determine the log end (i.e., the last log ID which was safely written to the log, but not necessarily committed yet by the quorum) and the last membership information which was actually committed (typically part of the state machine checkpoint, but it can be also somewhere later in the log). I.e., we can recover the state machine to any state between the membership log index (which is definitely committed and typically equal to the checkpoint log index) and the log end (which is potentially not committed).

During the election, if we get elected as a leader, we can ship previously not committed log to other replicas to bring them up-to-date and roll-forward the state machine to the common committed index across quorum of replicas (which is also determined during election and can't be lower than our checkpoint log index). Further log entries are applied as the committed index is moved forward by regular replication. If our replica is a follower, then the leader determines the common portion of the log, which must lie definitely behind the max(LogId(snapshot), LogId(last membership change)), which was committed before. If it lies behind our log, then the new leader will replicate log entries to us. If there was a divergence, the new leader will send a request to purge the log somewhere behind the last-committed index from our PoV and then replicate the log.

Interesting! My question is: If a server applies non-committed logs to the state machine during recovery, the state machine has to be able to revert to a previous state, once the non-committed but applied logs are truncated by the leader. Does your state machine support reverting to a previous state?

And I do not know how your application determines a common committed index.
As far as I know, only the leader itself knows what logs are committed. If this leader crashes, a new leader can not tell whether a log is committed without re-replicating it.

For example, if node-1 has logs (1,1),(3,1), node-2 has log (1,1), and node-3 has log (2,1):
When node-1 becomes the leader, it cannot tell, without querying node-3, whether log (1,1) on node-2 is committed or not:

  • log (1,1) on node-2 can be overridden by log (2,1) if node-3 establishes leadership with quorum (N2,N3);
  • log (1,1) on node-2 can be committed if node-1 establishes leadership with quorum (N1,N2);
(i,j) is the log at term-i and index-j

N1   1,1  3,1  L(term=4)
N2   1,1
N3   2,1
------------------------------------->
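The ambiguity follows from the standard Raft voting rule, sketched below with the last log ids taken as written in the example. `can_grant_vote` is a hypothetical name, and the sketch covers only the log comparison, not the term or voted-for checks:

```rust
/// A log id as (term, index), matching the (i,j) notation used above.
pub type LogId = (u64, u64);

/// Standard Raft rule: a voter grants its vote only if the candidate's last
/// log id is at least as up-to-date as its own (higher term wins, then
/// higher index). Tuples compare lexicographically, which is exactly that.
pub fn can_grant_vote(candidate_last: LogId, voter_last: LogId) -> bool {
    candidate_last >= voter_last
}
```

Node-2, whose last log is (1,1), would grant a vote both to node-1 (last log (3,1)) and to node-3 (last log (2,1)), so either quorum can form and the fate of (1,1) on node-2 is not decided until one of them wins.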

BTW, I thought you removed the Blank log, so why there is EntryPayload::Blank still? Just not refactored yet?

I did not remove the blank log. I just stopped using a blank log as a heartbeat message. :(

Regarding Storage:

If we store the committed membership separately of the database snapshot/checkpoint (this implies that the log is persisted at least until the committed membership position and until the snapshot), then we can roll forward until (committed) max(LogId(snapshot), LogId(last membership change)) independent of the outcome of election. This can be made before election is completed.

I agree.

But I still have a question regarding last_applied_state(). The committed index is not to be persisted in "standard" Raft, since there is no point to persist it. It is just an indication until where we can roll-forward the state machine. If I understand it correctly, the last_applied_state() can return any committed log ID, not necessarily the newest one. Then, why return a committed log ID at all? The committed field of RaftState can be also initialized to zero, in principle. Right?

You are right. last_applied_state() is part of the state machine. It has nothing to do with Raft.
openraft does not actually rely on a snapshot to provide correctness (in other words, openraft allows the application not to persist a snapshot). Instead, the state machine must be persisted, and there has to be a method, last_applied_state(), to tell openraft which logs are already in the state machine.

@schreter
Collaborator Author

schreter commented Mar 9, 2023

Thanks for the answers :-).

By making the append_to_log() only initiate the I/O operation and report back the persisted log index by a callback via the central command queue.

I like [this] approach too.

Perfect. We just have to think about how to bring it in w/o causing unnecessary compatibility issues. Likely by creating a second trait method, something on the lines of:

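    async fn append_to_log_interleaved(&mut self, entries: &[&Entry<C>], callback: &LogIoComplete) -> Result<(), StorageError<C::NodeId>> {
        // Default: run the existing blocking append to completion first.
        let res = self.append_to_log(entries).await;
        // Report completion only if the entries were actually persisted.
        if res.is_ok() {
            callback.log_io_completed(entries.last().map(|e| e.log_id), Ok(()));
        }
        res
    }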

where the callback can be cheaply cloned and/or it could be provided once by some other mechanism to prevent unnecessary clones. In the method it would just send an info back to RaftCore.
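A standalone sketch of the back-channel idea, using a standard-library channel in place of the RaftCore command queue and a thread in place of real asynchronous I/O. `LogIoCompleted`, `start_append` and `run_pipelined` are hypothetical names for illustration only:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Completion message: the last log index made durable by one I/O.
/// All names here are hypothetical, not openraft APIs.
#[derive(Debug, PartialEq)]
pub struct LogIoCompleted {
    pub last_index: Option<u64>,
}

/// Starts the "I/O" on a background thread and returns immediately; the
/// result arrives later on `callback`.
pub fn start_append(
    indexes: Vec<u64>,
    callback: Sender<LogIoCompleted>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // A real implementation would write and sync the entries here.
        let last_index = indexes.last().copied();
        // Ignore the send error if the receiver is already gone.
        let _ = callback.send(LogIoCompleted { last_index });
    })
}

/// Issues two appends back to back and collects completions in arrival order.
pub fn run_pipelined() -> Vec<LogIoCompleted> {
    let (tx, rx): (Sender<LogIoCompleted>, Receiver<LogIoCompleted>) = channel();
    let h1 = start_append(vec![1, 2], tx.clone());
    let h2 = start_append(vec![3], tx);
    h1.join().unwrap();
    h2.join().unwrap();
    // All senders are dropped once both threads finish, so this terminates.
    rx.iter().collect()
}
```

Because completions may arrive in any order, the receiving side would presumably track the highest index it has seen rather than assume ordering. Cloning the `Sender` is cheap, which matches the "cheaply cloned" remark above.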

In fact, the leader does not have to flush a log to its local store to commit it. A log that is replicated and flushed to a quorum(including the leader or not) can be committed by the leader. The quorum does not have to include the leader.

That's correct, but to track that, openraft would need an extension where the logger would tell it when the local log is flushed, so the commit index can be advanced appropriately.

My idea was somewhat simpler. Let openraft believe the log was actually persisted on the leader and advance the commit index. Instead, ensure that not-yet-persisted log entries won't get applied to the state machine (persistently). That should be equivalent for all practical purposes. Since the RTT + log flush on the follower likely takes longer than just the log flush on the leader, we can assume that the flush will be finished by the time the quorum is built and apply_to_state_machine() is called (so it won't block on the I/O).

(In fact, we don't block on I/O in our implementation, except for a very few places. We use data flow graphs with flush and barrier disk I/O and/or network I/O dependencies instead to ensure correctness, so we can start an operation at any time and the end result would be as if we would wait for respective I/Os.)

Hm, on the other hand, this still may be insufficient - if the commit index is advanced too far and communicated to followers before log is actually persisted, we'll have an issue in certain cases. So probably there is no way around to make it "right" by actually communicating back from the log writer to the Raft core when the persistence has been reached (as described above).

Interesting! My question is: If a server applies non-committed logs to the state machine during recovery, the state machine has to be able to revert to a previous state, once the non-committed but applied logs are truncated by the leader. Does your state machine support reverting to a previous state?

No. The state machine can roll-forward at most to the index which is definitely known to be committed at the time of restart. This is either the snapshot index (i.e., no roll-forward) or the index of the explicitly-stored membership change (or other known commit index persistently stored elsewhere, should we have it). This would be also returned as of now from last_applied_state(). We roll-forward until this index before starting Raft.
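Expressed as code, the pre-Raft roll-forward bound is simply the larger of the two persisted positions. This is a sketch with a hypothetical helper name, where `None` means that nothing has been stored yet:

```rust
/// Highest index that is safe to replay before Raft starts: the larger of
/// the snapshot index and a separately stored committed index (such as a
/// committed membership change). Hypothetical helper for illustration.
pub fn roll_forward_target(
    snapshot_index: Option<u64>,
    stored_committed_index: Option<u64>,
) -> Option<u64> {
    // Option<u64> orders None below every Some(_), so Ord::max works directly.
    snapshot_index.max(stored_committed_index)
}
```

With the snapshot at index 2 and a stored committed membership change at index 4, the state machine is replayed up to 4; with no stored index, it stays at the snapshot.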

The definitive answer up to where we can roll forward is only known after the election and re-replication of logs, as necessary. In fact, we simply rely on openraft to roll-forward the state machine from the last-known commit index in the state machine via apply_to_state_machine() (which either takes the log from the local log or from the replicated log from the new leader).

Potentially, the roll-forward before election can be simply skipped by returning always just the snapshot index and the membership stored in the snapshot. I'm just a bit worried what will happen if the membership changes completely in the meantime. Say,

  • we start with membership (1, 2, 3), leader being 1, at LogId(1)
  • state machine is snapshotted at LogId(2), so we have a starting point for recovery
  • now membership changes to (1, 4, 5) at LogId(10)
  • no new snapshot is written
  • leader node 1 crashes and the node 4 is elected as leader of (1, 4, 5)
  • node 1 restarts and tries to start election by sending messages to 2 and 3 (which don't exist anymore)

How does node 1 rejoin the consensus and learn about the new topology?

If this works as expected, then we can completely drop roll-forward in case of membership change after snapshot simply to rely on openraft here (and not handle the membership change as anything special).

And I do not know how your application determines a common committed index.
As far as I know, only the leader itself knows what logs are committed. If this leader crashes, a new leader can not tell whether a log is committed without re-replicating it.

Correct. This still holds for our implementation - every node knows its log entries and during election the match index is computed, which causes re-replicating some log, potentially, using standard Raft/implementation in openraft.

BTW, I thought you removed the Blank log, so why there is EntryPayload::Blank still? Just not refactored yet?

I did not remove the blank log. I just stopped using a blank log as a heartbeat message. :(

Are you planning to remove it?

@schreter
Collaborator Author

schreter commented Mar 9, 2023

Ah, I was digging in openraft code a bit, so I can answer my question above:

How does node 1 rejoin the consensus and learn about the new topology?

If this works as expected, then we can completely drop roll-forward in case of membership change after snapshot simply to rely on openraft here (and not handle the membership change as anything special).

There is StorageHelper::last_membership_in_log(), which actually scans the log starting from the snapshot/last applied state in the state machine towards the end of the log.

Since this is quite inefficient (the log will be scanned twice on restart - once for membership, which is unlikely to be found, as it is exceptional operation, and once to roll-forward the state machine), I'd suggest to provide additional default-implemented method for this on Storage instead, which can be overridden to return the membership persisted somewhere else.

BTW, the function last_membership_in_log() returns potentially uncommitted (but logged) membership. Thus, the optimized version should do the same, right?

@drmingdrmer
Member

Ah, I was digging in openraft code a bit, so I can answer my question above:

How does node 1 rejoin the consensus and learn about the new topology?
If this works as expected, then we can completely drop roll-forward in case of membership change after snapshot simply to rely on openraft here (and not handle the membership change as anything special).

There is StorageHelper::last_membership_in_log(), which actually scans the log starting from the snapshot/last applied state in the state machine towards the end of the log.

Right. In your case, it is also possible node-1 never sees the newer membership at log-10. In such a case, it has to wait for a new leader, which could be node-4 or node-5, to contact it.

Since this is quite inefficient (the log will be scanned twice on restart - once for membership, which is unlikely to be found, as it is exceptional operation, and once to roll-forward the state machine), I'd suggest to provide additional default-implemented method for this on Storage instead, which can be overridden to return the membership persisted somewhere else.

Yes. I trade efficiency for simplicity.

  • get_last_two_memberships() maybe?
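A rough sketch of what such a scan could look like, assuming a simplified entry type. `Entry` and `last_two_memberships` below are stand-ins for illustration and not the openraft implementation; an overriding implementation could instead return memberships persisted elsewhere without scanning:

```rust
/// One log entry: (index, Some(member node ids)) for a membership change,
/// (index, None) for anything else. A stand-in for openraft's entry type.
pub type Entry = (u64, Option<Vec<u64>>);

/// Scans `log` once from `since` (exclusive) to the end and returns up to the
/// last two membership entries found, oldest first. They may be uncommitted.
/// Hypothetical sketch of the idea; not the openraft implementation.
pub fn last_two_memberships(log: &[Entry], since: Option<u64>) -> Vec<(u64, Vec<u64>)> {
    let mut found: Vec<(u64, Vec<u64>)> = Vec::new();
    for (index, membership) in log {
        if since.map_or(false, |s| *index <= s) {
            continue; // already covered by the applied state / snapshot
        }
        if let Some(nodes) = membership {
            found.push((*index, nodes.clone()));
            if found.len() > 2 {
                found.remove(0); // keep only the two most recent
            }
        }
    }
    found
}
```

In the scenario discussed earlier (snapshot at LogId(2), membership change to (1, 4, 5) at LogId(10)), scanning with `since = Some(2)` finds only the change at index 10, provided that entry actually reached node 1's log.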

BTW, the function last_membership_in_log() returns potentially uncommitted (but logged) membership. Thus, the optimized version should do the same, right?

Yes. In openraft, a membership config takes effect immediately when it is seen, regardless of whether it is committed or not. This is unlike the Raft implementation in etcd, which applies only committed membership configs.

Each of these two solutions has its own issues to address: openraft must handle membership config fallback on a follower, while etcd raft must handle non-committed membership configs during election.

@drmingdrmer
Member

Perfect. We just have to think about how to bring it in w/o causing unnecessary compatibility issues. Likely by creating a second trait method, something on the lines of:

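    async fn append_to_log_interleaved(&mut self, entries: &[&Entry<C>], callback: &LogIoComplete) -> Result<(), StorageError<C::NodeId>> {
        // Default: run the existing blocking append to completion first.
        let res = self.append_to_log(entries).await;
        // Report completion only if the entries were actually persisted.
        if res.is_ok() {
            callback.log_io_completed(entries.last().map(|e| e.log_id), Ok(()));
        }
        res
    }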

where the callback can be cheaply cloned and/or it could be provided once by some other mechanism to prevent unnecessary clones. In the method it would just send a notification back to RaftCore.

  • TODO: provide an asynchronous append-to-log API: invoke a callback when flushed.

In fact, the leader does not have to flush a log to its local store to commit it. A log that is replicated and flushed to a quorum can be committed by the leader, and that quorum does not have to include the leader.
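
As a rough illustration of this point, the commit index can be derived from the flushed index reported by each voter, with the leader treated like any other node. `quorum_flushed_index` is a hypothetical helper, not part of openraft, and it ignores the usual Raft rule that a leader only commits entries of its own term directly.

```rust
use std::collections::BTreeMap;

/// Highest log index that is flushed on a majority of `voters`.
/// `flushed` maps node id to the last index that node reported as flushed;
/// nodes without a report count as index 0.
fn quorum_flushed_index(flushed: &BTreeMap<u64, u64>, voters: &[u64]) -> u64 {
    let mut indexes: Vec<u64> = voters
        .iter()
        .map(|id| flushed.get(id).copied().unwrap_or(0))
        .collect();
    // Sort descending: the entry at position `quorum - 1` is the highest
    // index reached by at least `quorum` voters.
    indexes.sort_unstable_by(|a, b| b.cmp(a));
    let quorum = voters.len() / 2 + 1;
    indexes.get(quorum - 1).copied().unwrap_or(0)
}
```

For voters 1, 2, 3 where only followers 2 and 3 have flushed (up to 7 and 5), the result is 5 even though leader 1 has flushed nothing yet.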

That's correct, but to track that, openraft would need an extension where the logger would tell it when the local log is flushed, so the commit index can be advanced appropriately.

Something like fn flush_log(upto_index: u64, callback: LogIoComplete)?

My idea was somewhat simpler. Let openraft believe the log was actually persisted on the leader and advance the commit index. Instead, ensure that not-yet-persisted log entries won't get applied to the state machine (persistently). That should be equivalent for all practical purposes. Since the RTT + log flush on the follower likely takes longer than just the log flush on the leader, we can assume that the flush will be finished by the time the quorum is built and apply_to_state_machine() is called (so it won't block on the I/O).

(In fact, we don't block on I/O in our implementation, except in a very few places. Instead, we use data flow graphs with flush and barrier disk I/O and/or network I/O dependencies to ensure correctness, so we can start an operation at any time and the end result is the same as if we had waited for the respective I/Os.)
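
The idea above (let the commit index advance, but never apply entries that are not yet flushed locally) could look roughly like this. `ApplyGate` is a hypothetical name, and the sketch uses a plain blocking-free check rather than the data flow graphs mentioned in the thread.

```rust
use std::ops::Range;

/// Hypothetical gate in front of the state machine: entries are applied only
/// up to min(commit index, locally flushed index).
#[derive(Debug, Default)]
struct ApplyGate {
    last_applied: u64,
    flushed: u64,
}

impl ApplyGate {
    /// Called when the local log I/O up to `upto_index` has completed.
    fn on_flushed(&mut self, upto_index: u64) {
        self.flushed = self.flushed.max(upto_index);
    }

    /// Returns the half-open range of log indexes that may be applied now and
    /// marks them as applied. The range is empty if nothing is applicable.
    fn take_applicable(&mut self, commit_index: u64) -> Range<u64> {
        let start = self.last_applied + 1;
        let upto = commit_index.min(self.flushed);
        if upto < start {
            return start..start;
        }
        self.last_applied = upto;
        start..upto + 1
    }
}
```

If the local flush usually finishes before the quorum is built, as assumed above, the gate never holds anything back; it only matters in the rare case where the followers are faster than the local disk.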

Perfect!

Interesting! My question is: If a server applies non-committed logs to the state machine during recovery, the state machine has to be able to revert to a previous state, once the non-committed but applied logs are truncated by the leader. Does your state machine support reverting to a previous state?

No. The state machine can roll forward at most to the index which is definitely known to be committed at the time of restart. This is either the snapshot index (i.e., no roll-forward) or the index of the explicitly-stored membership change (or another known commit index persistently stored elsewhere, should we have it). This is also what last_applied_state() currently returns. We roll forward up to this index before starting Raft.

Ok I see!

Potentially, the roll-forward before election can be simply skipped by returning always just the snapshot index and the membership stored in the snapshot. I'm just a bit worried what will happen if the membership changes completely in the meantime. Say,

  • we start with membership (1, 2, 3), leader being 1, at LogId(1)
  • state machine is snapshotted at LogId(2), so we have a starting point for recovery
  • now membership changes to (1, 4, 5) at LogId(10)
  • no new snapshot is written
  • leader node 1 crashes and node 4 is elected as leader of (1, 4, 5)
  • node 1 restarts and tries to start election by sending messages to 2 and 3 (which don't exist anymore)

How does node 1 rejoin the consensus and learn about the new topology?

It has to wait for node-4, the leader, to contact it. Node-4 will then replicate logs to node-1; once node-1 sees the last membership config at log 10, it updates its effective membership config. See:
https://github.com/datafuselabs/openraft/blob/0f80be947af28cf7343607e6dadc9c43e47fad09/openraft/src/engine/handler/following_handler.rs#L111-L130

If this works as expected, then we can completely drop the roll-forward in case of a membership change after the snapshot and simply rely on openraft here (without handling the membership change as anything special).

Yes.
I do not think your application has to manually roll-forward state machine, in any case.

And I do not know how your application determines a common committed index.
As far as I know, only the leader itself knows what logs are committed. If this leader crashes, a new leader can not tell whether a log is committed without re-replicating it.

Correct. This still holds for our implementation: every node knows its log entries, and during election the match index is computed, which potentially causes some log entries to be re-replicated, using the standard Raft implementation in openraft.

Hmm... Ok then.

I did not remove the blank log. I just stopped using a blank log as a heartbeat message. :(

Are you planning to remove it?

No. A leader must append a blank log as the last step in an election, to assert that its leadership is valid.
Without a blank log, an application log entry with AppData or a membership config log must be replicated instead.
I do not quite get why you want to get rid of it. 🤔
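
For readers less familiar with this step, a minimal sketch of what "append a blank log as the last step in an election" means. The types and both functions are simplified, hypothetical stand-ins and not the openraft definitions; `leadership_established` reflects the usual Raft reading that the new leader's position is confirmed once an entry of its own term is committed.

```rust
#[derive(Debug, Clone, PartialEq)]
enum EntryPayload {
    Blank,
    Normal(String),       // application data
    Membership(Vec<u64>), // membership config
}

#[derive(Debug, Clone, PartialEq)]
struct Entry {
    term: u64,
    index: u64,
    payload: EntryPayload,
}

/// Hypothetical leader step: after winning the election in `term`, append a
/// blank entry so there is something from this term to replicate and commit.
/// Returns the index of the new entry.
fn on_elected(log: &mut Vec<Entry>, term: u64) -> u64 {
    let index = log.last().map_or(1, |e| e.index + 1);
    log.push(Entry { term, index, payload: EntryPayload::Blank });
    index
}

/// True once some entry of the leader's own term is covered by the commit index.
fn leadership_established(log: &[Entry], term: u64, commit_index: u64) -> bool {
    log.iter().any(|e| e.term == term && e.index <= commit_index)
}
```

Any entry of the new term would serve the same purpose, which is why a `Normal` or `Membership` entry has to be replicated instead if no blank entry exists.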

drmingdrmer added a commit to drmingdrmer/openraft that referenced this issue Mar 10, 2023
drmingdrmer added a commit to drmingdrmer/openraft that referenced this issue Mar 10, 2023
@schreter
Collaborator Author

Yes. I trade efficiency for simplicity.

That's understandable :-)

get_last_two_memberships() maybe?

What if there is only one committed/applied membership (the regular case)? Then the two are equal. Maybe get_last_applied_and_effective_membership() would capture it better. Then, the effective membership is either a different one or the last applied one.
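
A rough sketch of how such a default-implemented method could be shaped: the default scans the log behind the applied state, and a storage that persists the membership elsewhere overrides it. Everything here is a simplified stand-in (synchronous, with made-up types `StoredMembership`, `Payload`, `ScanningStore`, `CachingStore`), not the actual openraft Storage trait.

```rust
/// Simplified stand-ins for illustration; these are not the openraft types.
#[derive(Debug, Clone, PartialEq)]
struct StoredMembership {
    log_index: u64,
    nodes: Vec<u64>,
}

#[derive(Debug, Clone)]
enum Payload {
    Normal(String),
    Membership(Vec<u64>),
}

trait Storage {
    /// Last applied log index and the membership stored with the state machine.
    fn last_applied(&self) -> (u64, StoredMembership);

    /// Log entries with an index greater than `index`, in log order.
    fn entries_after(&self, index: u64) -> Vec<(u64, Payload)>;

    /// Default: scan the log behind the applied state for the newest
    /// (possibly uncommitted) membership entry.
    fn get_last_applied_and_effective_membership(&self) -> (StoredMembership, StoredMembership) {
        let (applied_index, applied) = self.last_applied();
        let effective = self
            .entries_after(applied_index)
            .into_iter()
            .rev()
            .find_map(|(log_index, payload)| match payload {
                Payload::Membership(nodes) => Some(StoredMembership { log_index, nodes }),
                Payload::Normal(_) => None,
            })
            .unwrap_or_else(|| applied.clone());
        (applied, effective)
    }
}

/// Storage that relies on the default log scan.
struct ScanningStore {
    applied: (u64, StoredMembership),
    log: Vec<(u64, Payload)>,
}

impl Storage for ScanningStore {
    fn last_applied(&self) -> (u64, StoredMembership) {
        self.applied.clone()
    }
    fn entries_after(&self, index: u64) -> Vec<(u64, Payload)> {
        self.log.iter().filter(|(i, _)| *i > index).cloned().collect()
    }
}

/// Storage that persists the effective membership outside the log and
/// overrides the method to skip the scan.
struct CachingStore {
    inner: ScanningStore,
    persisted_effective: StoredMembership,
}

impl Storage for CachingStore {
    fn last_applied(&self) -> (u64, StoredMembership) {
        self.inner.last_applied()
    }
    fn entries_after(&self, index: u64) -> Vec<(u64, Payload)> {
        self.inner.entries_after(index)
    }
    fn get_last_applied_and_effective_membership(&self) -> (StoredMembership, StoredMembership) {
        (self.inner.applied.1.clone(), self.persisted_effective.clone())
    }
}
```

In the regular case with no membership entry behind the applied state, the default returns the applied membership twice, which matches the "effective is either a different one or the last applied one" reading.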

Something like fn flush_log(upto_index: u64, callback: LogIoComplete)?

Well, something like it, yes. So you'd call it immediately after append_to_log() and the default implementation would simply call the callback? And in our case, we'd just put the log into the buffer in append_to_log() and then schedule the I/O from the log buffer and call the callback after the I/O completed.

I'd pass the callback by reference, though, to prevent unnecessary refcounting on the channel. (Also, I'd document and guarantee that the callback passed is always the same one, so one could store an instance in the Storage impl and reuse it without further refcounting as an optimization.)

The callback would be then something like this:

    /// Notifies Raft that log I/O up to and including `index` has finished.
    trait LogIoComplete: Clone + Send + Sync {
        fn log_io_completed(&self, index: u64, result: Result<(), std::io::Error>);
    }

with an appropriate error type (likely, std::io::Error or even an empty error struct is sufficient here, since Raft has to shut down if the log can't be written).
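
Putting the pieces together, here is a minimal sketch of the buffered variant: append_to_log() only buffers, and flush_log() reports through the callback once the (here simulated) I/O is done. `BufferedLog` and `Recorder` are hypothetical, the "disk" is just a second Vec, and the callback is passed by reference as suggested above.

```rust
use std::io;
use std::sync::{Arc, Mutex};

trait LogIoComplete: Clone + Send + Sync {
    fn log_io_completed(&self, index: u64, result: Result<(), io::Error>);
}

/// Hypothetical log: entries are buffered first and become durable on flush.
#[derive(Default)]
struct BufferedLog {
    buffer: Vec<(u64, Vec<u8>)>,
    durable: Vec<(u64, Vec<u8>)>,
}

impl BufferedLog {
    /// Only puts the entry into the in-memory buffer; no I/O happens here.
    fn append_to_log(&mut self, index: u64, data: Vec<u8>) {
        self.buffer.push((index, data));
    }

    /// Moves all buffered entries up to `upto_index` to the durable part
    /// (a stand-in for real disk I/O) and then invokes the callback once.
    fn flush_log<C: LogIoComplete>(&mut self, upto_index: u64, callback: &C) {
        let (ready, rest): (Vec<_>, Vec<_>) = std::mem::take(&mut self.buffer)
            .into_iter()
            .partition(|(i, _)| *i <= upto_index);
        self.buffer = rest;
        if let Some(last) = ready.last().map(|(i, _)| *i) {
            self.durable.extend(ready);
            callback.log_io_completed(last, Ok(()));
        }
    }
}

/// Example callback that records completed indexes; a real one would
/// presumably send a message to RaftCore instead.
#[derive(Clone, Default)]
struct Recorder(Arc<Mutex<Vec<u64>>>);

impl LogIoComplete for Recorder {
    fn log_io_completed(&self, index: u64, result: Result<(), io::Error>) {
        if result.is_ok() {
            self.0.lock().unwrap().push(index);
        }
    }
}
```

A storage without asynchronous I/O would flush inside append_to_log() and call the callback immediately, which corresponds to the default implementation discussed above.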

I do not think your application has to manually roll-forward state machine, in any case.

OK, I'll drop that, then. Makes things simpler :-) (and with the above optimized membership query it's anyway equivalently fast).

No. A leader must append a blank log as the last step in an election, to assert that its leadership is valid. Without a blank log, an application log entry with AppData or a membership config log must be replicated instead.

Thanks for the explanation.

I do not quite get why you want to get rid of it. 🤔

Not necessarily.

I just have this tic of optimizing things, since our application (a database) is extremely performance-sensitive. Having an enum with two instead of three possible variants lets Rust optimize out the discriminant, if possible, making it 8B smaller. At 1M requests/second, that's 8MB/second not written :-). That was also the reason for #705 - it saves even more for the regular case.
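
The size effect can be checked with a small experiment. The enums below are made-up stand-ins with a `Box<u64>` payload (chosen because a Box is never null, which gives the compiler a spare bit pattern to encode the empty variant); they are not the openraft entry payload. The layout of default-repr enums is unspecified, so the exact numbers depend on the compiler and target.

```rust
use std::mem::size_of;

/// Two variants: the empty one can be encoded in the unused null value of
/// the Box, so no separate discriminant is needed.
#[allow(dead_code)]
enum TwoKinds {
    Blank,
    Normal(Box<u64>),
}

/// Three variants, two of them carrying a full pointer: a single spare value
/// is not enough, so the compiler has to store a discriminant as well.
#[allow(dead_code)]
enum ThreeKinds {
    Blank,
    Normal(Box<u64>),
    Membership(Box<u64>),
}

/// Returns (size of TwoKinds, size of ThreeKinds) in bytes.
fn sizes() -> (usize, usize) {
    (size_of::<TwoKinds>(), size_of::<ThreeKinds>())
}
```

Printing `sizes()` on the target platform shows how many bytes per entry the third variant costs for a given payload type.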
