From 843e6947080168bc72278a110723565a470a0503 Mon Sep 17 00:00:00 2001 From: Jordan Hunt <65152573+jordanhunt22@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:32:34 -0400 Subject: [PATCH] Make checkpoint metric in relation to `max_repeatable_ts` (#23087) This makes the metrics aligned with how we calculate the min valid snapshot for retention. GitOrigin-RevId: 341cc3d9b96e5b1ae271de154a1e07f9bfaa01e0 --- crates/database/src/retention.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/database/src/retention.rs b/crates/database/src/retention.rs index 5c1c9e35..e4b4ec90 100644 --- a/crates/database/src/retention.rs +++ b/crates/database/src/retention.rs @@ -285,6 +285,7 @@ impl LeaderRetentionManager { follower_retention_manager, receive_min_snapshot, checkpoint_writer, + snapshot_reader.clone(), ), ); Ok(Self { @@ -321,7 +322,6 @@ impl LeaderRetentionManager { async fn advance_timestamp( bounds_writer: &Writer, - rt: &RT, persistence: &dyn Persistence, snapshot_reader: &Reader, retention_type: RetentionType, @@ -358,7 +358,7 @@ impl LeaderRetentionManager { tracing::debug!("Advance {retention_type:?} min snapshot to {new_min_snapshot_ts}"); // Also log the deletion checkpoint here, so it is periodically reported // even if the deletion future is stuck. - Self::get_checkpoint(rt, persistence.reader().as_ref()).await?; + Self::get_checkpoint(persistence.reader().as_ref(), snapshot_reader.clone()).await?; Ok(Some(new_min_snapshot_ts)) } @@ -405,7 +405,6 @@ impl LeaderRetentionManager { let index_ts = Self::advance_timestamp( &bounds_writer, - &rt, persistence.as_ref(), &snapshot_reader, RetentionType::Index, @@ -420,7 +419,6 @@ impl LeaderRetentionManager { let document_ts = Self::advance_timestamp( &bounds_writer, - &rt, persistence.as_ref(), &snapshot_reader, RetentionType::Document, @@ -685,6 +683,7 @@ impl LeaderRetentionManager { retention_validator: Arc, min_snapshot_rx: Receiver, checkpoint_writer: Writer, + snapshot_reader: Reader, ) { let reader = persistence.reader(); let mut all_indexes = indexes_at_min_snapshot; @@ -712,7 +711,7 @@ impl LeaderRetentionManager { ); let r: anyhow::Result<()> = try { let _timer = retention_delete_timer(); - let cursor = Self::get_checkpoint(&rt, reader.as_ref()).await?; + let cursor = Self::get_checkpoint(reader.as_ref(), snapshot_reader.clone()).await?; tracing::trace!("go_delete: loaded checkpoint: {cursor:?}"); Self::accumulate_indexes( persistence.as_ref(), @@ -790,8 +789,8 @@ impl LeaderRetentionManager { } async fn get_checkpoint( - rt: &RT, persistence: &dyn PersistenceReader, + snapshot_reader: Reader, ) -> anyhow::Result { let checkpoint_value = persistence .get_persistence_global(PersistenceGlobalKey::RetentionConfirmedDeletedTimestamp) @@ -802,7 +801,7 @@ impl LeaderRetentionManager { Some(ConvexValue::Int64(ts)) => { let checkpoint = Timestamp::try_from(ts)?; log_retention_cursor_age( - Timestamp::try_from(rt.system_time())?.secs_since_f64(checkpoint), + (*snapshot_reader.lock().latest_ts()).secs_since_f64(checkpoint), ); checkpoint },