Skip to content

Commit

Permalink
Add functions that delete documents from documents log (#23214)
Browse files Browse the repository at this point in the history
Added implementations for `documents_to_delete` and `delete` for all persistence implementations along with tests. This lays the groundwork to start working on a handle that deletes documents that are outside of the retention window.

Before this can happen, I still have to:
- enforce that all functions that read directly from the `documents` table are called at a valid timestamp
- ensure that doc retention windows are working as expected in `prod`

GitOrigin-RevId: 232a12095d211a380a4e63a872639f9df6139fee
  • Loading branch information
jordanhunt22 authored and Convex, Inc. committed Mar 11, 2024
1 parent 878094a commit 79498fc
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 11 deletions.
12 changes: 12 additions & 0 deletions crates/common/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ pub trait Persistence: Sync + Send + 'static {
) -> anyhow::Result<Vec<IndexEntry>>;
async fn delete_index_entries(&self, entries: Vec<IndexEntry>) -> anyhow::Result<usize>;

// Retrieves expired documents
async fn documents_to_delete(
&self,
expired_documents: &Vec<(Timestamp, InternalDocumentId)>,
) -> anyhow::Result<Vec<(Timestamp, InternalDocumentId)>>;

// Deletes documents
async fn delete(
&self,
documents: Vec<(Timestamp, InternalDocumentId)>,
) -> anyhow::Result<usize>;

fn box_clone(&self) -> Box<dyn Persistence>;

