Skip to content

Commit

Permalink
Rework stitch to an explicit state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
sourcefrog committed Dec 3, 2023
1 parent 5c4eb6d commit a133073
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 64 deletions.
8 changes: 6 additions & 2 deletions src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,12 @@ impl BackupWriter {
if gc_lock::GarbageCollectionLock::is_locked(archive)? {
return Err(Error::GarbageCollectionLockHeld);
}
let basis_index = IterStitchedIndexHunks::new(archive, archive.last_band_id()?)
.iter_entries(Apath::root(), Exclude::nothing());
let basis_index = if let Some(basis_band_id) = archive.last_band_id()? {
IterStitchedIndexHunks::new(archive, basis_band_id)
} else {
IterStitchedIndexHunks::empty(archive)
}
.iter_entries(Apath::root(), Exclude::nothing());

// Create the new band only after finding the basis band!
let band = Band::create(archive)?;
Expand Down
145 changes: 87 additions & 58 deletions src/stitch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,36 @@
//! seen.
//! * Bands might be deleted, so their numbers are not contiguous.
use tracing::warn;
use tracing::{error, trace};

use crate::index::IndexEntryIter;
use crate::index::{IndexEntryIter, IndexHunkIter};
use crate::*;

pub struct IterStitchedIndexHunks {
/// Current band_id: initially the requested band_id.
/// This moves back to earlier bands when we reach the end of an incomplete band.
/// Might be none, if no more bands are available.
band_id: Option<BandId>,

/// The latest (and highest-ordered) apath we have already yielded.
last_apath: Option<Apath>,

/// Currently pending index hunks.
index_hunks: Option<crate::index::IndexHunkIter>,

archive: Archive,

state: State,
}

/// What state is a stitch iter in, and what should happen next?
enum State {
/// We've read to the end of a finished band, or to the earliest existing band, and there is no more content.
Done,

/// We have know the band to read and have not yet read it at all.
BeforeBand(BandId),

/// We have some index hunks from a band and can return them gradually.
InBand {
band_id: BandId,
index_hunks: IndexHunkIter,
},

/// We finished reading a band
AfterBand(BandId),
}

impl IterStitchedIndexHunks {
Expand All @@ -58,12 +70,19 @@ impl IterStitchedIndexHunks {
/// the same point in the previous band, continuing backwards recursively
/// until either there are no more previous indexes, or a complete index
/// is found.
pub(crate) fn new(archive: &Archive, band_id: Option<BandId>) -> IterStitchedIndexHunks {
pub(crate) fn new(archive: &Archive, band_id: BandId) -> IterStitchedIndexHunks {
IterStitchedIndexHunks {
archive: archive.clone(),
band_id,
last_apath: None,
index_hunks: None,
state: State::BeforeBand(band_id),
}
}

pub(crate) fn empty(archive: &Archive) -> IterStitchedIndexHunks {
IterStitchedIndexHunks {
archive: archive.clone(),
last_apath: None,
state: State::Done,
}
}

Expand All @@ -81,51 +100,60 @@ impl Iterator for IterStitchedIndexHunks {

fn next(&mut self) -> Option<Self::Item> {
loop {
// Until we find the next hunk or run out of bands.
// If we're already reading an index, and it has more content, return that.
if let Some(index_hunks) = &mut self.index_hunks {
// An index iterator must be assigned to a band.
debug_assert!(self.band_id.is_some());

for hunk in index_hunks {
if let Some(last_entry) = hunk.last() {
self.last_apath = Some(last_entry.apath().clone());
self.state = match &mut self.state {
State::Done => return None,
State::InBand {
band_id,
index_hunks,
} => {
if let Some(hunk) = index_hunks.next() {
if let Some(last_apath) = hunk.last().map(|entry| entry.apath.clone()) {
trace!(%last_apath, "return hunk");
self.last_apath = Some(last_apath);
} else {
trace!("return empty hunk");
}
return Some(hunk);
} // otherwise, empty, try the next
} else {
State::AfterBand(*band_id)
}
}
// There are no more index hunks in the current band.
self.index_hunks = None;

let band_id = self.band_id.take().expect("last band id should be present");
if self.archive.band_is_closed(band_id).unwrap_or(false) {
// We reached the end of a complete index in this band,
// so there's no need to look at any earlier bands, and we're done iterating.
return None;
State::BeforeBand(band_id) => {
// Start reading this new index and skip forward until after last_apath
match Band::open(&self.archive, *band_id) {
Ok(band) => {
let mut index_hunks = band.index().iter_hunks();
if let Some(last) = &self.last_apath {
index_hunks = index_hunks.advance_to_after(last)
}
State::InBand {
band_id: *band_id,
index_hunks,
}
}
Err(err) => {
error!(?band_id, ?err, "Failed to open next band");
State::AfterBand(*band_id)
}
}
}

// self.band_id might be None afterwards, if there is no previous band.
// If so, we're done.
self.band_id = previous_existing_band(&self.archive, band_id);
}

if let Some(band_id) = self.band_id {
let band = match Band::open(&self.archive, band_id) {
Ok(band) => band,
Err(err) => {
warn!(?err, ?band_id, "Failed to open band, skipping it");
self.band_id = previous_existing_band(&self.archive, band_id);
continue;
State::AfterBand(band_id) => {
if self.archive.band_is_closed(*band_id).unwrap_or(false) {
trace!(?band_id, "band is closed; stitched iteration complete");
State::Done
} else if let Some(prev_band_id) =
previous_existing_band(&self.archive, *band_id)
{
trace!(?band_id, ?prev_band_id, "moving back to previous band");
State::BeforeBand(prev_band_id)
} else {
trace!(
?band_id,
"no previous band to stitch; stitched iteration is complete"
);
State::Done
}
};
// Start reading this new index and skip forward until after last_apath
let mut iter_hunks = band.index().iter_hunks();
if let Some(last) = &self.last_apath {
iter_hunks = iter_hunks.advance_to_after(last)
}
self.index_hunks = Some(iter_hunks);
} else {
// We got no more bands with possible new index information.
return None;
}
}
}
Expand All @@ -136,9 +164,10 @@ fn previous_existing_band(archive: &Archive, mut band_id: BandId) -> Option<Band
// TODO: It might be faster to list the present bands and calculate
// from that, rather than walking backwards one at a time...
if let Some(prev_band_id) = band_id.previous() {
band_id = prev_band_id;
if archive.band_exists(band_id).unwrap_or(false) {
return Some(band_id);
if archive.band_exists(prev_band_id).unwrap_or(false) {
return Some(prev_band_id);
} else {
band_id = prev_band_id;
}
} else {
return None;
Expand Down Expand Up @@ -167,7 +196,7 @@ mod test {
}

fn simple_ls(archive: &Archive, band_id: BandId) -> String {
let strs: Vec<String> = IterStitchedIndexHunks::new(archive, Some(band_id))
let strs: Vec<String> = IterStitchedIndexHunks::new(archive, band_id)
.flatten()
.map(|entry| format!("{}:{}", &entry.apath, entry.target.unwrap()))
.collect();
Expand Down Expand Up @@ -301,7 +330,7 @@ mod test {

let band_id = band_ids.first().expect("expected at least one band");

let mut iter = IterStitchedIndexHunks::new(&af, Some(*band_id));
let mut iter = IterStitchedIndexHunks::new(&af, *band_id);
// Get the first and only index entry.
// `index_hunks` and `band_id` should be `Some`.
assert!(iter.next().is_some());
Expand Down
6 changes: 2 additions & 4 deletions src/stored_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ impl ReadTree for StoredTree {
/// Return an iter of index entries in this stored tree.
// TODO: Should return an iter of Result<Entry> so that we can inspect them...
fn iter_entries(&self, subtree: Apath, exclude: Exclude) -> Result<Self::IT> {
Ok(
IterStitchedIndexHunks::new(&self.archive, Some(self.band.id()))
.iter_entries(subtree, exclude),
)
Ok(IterStitchedIndexHunks::new(&self.archive, self.band.id())
.iter_entries(subtree, exclude))
}
}

Expand Down

0 comments on commit a133073

Please sign in to comment.