diff --git a/crates/common/src/persistence.rs b/crates/common/src/persistence.rs index 7b365ef5..2049e2fb 100644 --- a/crates/common/src/persistence.rs +++ b/crates/common/src/persistence.rs @@ -198,6 +198,18 @@ pub trait Persistence: Sync + Send + 'static { ) -> anyhow::Result>; async fn delete_index_entries(&self, entries: Vec) -> anyhow::Result; + // Retrieves expired documents + async fn documents_to_delete( + &self, + expired_documents: &Vec<(Timestamp, InternalDocumentId)>, + ) -> anyhow::Result>; + + // Deletes documents + async fn delete( + &self, + documents: Vec<(Timestamp, InternalDocumentId)>, + ) -> anyhow::Result; + fn box_clone(&self) -> Box; // No-op by default. Persistence implementation can override. diff --git a/crates/common/src/testing/persistence_test_suite.rs b/crates/common/src/testing/persistence_test_suite.rs index ba85b3e1..773ebda0 100644 --- a/crates/common/src/testing/persistence_test_suite.rs +++ b/crates/common/src/testing/persistence_test_suite.rs @@ -8,6 +8,7 @@ use std::{ }; use futures::{ + pin_mut, Future, StreamExt, TryStreamExt, @@ -205,6 +206,13 @@ macro_rules! run_persistence_test_suite { persistence_test_suite::persistence_enforce_retention(p).await } + #[tokio::test] + async fn test_persistence_delete_documents() -> anyhow::Result<()> { + let $db = $create_db; + let p = $create_persistence; + persistence_test_suite::persistence_delete_documents(p).await + } + #[tokio::test] async fn test_persistence_previous_revisions() -> anyhow::Result<()> { let $db = $create_db; @@ -1311,6 +1319,17 @@ pub async fn persistence_global(p: P) -> anyhow::Result<()> { Ok(()) } +fn doc( + id: ResolvedDocumentId, + ts: i32, + val: Option, +) -> anyhow::Result<(Timestamp, InternalDocumentId, Option)> { + let doc = val + .map(|val| ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("value" => val))) + .transpose()?; + Ok((Timestamp::must(ts), id.into(), doc)) +} + pub async fn persistence_enforce_retention(p: P) -> anyhow::Result<()> { let mut id_generator = TestIdGenerator::new(); let by_id_index_id = id_generator.generate(&INDEX_TABLE).internal_id(); @@ -1318,17 +1337,6 @@ pub async fn persistence_enforce_retention(p: P) -> anyhow::Resu let table: TableName = str::parse("table")?; let table_id = id_generator.table_id(&table).table_id; - fn doc( - id: ResolvedDocumentId, - ts: i32, - val: Option, - ) -> anyhow::Result<(Timestamp, InternalDocumentId, Option)> { - let doc = val - .map(|val| ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("value" => val))) - .transpose()?; - Ok((Timestamp::must(ts), id.into(), doc)) - } - let by_id = |id: ResolvedDocumentId, ts: i32, deleted: bool| @@ -1494,6 +1502,69 @@ pub async fn persistence_enforce_retention(p: P) -> anyhow::Resu Ok(()) } +pub async fn persistence_delete_documents(p: P) -> anyhow::Result<()> { + let mut id_generator = TestIdGenerator::new(); + let table: TableName = str::parse("table")?; + + let id1 = id_generator.generate(&table); + let id2 = id_generator.generate(&table); + let id3 = id_generator.generate(&table); + let id4 = id_generator.generate(&table); + let id5 = id_generator.generate(&table); + let id6 = id_generator.generate(&table); + let id7 = id_generator.generate(&table); + let id8 = id_generator.generate(&table); + let id9 = id_generator.generate(&table); + let id10 = id_generator.generate(&table); + + let documents = vec![ + doc(id1, 1, Some(1))?, + doc(id2, 2, Some(2))?, + doc(id3, 3, Some(3))?, + // min_document_snapshot_ts: 4 + doc(id4, 5, Some(4))?, + doc(id5, 6, Some(5))?, + doc(id6, 7, Some(6))?, + doc(id7, 8, Some(7))?, + doc(id8, 9, Some(8))?, + doc(id9, 10, Some(9))?, + doc(id10, 11, Some(10))?, + ]; + + p.write(documents.clone(), BTreeSet::new(), ConflictStrategy::Error) + .await?; + + let reader = p.reader(); + + let stream = reader.load_all_documents(); + pin_mut!(stream); + let mut all_docs = Vec::new(); + while let Some(val) = stream.try_next().await? { + all_docs.push(val); + } + assert_eq!(documents.clone(), all_docs); + + let docs_to_delete = documents.clone()[..3] + .iter() + .map(|(ts, id, _)| (*ts, *id)) + .collect_vec(); + + let expired_docs = p.documents_to_delete(&docs_to_delete).await?; + assert_eq!(docs_to_delete, expired_docs); + + assert_eq!(p.delete(expired_docs).await?, 3); + + let stream = reader.load_all_documents(); + pin_mut!(stream); + let mut all_docs = Vec::new(); + while let Some(val) = stream.try_next().await? { + all_docs.push(val); + } + assert_eq!(documents[3..], all_docs); + + Ok(()) +} + pub async fn persistence_previous_revisions(p: P) -> anyhow::Result<()> { let reader = p.reader(); diff --git a/crates/common/src/testing/test_persistence.rs b/crates/common/src/testing/test_persistence.rs index 9da3f5de..178625de 100644 --- a/crates/common/src/testing/test_persistence.rs +++ b/crates/common/src/testing/test_persistence.rs @@ -224,6 +224,36 @@ impl Persistence for TestPersistence { Ok(total_deleted) } + async fn documents_to_delete( + &self, + expired_documents: &Vec<(Timestamp, InternalDocumentId)>, + ) -> anyhow::Result> { + let inner = self.inner.lock(); + let log = &inner.log; + let mut new_expired_rows = Vec::new(); + for expired_doc in expired_documents { + if log.get(expired_doc).is_some() { + new_expired_rows.push(*expired_doc); + } + } + Ok(new_expired_rows) + } + + async fn delete( + &self, + documents: Vec<(Timestamp, InternalDocumentId)>, + ) -> anyhow::Result { + let mut inner = self.inner.lock(); + let log = &mut inner.log; + let mut total_deleted = 0; + for expired_doc in documents { + if log.remove(&expired_doc).is_some() { + total_deleted += 1; + } + } + Ok(total_deleted) + } + fn box_clone(&self) -> Box { Box::new(self.clone()) } diff --git a/crates/sqlite/src/lib.rs b/crates/sqlite/src/lib.rs index 92c6ec80..936b42a2 100644 --- a/crates/sqlite/src/lib.rs +++ b/crates/sqlite/src/lib.rs @@ -424,6 +424,56 @@ impl Persistence for SqlitePersistence { Ok(count_deleted) } + async fn documents_to_delete( + &self, + expired_documents: &Vec<(Timestamp, InternalDocumentId)>, + ) -> anyhow::Result> { + let connection = &self.inner.lock().connection; + let mut all_entries = BTreeSet::new(); + for expired_entry in expired_documents { + let table_id: &TableId = expired_entry.1.table(); + let id = expired_entry.1.internal_id(); + let params = params![&table_id.0[..], &id[..], &u64::from(expired_entry.0),]; + let mut entries_query = connection.prepare(DOCUMENTS_TO_DELETE)?; + let row_iter = entries_query.query_map(params, |row| { + let table_id: Vec = row.get(0)?; + let id: Vec = row.get(1)?; + let ts = + Timestamp::try_from(row.get::<_, u64>(2)?).expect("timestamp out of bounds"); + Ok((table_id, id, ts)) + })?; + for row in row_iter { + let (table_id, id, ts) = row?; + all_entries.insert(( + ts, + InternalDocumentId::new(TableId(table_id.try_into()?), id.try_into()?), + )); + } + } + Ok(all_entries.into_iter().collect()) + } + + async fn delete( + &self, + documents: Vec<(Timestamp, InternalDocumentId)>, + ) -> anyhow::Result { + let mut inner = self.inner.lock(); + let tx = inner.connection.transaction()?; + let mut delete_document_query = tx.prepare_cached(DELETE_DOCUMENT)?; + let mut count_deleted = 0; + + for (ts, internal_id) in documents { + let table_id: &TableId = internal_id.table(); + let id = internal_id.internal_id(); + count_deleted += + delete_document_query + .execute(params![&table_id.0[..], &id[..], &u64::from(ts),])?; + } + drop(delete_document_query); + tx.commit()?; + Ok(count_deleted) + } + fn box_clone(&self) -> Box { Box::new(self.clone()) } @@ -634,6 +684,12 @@ ORDER BY index_id DESC, key DESC, ts DESC "#; const DELETE_INDEX: &str = "DELETE FROM indexes WHERE index_id = ? AND ts <= ? AND key = ?"; +const DOCUMENTS_TO_DELETE: &str = r#"SELECT table_id, id, ts +FROM documents WHERE table_id = ? AND id = ? AND ts <= ? +ORDER BY table_id DESC, id DESC, ts DESC +"#; +const DELETE_DOCUMENT: &str = "DELETE FROM documents WHERE table_id = ? AND id = ? AND ts = ?"; + const CHECK_IS_READ_ONLY: &str = "SELECT 1 FROM read_only LIMIT 1"; const SET_READ_ONLY: &str = "INSERT INTO read_only (id) VALUES (1)"; const UNSET_READ_ONLY: &str = "DELETE FROM read_only WHERE id = 1";