Fix for enforcing document retention (#23587)
This PR enforces document retention on `load_documents` by adding a method to the `RetentionValidator` that checks if reading at a `ts` is valid.

To make this work with the deleter for index retention, we ensure that the invariant `index_retention_confirmed_deleted_ts` >= `min_documents_snapshot_ts` is always true.

Previously, this caused a bug because retention was loading documents outside of the valid `documents` snapshot window. Enforcing the above invariant prevents this and keeps us from deleting documents before the indexes are deleted.
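
As a rough illustration only (a hypothetical helper, not code from this PR), the invariant amounts to never letting the documents snapshot bound move past what the index deleter has already confirmed deleted:

```rust
// Hypothetical sketch of the invariant, with timestamps simplified to u64:
// index_retention_confirmed_deleted_ts must stay >= min_documents_snapshot_ts,
// so the candidate documents bound is clamped to the confirmed index-delete bound.
fn next_min_documents_snapshot_ts(
    candidate_ts: u64,                         // where documents retention would like to move
    index_retention_confirmed_deleted_ts: u64, // how far index deletes are confirmed
) -> u64 {
    candidate_ts.min(index_retention_confirmed_deleted_ts)
}
```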

This will be followed up by enforcing retention on `previous_revisions`.
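
For context on the new `RetentionValidator` method, here is a minimal, self-contained sketch of the shape of the check (simplified and synchronous, with a stand-in `Timestamp` type and the `anyhow` crate; the real trait method is `async` and lives in `crates/common/src/persistence.rs`):

```rust
use std::sync::Arc;

// Simplified stand-in for the crate's Timestamp type.
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct Timestamp(u64);

trait RetentionValidator: Sync + Send {
    /// Called after reading at a snapshot, to confirm the documents log is
    /// still within retention at that snapshot.
    fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>;
}

// Illustrative implementation: reject reads older than the documents bound.
struct BoundsValidator {
    min_document_snapshot_ts: Timestamp,
}

impl RetentionValidator for BoundsValidator {
    fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> {
        if ts < self.min_document_snapshot_ts {
            anyhow::bail!(
                "Document snapshot timestamp out of retention window: {:?} < {:?}",
                ts,
                self.min_document_snapshot_ts
            );
        }
        Ok(())
    }
}

fn main() -> anyhow::Result<()> {
    let validator: Arc<dyn RetentionValidator> = Arc::new(BoundsValidator {
        min_document_snapshot_ts: Timestamp(100),
    });
    validator.validate_document_snapshot(Timestamp(150))?; // inside the window: ok
    assert!(validator.validate_document_snapshot(Timestamp(50)).is_err()); // too old: error
    Ok(())
}
```

The hunks below also thread an `Arc<dyn RetentionValidator>` argument through `load_documents` and its callers so this check runs wherever the documents log is read.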

GitOrigin-RevId: cffead60258c67df3a2ca31213536bca285f0ec0
jordanhunt22 authored and Convex, Inc. committed Mar 26, 2024
1 parent 704a39d commit 7c86f7a
Showing 6 changed files with 137 additions and 23 deletions.
69 changes: 55 additions & 14 deletions crates/common/src/persistence.rs
@@ -300,6 +300,9 @@ pub trait RetentionValidator: Sync + Send {
/// Call validate_snapshot *after* reading at the snapshot, to confirm all
/// data in the snapshot is within retention.
async fn validate_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>;
/// Call validate_document_snapshot *after* reading at the snapshot, to
/// confirm the documents log is valid at this snapshot.
async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()>;
async fn min_snapshot_ts(&self) -> anyhow::Result<Timestamp>;
async fn min_document_snapshot_ts(&self) -> anyhow::Result<Timestamp>;

@@ -317,6 +320,7 @@ pub trait PersistenceReader: Send + Sync + 'static {
range: TimestampRange,
order: Order,
page_size: u32,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_>;

/// Loads documents within the given table and the given timestamp range.
@@ -333,21 +337,13 @@ pub trait PersistenceReader: Send + Sync + 'static {
range: TimestampRange,
order: Order,
page_size: u32,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
self.load_documents(range, order, page_size)
self.load_documents(range, order, page_size, retention_validator)
.try_filter(move |(_, doc_id, _)| future::ready(*doc_id.table() == table_id))
.boxed()
}

/// Returns all timestamps in ascending (ts, id) order.
fn load_all_documents(&self) -> DocumentStream {
self.load_documents(
TimestampRange::all(),
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
)
}

/// Look up the previous revision of `(id, ts)`, returning a map where for
/// each `(id, ts)` we have...
///
@@ -424,7 +420,15 @@ pub trait PersistenceReader: Send + Sync + 'static {
async fn max_ts(&self) -> anyhow::Result<Option<Timestamp>> {
// Fetch the document with the maximum timestamp and also MaxRepeatableTimestamp
// in parallel.
let mut stream = self.load_documents(TimestampRange::all(), Order::Desc, 1);
let mut stream = self.load_documents(
TimestampRange::all(),
Order::Desc,
1,
// We don't know the ID of the most recent document, so we
// need to scan the entire timestamp range to find it
// (this may include looking at the `documents` log outside of the retention window)
Arc::new(NoopRetentionValidator),
);
let max_repeatable =
self.get_persistence_global(PersistenceGlobalKey::MaxRepeatableTimestamp);
let (max_committed, max_repeatable) = try_join!(stream.try_next(), max_repeatable)?;
Expand All @@ -435,6 +439,18 @@ pub trait PersistenceReader: Send + Sync + 'static {
}

fn version(&self) -> PersistenceVersion;

/// Returns all timestamps and documents in ascending (ts, table_id, id)
/// order. Only should be used for testing
#[cfg(any(test, feature = "testing"))]
fn load_all_documents(&self) -> DocumentStream {
self.load_documents(
TimestampRange::all(),
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
Arc::new(NoopRetentionValidator),
)
}
}

pub fn now_ts<RT: Runtime>(max_ts: Timestamp, rt: &RT) -> anyhow::Result<Timestamp> {
@@ -500,9 +516,30 @@ impl RepeatablePersistence {
/// Same as [`Persistence::load_documents`] but only including documents in
/// the snapshot range.
pub fn load_documents(&self, range: TimestampRange, order: Order) -> DocumentStream<'_> {
let stream = self
.reader
.load_documents(range, order, *DEFAULT_DOCUMENTS_PAGE_SIZE);
let stream = self.reader.load_documents(
range,
order,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
self.retention_validator.clone(),
);
Box::pin(stream.try_filter(|(ts, ..)| future::ready(*ts <= *self.upper_bound)))
}

/// Same as `load_documents` but doesn't use the `RetentionValidator` from
/// this `RepeatablePersistence`. Instead, the caller can choose its
/// own validator.
pub fn load_documents_with_retention_validator(
&self,
range: TimestampRange,
order: Order,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
let stream = self.reader.load_documents(
range,
order,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
retention_validator,
);
Box::pin(stream.try_filter(|(ts, ..)| future::ready(*ts <= *self.upper_bound)))
}

@@ -673,6 +710,10 @@ impl RetentionValidator for NoopRetentionValidator {
Ok(())
}

async fn validate_document_snapshot(&self, _ts: Timestamp) -> anyhow::Result<()> {
Ok(())
}

async fn min_snapshot_ts(&self) -> anyhow::Result<Timestamp> {
Ok(Timestamp::MIN)
}
10 changes: 8 additions & 2 deletions crates/common/src/testing/persistence_test_suite.rs
@@ -608,7 +608,13 @@ pub async fn test_load_documents_from_table<P: Persistence>(
for page_size in 1..3 {
let docs: Vec<_> = p
.reader()
.load_documents_from_table(table_id, range, order, page_size)
.load_documents_from_table(
table_id,
range,
order,
page_size,
Arc::new(NoopRetentionValidator),
)
.try_collect()
.await?;
let docs: Vec<_> = docs.into_iter().collect();
@@ -626,7 +632,7 @@ pub async fn test_load_documents<P: Persistence>(
) -> anyhow::Result<()> {
let docs: Vec<_> = p
.reader()
.load_documents(range, order, 10)
.load_documents(range, order, 10, Arc::new(NoopRetentionValidator))
.try_collect()
.await?;
let docs: Vec<_> = docs
1 change: 1 addition & 0 deletions crates/common/src/testing/test_persistence.rs
@@ -258,6 +258,7 @@ impl PersistenceReader for TestPersistence {
range: TimestampRange,
order: Order,
_page_size: u32,
_retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
let log = { self.inner.lock().log.clone() };

13 changes: 12 additions & 1 deletion crates/database/src/database.rs
@@ -941,6 +941,7 @@ impl<RT: Runtime> Database<RT> {
timestamp_range,
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
self.retention_validator(),
)
.then(|val| async {
while let Err(not_until) = rate_limiter.check() {
@@ -1526,7 +1527,17 @@ impl<RT: Runtime> Database<RT> {
// should request another page.
let mut has_more = false;
let mut rows_read = 0;
while let Some((ts, id, maybe_doc)) = document_stream.try_next().await? {
while let Some((ts, id, maybe_doc)) = match document_stream.try_next().await {
Ok::<_, Error>(doc) => doc,
Err(e) if e.is_out_of_retention() => {
// Throws a user error if the documents window is out of retention
anyhow::bail!(ErrorMetadata::bad_request(
"InvalidWindowToReadDocuments",
format!("Timestamp {} is too old", range.min_timestamp_inclusive())
))
},
Err(e) => anyhow::bail!(e),
} {
rows_read += 1;
if let Some(new_cursor) = new_cursor
&& new_cursor < ts
50 changes: 45 additions & 5 deletions crates/database/src/retention.rs
@@ -585,7 +585,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
let reader = persistence.reader();
let persistence_version = reader.version();
let snapshot_ts = new_static_repeatable_ts(min_snapshot_ts, reader.as_ref(), rt).await?;
let reader = RepeatablePersistence::new(reader, snapshot_ts, retention_validator);
let reader = RepeatablePersistence::new(reader, snapshot_ts, retention_validator.clone());

tracing::trace!("delete: about to grab chunks");
let expired_chunks = Self::expired_index_entries(
@@ -744,6 +744,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
&mut all_indexes,
&mut index_cursor,
index_table_id,
retention_validator.clone(),
)
.await?;
tracing::trace!("go_delete: Loaded initial indexes");
@@ -763,6 +764,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
&mut all_indexes,
&mut index_cursor,
index_table_id,
retention_validator.clone(),
)
.await?;
tracing::trace!("go_delete: loaded second round of indexes");
@@ -866,12 +868,14 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
all_indexes: &mut BTreeMap<IndexId, (GenericIndexName<TableId>, IndexedFields)>,
cursor: &mut Timestamp,
index_table_id: TableIdAndTableNumber,
retention_validator: Arc<dyn RetentionValidator>,
) -> anyhow::Result<()> {
let reader = persistence.reader();
let mut document_stream = reader.load_documents(
TimestampRange::greater_than(*cursor),
Order::Asc,
*DEFAULT_DOCUMENTS_PAGE_SIZE,
retention_validator,
);
while let Some((ts, _, maybe_doc)) = document_stream.try_next().await? {
Self::accumulate_index_document(maybe_doc, all_indexes, index_table_id)?;
@@ -889,7 +893,23 @@ impl<RT: Runtime> RetentionValidator for LeaderRetentionManager<RT> {
let min_snapshot_ts = self.bounds_reader.lock().min_snapshot_ts;
log_snapshot_verification_age(&self.rt, ts, min_snapshot_ts, false, true);
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(ts, min_snapshot_ts));
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Index
));
}
Ok(())
}

async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> {
let min_snapshot_ts = self.bounds_reader.lock().min_document_snapshot_ts;
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Document
));
}
Ok(())
}
@@ -988,7 +1008,23 @@ impl<RT: Runtime> RetentionValidator for FollowerRetentionManager<RT> {
let min_snapshot_ts = self.min_snapshot_ts().await?;
log_snapshot_verification_age(&self.rt, ts, min_snapshot_ts, false, false);
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(ts, min_snapshot_ts));
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Index
));
}
Ok(())
}

async fn validate_document_snapshot(&self, ts: Timestamp) -> anyhow::Result<()> {
let min_snapshot_ts = self.min_document_snapshot_ts().await?;
if ts < min_snapshot_ts {
anyhow::bail!(snapshot_invalid_error(
ts,
min_snapshot_ts,
RetentionType::Document
));
}
Ok(())
}
@@ -1026,9 +1062,13 @@ impl<RT: Runtime> RetentionValidator for FollowerRetentionManager<RT> {
}
}

fn snapshot_invalid_error(ts: Timestamp, min_snapshot_ts: Timestamp) -> anyhow::Error {
fn snapshot_invalid_error(
ts: Timestamp,
min_snapshot_ts: Timestamp,
retention_type: RetentionType,
) -> anyhow::Error {
anyhow::anyhow!(ErrorMetadata::out_of_retention()).context(format!(
"Snapshot timestamp out of retention window: {ts} < {min_snapshot_ts}"
"{retention_type:?} snapshot timestamp out of retention window: {ts} < {min_snapshot_ts}"
))
}

17 changes: 16 additions & 1 deletion crates/sqlite/src/lib.rs
@@ -111,6 +111,16 @@ impl SqlitePersistence {
retention_validator.validate_snapshot(ts).await?;
}

#[allow(clippy::needless_lifetimes)]
#[try_stream(ok = T, error = anyhow::Error)]
async fn validate_document_snapshot<T: 'static>(
&self,
ts: Timestamp,
retention_validator: Arc<dyn RetentionValidator>,
) {
retention_validator.validate_document_snapshot(ts).await?;
}

fn _index_scan_inner(
&self,
index_id: IndexId,
@@ -481,6 +491,7 @@ impl PersistenceReader for SqlitePersistence {
range: TimestampRange,
order: Order,
_page_size: u32,
retention_validator: Arc<dyn RetentionValidator>,
) -> DocumentStream<'_> {
let triples = try {
let connection = &self.inner.lock().connection;
@@ -509,8 +520,12 @@ impl PersistenceReader for SqlitePersistence {
}
triples
};
// load_documents isn't async so we have to validate snapshot as part of the
// stream.
let validate =
self.validate_document_snapshot(range.min_timestamp_inclusive(), retention_validator);
match triples {
Ok(s) => stream::iter(s).boxed(),
Ok(s) => (validate.chain(stream::iter(s))).boxed(),
Err(e) => stream::once(async { Err(e) }).boxed(),
}
}
