Progress, counters, and non-fatal errors through a new Monitor interface #227

Merged: 36 commits, Oct 3, 2023

Commits
8411e44
Sketch of a new Monitor interface
sourcefrog Sep 25, 2023
a130e33
Developing the Monitor idea
sourcefrog Sep 25, 2023
aac4f6a
Sketch TermUiMonitor; Debug for Counters
sourcefrog Sep 25, 2023
47a7f25
Use EnumCount
sourcefrog Sep 25, 2023
709830f
WIP: New Monitor approach
sourcefrog Sep 26, 2023
c243912
First working progress bar through monitor/task
sourcefrog Sep 29, 2023
f8a6f9e
Bit better validate progress
sourcefrog Sep 29, 2023
13b0fa5
Progress while reading index and listing blocks
sourcefrog Sep 29, 2023
b7634c9
Hook trace into new monitor
sourcefrog Sep 29, 2023
34c8c99
Don't rate limit updates in Nutmeg
sourcefrog Sep 29, 2023
2098730
Convert backup to monitor
sourcefrog Sep 29, 2023
449300f
Convert blockdir validate to monitor
sourcefrog Sep 29, 2023
2d40230
Convert Tree::size to monitor
sourcefrog Sep 29, 2023
93ca03a
Comment
sourcefrog Sep 30, 2023
9001149
Style guide about monitors and options
sourcefrog Sep 30, 2023
393a4e6
Convert restore to Monitor
sourcefrog Sep 30, 2023
11e5fd4
Convert more to Monitor
sourcefrog Sep 30, 2023
592fef5
More progress during gc
sourcefrog Sep 30, 2023
fb6d812
Better progress listing referenced blocks
sourcefrog Sep 30, 2023
807ce53
Parallel list block names
sourcefrog Sep 30, 2023
661d156
Rip out old progress
sourcefrog Sep 30, 2023
10753bb
Move counters to conserve::counters
sourcefrog Sep 30, 2023
b8a5692
Docs for counters, and remove unused values
sourcefrog Sep 30, 2023
4e29643
More counters during backup
sourcefrog Sep 30, 2023
8542254
Format thousands in progress bar
sourcefrog Sep 30, 2023
aeb15a1
Unify block-listing code
sourcefrog Sep 30, 2023
ddd0b95
Hook --no-progress to monitor
sourcefrog Sep 30, 2023
a4fd65e
Tests for counters, and mutants test it
sourcefrog Sep 30, 2023
9ea7f3f
Counters for block writes
sourcefrog Sep 30, 2023
09385f1
Monitor block reads
sourcefrog Sep 30, 2023
89a8f62
Counters for block existence cache
sourcefrog Sep 30, 2023
c6914a6
Counters for block read
sourcefrog Sep 30, 2023
099cc37
Counters for index writes
sourcefrog Sep 30, 2023
736a241
Put back --metrics-json, writing counters
sourcefrog Sep 30, 2023
4b391ef
rm IndexWriterStats
sourcefrog Sep 30, 2023
187e4c2
todo
sourcefrog Oct 3, 2023
1 change: 1 addition & 0 deletions .cargo/mutants.toml
@@ -10,6 +10,7 @@ examine_globs = [
"src/blockdir.rs",
"src/blockhash.rs",
"src/bin/conserve.rs",
"src/counters.rs",
"src/jsonio.rs",
"src/restore.rs",
"src/transport.rs",
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -42,6 +42,8 @@ semver = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
snap = "1.0.0"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
thiserror = "1.0.19"
thousands = "0.2.0"
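The new `strum` and `strum_macros` dependencies support the "Use EnumCount" and counters commits above. Below is a minimal sketch of how an `EnumCount`-derived enum can index a fixed-size table of atomic counters; the `Counter` and `Counters` names and the variant list are illustrative stand-ins, not conserve's actual `src/counters.rs`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use strum::EnumCount; // the trait that provides the COUNT constant
use strum_macros::EnumCount as EnumCountMacro;

// Illustrative counter names; the real set lives in src/counters.rs.
#[derive(Copy, Clone, EnumCountMacro)]
enum Counter {
    BlockReads,
    BlockWrites,
    IndexWrites,
}

/// One atomic slot per enum variant, so counting needs no locks.
struct Counters {
    values: [AtomicUsize; Counter::COUNT],
}

impl Default for Counters {
    fn default() -> Self {
        Counters {
            values: std::array::from_fn(|_| AtomicUsize::new(0)),
        }
    }
}

impl Counters {
    fn count(&self, counter: Counter, n: usize) {
        self.values[counter as usize].fetch_add(n, Ordering::Relaxed);
    }

    fn get(&self, counter: Counter) -> usize {
        self.values[counter as usize].load(Ordering::Relaxed)
    }
}

fn main() {
    let counters = Counters::default();
    counters.count(Counter::BlockReads, 2);
    counters.count(Counter::BlockWrites, 1);
    assert_eq!(counters.get(Counter::IndexWrites), 0);
}
```

Relaxed atomic increments keep counting cheap on the parallel (`par_iter`) code paths visible in the src/archive.rs diff below.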
2 changes: 2 additions & 0 deletions NEWS.md
@@ -4,6 +4,8 @@

- Performance: Also keep a cache of the existence of blocks that have not yet been read.

- Changed: The format and keys written by `--metrics-json` have changed.

## 23.9.0

- S3 support! Enable it with `cargo install --features s3`, then e.g. `conserve backup s3://mybucket.example/`.
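The `--metrics-json` entry above corresponds to the "Put back --metrics-json, writing counters" commit. Below is a rough sketch of dumping a counter snapshot to a file with `serde_json` (already a dependency); the key names are hypothetical and the real keys and structure conserve writes may differ.

```rust
use std::collections::BTreeMap;
use std::fs::File;

// Write a map of counter names to values as pretty-printed JSON.
// serde_json::Error converts into std::io::Error, so `?` works here.
fn write_metrics_json(path: &str, counters: &BTreeMap<&str, u64>) -> std::io::Result<()> {
    let file = File::create(path)?;
    serde_json::to_writer_pretty(file, counters)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Hypothetical counter names, for illustration only.
    let mut counters = BTreeMap::new();
    counters.insert("block_reads", 42u64);
    counters.insert("block_writes", 7);
    write_metrics_json("metrics.json", &counters)
}
```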
9 changes: 9 additions & 0 deletions doc/style.md
@@ -40,6 +40,15 @@ To make a new one, `::create()` which returns a `Writer`.
Versions that take a `Path` rather than a `Transport` should be called
`open_path` and `create_path`.

### Arguments

If the function takes a `Monitor` argument it should be the last.

If it takes some kind of `Options` that should be last before the monitor.

In general arguments that are conceptually inputs should be towards the left,
and those that are conceptually outputs should be towards the right.

### Variables

Local variables (not in a closure) that hold a "major" object should have a
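The new "Arguments" section above matches the signatures changed elsewhere in this PR, such as `Archive::validate(&self, options: &ValidateOptions, monitor: Arc<dyn Monitor>)`. Below is a small sketch of the convention with stand-in types (not conserve's real definitions): inputs on the left, the options struct second to last, the monitor last.

```rust
use std::sync::Arc;

// Stand-in types, only to show the argument ordering convention.
struct Archive;
struct ValidateOptions;
trait Monitor: Send + Sync {}

// Inputs first, then the `Options`, with the `Monitor` as the final argument.
fn validate(_archive: &Archive, _options: &ValidateOptions, _monitor: Arc<dyn Monitor>) {
    // ...
}

struct NullMonitor;
impl Monitor for NullMonitor {}

fn main() {
    validate(&Archive, &ValidateOptions, Arc::new(NullMonitor));
}
```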
83 changes: 35 additions & 48 deletions src/archive.rs
@@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use itertools::Itertools;
@@ -27,7 +26,7 @@ use tracing::{debug, error, warn};

use crate::blockhash::BlockHash;
use crate::jsonio::{read_json, write_json};
use crate::progress::{Bar, Progress};
use crate::monitor::Monitor;
use crate::transport::local::LocalTransport;
use crate::transport::Transport;
use crate::*;
@@ -194,40 +193,34 @@ impl Archive {
/// Returns all blocks referenced by all bands.
///
/// Shows a progress bar as they're collected.
pub fn referenced_blocks(&self, band_ids: &[BandId]) -> Result<HashSet<BlockHash>> {
pub fn referenced_blocks(
&self,
band_ids: &[BandId],
monitor: Arc<dyn Monitor>,
) -> Result<HashSet<BlockHash>> {
let archive = self.clone();
// TODO: Percentage completion based on how many bands have been checked so far.
let bar = Bar::new();
let references_found = AtomicUsize::new(0);
let bands_started = AtomicUsize::new(0);
let total_bands = band_ids.len();
let start = Instant::now();
let task = monitor.start_task("Find referenced blocks".to_string());
Ok(band_ids
.par_iter()
.inspect(|_| {
bands_started.fetch_add(1, Ordering::Relaxed);
})
.map(move |band_id| Band::open(&archive, *band_id).expect("Failed to open band"))
.flat_map_iter(|band| band.index().iter_entries())
.flat_map_iter(|entry| entry.addrs)
.map(|addr| addr.hash)
.inspect(|_hash| {
bar.post(Progress::ReferencedBlocks {
references_found: references_found.fetch_add(1, Ordering::Relaxed),
bands_started: bands_started.load(Ordering::Relaxed),
total_bands,
start,
})
.inspect(|_| {
task.increment(1);
})
.collect())
}

/// Returns an iterator of blocks that are present and referenced by no index.
pub fn unreferenced_blocks(&self) -> Result<impl Iterator<Item = BlockHash>> {
let referenced = self.referenced_blocks(&self.list_band_ids()?)?;
pub fn unreferenced_blocks(
&self,
monitor: Arc<dyn Monitor>,
) -> Result<impl ParallelIterator<Item = BlockHash>> {
let referenced = self.referenced_blocks(&self.list_band_ids()?, monitor.clone())?;
Ok(self
.block_dir()
.iter_block_names()?
.blocks(monitor)?
.filter(move |h| !referenced.contains(h)))
}

@@ -239,6 +232,7 @@ impl Archive {
&self,
delete_band_ids: &[BandId],
options: &DeleteOptions,
monitor: Arc<dyn Monitor>,
) -> Result<DeleteStats> {
let mut stats = DeleteStats::default();
let start = Instant::now();
@@ -257,11 +251,11 @@
keep_band_ids.retain(|b| !delete_band_ids.contains(b));

debug!("List referenced blocks...");
let referenced = self.referenced_blocks(&keep_band_ids)?;
let referenced = self.referenced_blocks(&keep_band_ids, monitor.clone())?;
debug!(referenced.len = referenced.len());

debug!("Find present blocks...");
let present = self.block_dir.block_names_set()?;
let present: HashSet<BlockHash> = self.block_dir.blocks(monitor.clone())?.collect();
debug!(present.len = present.len());

debug!("Find unreferenced blocks...");
@@ -271,44 +265,35 @@
stats.unreferenced_block_count = unref_count;

debug!("Measure unreferenced blocks...");
let measure_bar = Bar::new();
let task = monitor.start_task("Measure unreferenced blocks".to_string());
task.set_total(unref_count);
let total_bytes = unref
.par_iter()
.enumerate()
.inspect(|(i, _)| {
measure_bar.post(Progress::MeasureUnreferenced {
blocks_done: *i,
blocks_total: unref_count,
})
.inspect(|_| {
task.increment(1);
})
.map(|(_i, block_id)| block_dir.compressed_size(block_id).unwrap_or_default())
.sum();
drop(measure_bar);
drop(task);
stats.unreferenced_block_bytes = total_bytes;

if !options.dry_run {
delete_guard.check()?;
let bar = Bar::new();
let task = monitor.start_task("Delete bands".to_string());

for (bands_done, band_id) in delete_band_ids.iter().enumerate() {
for band_id in delete_band_ids.iter() {
Band::delete(self, *band_id)?;
stats.deleted_band_count += 1;
bar.post(Progress::DeleteBands {
bands_done,
total_bands: delete_band_ids.len(),
});
task.increment(1);
}

let blocks_done: AtomicUsize = AtomicUsize::new(0);
let start = Instant::now();
let task = monitor.start_task("Delete blocks".to_string());
task.set_total(unref_count);
let error_count = unref
.par_iter()
.filter(|block_hash| {
bar.post(Progress::DeleteBlocks {
blocks_done: blocks_done.fetch_add(1, Ordering::Relaxed),
start,
total_blocks: unref_count,
});
task.increment(1);
block_dir.delete_block(block_hash).is_err()
})
.count();
@@ -325,7 +310,7 @@ impl Archive {
/// If problems are found, they are emitted as `warn` or `error` level
/// tracing messages. This function only returns an error if validation
/// stops due to a fatal error.
pub fn validate(&self, options: &ValidateOptions) -> Result<()> {
pub fn validate(&self, options: &ValidateOptions, monitor: Arc<dyn Monitor>) -> Result<()> {
self.validate_archive_dir()?;

debug!("List bands...");
@@ -334,14 +319,15 @@

// 1. Walk all indexes, collecting a list of (block_hash, min_length)
// values referenced by all the indexes.
let referenced_lens = validate::validate_bands(self, &band_ids)?;
let referenced_lens = validate::validate_bands(self, &band_ids, monitor.clone())?;

if options.skip_block_hashes {
// 3a. Check that all referenced blocks are present, without spending time reading their
// content.
debug!("List blocks...");
// TODO: Check for unexpected files or directories in the blockdir.
let present_blocks: HashSet<BlockHash> = self.block_dir.block_names_set()?;
let present_blocks: HashSet<BlockHash> =
self.block_dir.blocks(monitor.clone())?.collect();
for block_hash in referenced_lens.keys() {
if !present_blocks.contains(block_hash) {
error!(%block_hash, "Referenced block missing");
@@ -350,7 +336,8 @@
} else {
// 2. Check the hash of all blocks are correct, and remember how long
// the uncompressed data is.
let block_lengths: HashMap<BlockHash, usize> = self.block_dir.validate()?;
let block_lengths: HashMap<BlockHash, usize> =
self.block_dir.validate(monitor.clone())?;
// 3b. Check that all referenced ranges are inside the present data.
for (block_hash, referenced_len) in referenced_lens {
if let Some(&actual_len) = block_lengths.get(&block_hash) {
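The src/archive.rs changes above only ever call `start_task`, `set_total`, and `increment` on the monitor, and pass it around as `Arc<dyn Monitor>`. Below is a minimal sketch of a trait and task with just that surface, inferred from those call sites rather than copied from conserve's real `src/monitor.rs` (which also carries counters and non-fatal error reporting).

```rust
use std::sync::{Arc, Mutex};

// Inferred surface only: a Monitor hands out Tasks that report progress.
trait Monitor: Send + Sync {
    fn start_task(&self, name: String) -> Task;
}

// Progress for one named operation; callers drop it when the phase ends.
struct Task {
    name: String,
    done: Mutex<usize>,
    total: Mutex<Option<usize>>,
}

impl Task {
    fn set_total(&self, total: usize) {
        *self.total.lock().unwrap() = Some(total);
    }

    fn increment(&self, n: usize) {
        let mut done = self.done.lock().unwrap();
        *done += n;
        match *self.total.lock().unwrap() {
            Some(total) => eprintln!("{}: {}/{}", self.name, *done, total),
            None => eprintln!("{}: {}", self.name, *done),
        }
    }
}

// Stand-in for the terminal-UI monitor this PR adds: it just writes to stderr.
struct StderrMonitor;

impl Monitor for StderrMonitor {
    fn start_task(&self, name: String) -> Task {
        eprintln!("start: {name}");
        Task {
            name,
            done: Mutex::new(0),
            total: Mutex::new(None),
        }
    }
}

fn main() {
    let monitor: Arc<dyn Monitor> = Arc::new(StderrMonitor);
    let task = monitor.start_task("Delete blocks".to_string());
    task.set_total(3);
    for _ in 0..3 {
        task.increment(1);
    }
    drop(task);
}
```

This mirrors the pattern in `delete_bands` above: start a named task, set a total, increment per item, and drop the task when the phase ends.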