Skip to content

Commit

Permalink
Make checkpoint metric in relation to max_repeatable_ts (#23087)
Browse files Browse the repository at this point in the history
This makes the metrics aligned with how we calculate the min valid snapshot for retention.

GitOrigin-RevId: 341cc3d9b96e5b1ae271de154a1e07f9bfaa01e0
  • Loading branch information
jordanhunt22 authored and Convex, Inc committed Mar 11, 2024
1 parent 8f480ad commit 843e694
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions crates/database/src/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
follower_retention_manager,
receive_min_snapshot,
checkpoint_writer,
snapshot_reader.clone(),
),
);
Ok(Self {
Expand Down Expand Up @@ -321,7 +322,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {

async fn advance_timestamp(
bounds_writer: &Writer<SnapshotBounds>,
rt: &RT,
persistence: &dyn Persistence,
snapshot_reader: &Reader<SnapshotManager>,
retention_type: RetentionType,
Expand Down Expand Up @@ -358,7 +358,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
tracing::debug!("Advance {retention_type:?} min snapshot to {new_min_snapshot_ts}");
// Also log the deletion checkpoint here, so it is periodically reported
// even if the deletion future is stuck.
Self::get_checkpoint(rt, persistence.reader().as_ref()).await?;
Self::get_checkpoint(persistence.reader().as_ref(), snapshot_reader.clone()).await?;
Ok(Some(new_min_snapshot_ts))
}

Expand Down Expand Up @@ -405,7 +405,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {

let index_ts = Self::advance_timestamp(
&bounds_writer,
&rt,
persistence.as_ref(),
&snapshot_reader,
RetentionType::Index,
Expand All @@ -420,7 +419,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {

let document_ts = Self::advance_timestamp(
&bounds_writer,
&rt,
persistence.as_ref(),
&snapshot_reader,
RetentionType::Document,
Expand Down Expand Up @@ -685,6 +683,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
retention_validator: Arc<dyn RetentionValidator>,
min_snapshot_rx: Receiver<Timestamp>,
checkpoint_writer: Writer<Checkpoint>,
snapshot_reader: Reader<SnapshotManager>,
) {
let reader = persistence.reader();
let mut all_indexes = indexes_at_min_snapshot;
Expand Down Expand Up @@ -712,7 +711,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
);
let r: anyhow::Result<()> = try {
let _timer = retention_delete_timer();
let cursor = Self::get_checkpoint(&rt, reader.as_ref()).await?;
let cursor = Self::get_checkpoint(reader.as_ref(), snapshot_reader.clone()).await?;
tracing::trace!("go_delete: loaded checkpoint: {cursor:?}");
Self::accumulate_indexes(
persistence.as_ref(),
Expand Down Expand Up @@ -790,8 +789,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
}

async fn get_checkpoint(
rt: &RT,
persistence: &dyn PersistenceReader,
snapshot_reader: Reader<SnapshotManager>,
) -> anyhow::Result<Timestamp> {
let checkpoint_value = persistence
.get_persistence_global(PersistenceGlobalKey::RetentionConfirmedDeletedTimestamp)
Expand All @@ -802,7 +801,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
Some(ConvexValue::Int64(ts)) => {
let checkpoint = Timestamp::try_from(ts)?;
log_retention_cursor_age(
Timestamp::try_from(rt.system_time())?.secs_since_f64(checkpoint),
(*snapshot_reader.lock().latest_ts()).secs_since_f64(checkpoint),
);
checkpoint
},
Expand Down

0 comments on commit 843e694

Please sign in to comment.