// No-op by default. Persistence implementation can override.
Expand Down
93 changes: 82 additions & 11 deletions crates/common/src/testing/persistence_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use futures::{
pin_mut,
Future,
StreamExt,
TryStreamExt,
Expand Down Expand Up @@ -205,6 +206,13 @@ macro_rules! run_persistence_test_suite {
persistence_test_suite::persistence_enforce_retention(p).await
}

#[tokio::test]
async fn test_persistence_delete_documents() -> anyhow::Result<()> {
let $db = $create_db;
let p = $create_persistence;
persistence_test_suite::persistence_delete_documents(p).await
}

#[tokio::test]
async fn test_persistence_previous_revisions() -> anyhow::Result<()> {
let $db = $create_db;
Expand Down Expand Up @@ -1311,24 +1319,24 @@ pub async fn persistence_global<P: Persistence>(p: P) -> anyhow::Result<()> {
Ok(())
}

fn doc(
id: ResolvedDocumentId,
ts: i32,
val: Option<i64>,
) -> anyhow::Result<(Timestamp, InternalDocumentId, Option<ResolvedDocument>)> {
let doc = val
.map(|val| ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("value" => val)))
.transpose()?;
Ok((Timestamp::must(ts), id.into(), doc))
}

pub async fn persistence_enforce_retention<P: Persistence>(p: P) -> anyhow::Result<()> {
let mut id_generator = TestIdGenerator::new();
let by_id_index_id = id_generator.generate(&INDEX_TABLE).internal_id();
let by_val_index_id = id_generator.generate(&INDEX_TABLE).internal_id();
let table: TableName = str::parse("table")?;
let table_id = id_generator.table_id(&table).table_id;

fn doc(
id: ResolvedDocumentId,
ts: i32,
val: Option<i64>,
) -> anyhow::Result<(Timestamp, InternalDocumentId, Option<ResolvedDocument>)> {
let doc = val
.map(|val| ResolvedDocument::new(id, CreationTime::ONE, assert_obj!("value" => val)))
.transpose()?;
Ok((Timestamp::must(ts), id.into(), doc))
}

let by_id = |id: ResolvedDocumentId,
ts: i32,
deleted: bool|
Expand Down Expand Up @@ -1494,6 +1502,69 @@ pub async fn persistence_enforce_retention<P: Persistence>(p: P) -> anyhow::Resu
Ok(())
}

pub async fn persistence_delete_documents<P: Persistence>(p: P) -> anyhow::Result<()> {
let mut id_generator = TestIdGenerator::new();
let table: TableName = str::parse("table")?;

let id1 = id_generator.generate(&table);
let id2 = id_generator.generate(&table);
let id3 = id_generator.generate(&table);
let id4 = id_generator.generate(&table);
let id5 = id_generator.generate(&table);
let id6 = id_generator.generate(&table);
let id7 = id_generator.generate(&table);
let id8 = id_generator.generate(&table);
let id9 = id_generator.generate(&table);
let id10 = id_generator.generate(&table);

let documents = vec![
doc(id1, 1, Some(1))?,
doc(id2, 2, Some(2))?,
doc(id3, 3, Some(3))?,
// min_document_snapshot_ts: 4
doc(id4, 5, Some(4))?,
doc(id5, 6, Some(5))?,
doc(id6, 7, Some(6))?,
doc(id7, 8, Some(7))?,
doc(id8, 9, Some(8))?,
doc(id9, 10, Some(9))?,
doc(id10, 11, Some(10))?,
];

p.write(documents.clone(), BTreeSet::new(), ConflictStrategy::Error)
.await?;

let reader = p.reader();

let stream = reader.load_all_documents();
pin_mut!(stream);
let mut all_docs = Vec::new();
while let Some(val) = stream.try_next().await? {
all_docs.push(val);
}
assert_eq!(documents.clone(), all_docs);

let docs_to_delete = documents.clone()[..3]
.iter()
.map(|(ts, id, _)| (*ts, *id))
.collect_vec();

let expired_docs = p.documents_to_delete(&docs_to_delete).await?;
assert_eq!(docs_to_delete, expired_docs);

assert_eq!(p.delete(expired_docs).await?, 3);

let stream = reader.load_all_documents();
pin_mut!(stream);
let mut all_docs = Vec::new();
while let Some(val) = stream.try_next().await? {
all_docs.push(val);
}
assert_eq!(documents[3..], all_docs);

Ok(())
}

pub async fn persistence_previous_revisions<P: Persistence>(p: P) -> anyhow::Result<()> {
let reader = p.reader();

Expand Down
30 changes: 30 additions & 0 deletions crates/common/src/testing/test_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,36 @@ impl Persistence for TestPersistence {
Ok(total_deleted)
}

async fn documents_to_delete(
&self,
expired_documents: &Vec<(Timestamp, InternalDocumentId)>,
) -> anyhow::Result<Vec<(Timestamp, InternalDocumentId)>> {
let inner = self.inner.lock();
let log = &inner.log;
let mut new_expired_rows = Vec::new();
for expired_doc in expired_documents {
if log.get(expired_doc).is_some() {
new_expired_rows.push(*expired_doc);
}
}
Ok(new_expired_rows)
}

async fn delete(
&self,
documents: Vec<(Timestamp, InternalDocumentId)>,
) -> anyhow::Result<usize> {
let mut inner = self.inner.lock();
let log = &mut inner.log;
let mut total_deleted = 0;
for expired_doc in documents {
if log.remove(&expired_doc).is_some() {
total_deleted += 1;
}
}
Ok(total_deleted)
}

fn box_clone(&self) -> Box<dyn Persistence> {
Box::new(self.clone())
}
Expand Down
56 changes: 56 additions & 0 deletions crates/sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,56 @@ impl Persistence for SqlitePersistence {
Ok(count_deleted)
}

async fn documents_to_delete(
&self,
expired_documents: &Vec<(Timestamp, InternalDocumentId)>,
) -> anyhow::Result<Vec<(Timestamp, InternalDocumentId)>> {
let connection = &self.inner.lock().connection;
let mut all_entries = BTreeSet::new();
for expired_entry in expired_documents {
let table_id: &TableId = expired_entry.1.table();
let id = expired_entry.1.internal_id();
let params = params![&table_id.0[..], &id[..], &u64::from(expired_entry.0),];
let mut entries_query = connection.prepare(DOCUMENTS_TO_DELETE)?;
let row_iter = entries_query.query_map(params, |row| {
let table_id: Vec<u8> = row.get(0)?;
let id: Vec<u8> = row.get(1)?;
let ts =
Timestamp::try_from(row.get::<_, u64>(2)?).expect("timestamp out of bounds");
Ok((table_id, id, ts))
})?;
for row in row_iter {
let (table_id, id, ts) = row?;
all_entries.insert((
ts,
InternalDocumentId::new(TableId(table_id.try_into()?), id.try_into()?),
));
}
}
Ok(all_entries.into_iter().collect())
}

async fn delete(
&self,
documents: Vec<(Timestamp, InternalDocumentId)>,
) -> anyhow::Result<usize> {
let mut inner = self.inner.lock();
let tx = inner.connection.transaction()?;
let mut delete_document_query = tx.prepare_cached(DELETE_DOCUMENT)?;
let mut count_deleted = 0;

for (ts, internal_id) in documents {
let table_id: &TableId = internal_id.table();
let id = internal_id.internal_id();
count_deleted +=
delete_document_query
.execute(params![&table_id.0[..], &id[..], &u64::from(ts),])?;
}
drop(delete_document_query);
tx.commit()?;
Ok(count_deleted)
}

fn box_clone(&self) -> Box<dyn Persistence> {
Box::new(self.clone())
}
Expand Down Expand Up @@ -634,6 +684,12 @@ ORDER BY index_id DESC, key DESC, ts DESC
"#;
const DELETE_INDEX: &str = "DELETE FROM indexes WHERE index_id = ? AND ts <= ? AND key = ?";

const DOCUMENTS_TO_DELETE: &str = r#"SELECT table_id, id, ts
FROM documents WHERE table_id = ? AND id = ? AND ts <= ?
ORDER BY table_id DESC, id DESC, ts DESC
"#;
const DELETE_DOCUMENT: &str = "DELETE FROM documents WHERE table_id = ? AND id = ? AND ts = ?";

const CHECK_IS_READ_ONLY: &str = "SELECT 1 FROM read_only LIMIT 1";
const SET_READ_ONLY: &str = "INSERT INTO read_only (id) VALUES (1)";
const UNSET_READ_ONLY: &str = "DELETE FROM read_only WHERE id = 1";
Expand Down

0 comments on commit 79498fc

Please sign in to comment.