From 345c543635f8c41910e1c7548c4b8a8cd278bc40 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Thu, 14 Mar 2024 12:49:45 -0400 Subject: [PATCH] fix Interval not advancing (#23462) Pending deletes can make an `index_range` return no results, which is fine. But even in these cases we want to shrink the interval being read for the next page. Otherwise we end up in an infinite loop. I considered just fixing `interval_unread`, but we actually need the `interval_read` as well, to record the read set (in this case, where there's an empty page). And we can't compute `interval_read` with the information available. So instead of passing `interval_unread` (i.e. `remaining_interval`) as a return value, we should return a `CursorPosition` from which we can easily derive both `interval_unread` and `interval_read`. Added a regression test that fails on main. GitOrigin-RevId: 5a37f0b9ef035dd59df88742afd3c56db0b9b420 --- crates/common/src/interval/mod.rs | 21 +++- .../src/bootstrap_model/user_facing.rs | 15 ++- crates/database/src/query/index_range.rs | 15 +-- crates/database/src/query/mod.rs | 6 +- crates/database/src/transaction.rs | 11 +- crates/database/src/transaction_index.rs | 105 +++++++++--------- crates/database/src/virtual_tables/mod.rs | 8 +- .../indexing/src/backend_in_memory_indexes.rs | 46 ++++---- crates/isolate/src/tests/query.rs | 17 +++ npm-packages/udf-tests/convex/query.ts | 9 ++ 10 files changed, 146 insertions(+), 107 deletions(-) diff --git a/crates/common/src/interval/mod.rs b/crates/common/src/interval/mod.rs index ef9b9b72..9e1fa097 100644 --- a/crates/common/src/interval/mod.rs +++ b/crates/common/src/interval/mod.rs @@ -33,7 +33,10 @@ pub use self::{ }; use crate::{ index::IndexKeyBytes, - query::Order, + query::{ + CursorPosition, + Order, + }, }; #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] @@ -88,6 +91,13 @@ impl Interval { after_start && before_end } + pub fn contains_cursor(&self, cursor: &CursorPosition) -> bool { + match cursor { +
CursorPosition::After(last_key) => self.contains(last_key), + CursorPosition::End => true, + } + } + pub fn is_disjoint(&self, other: &Self) -> bool { self.is_empty() || other.is_empty() @@ -109,7 +119,7 @@ impl Interval { /// Note last_key must be a full IndexKeyBytes, not an arbitrary BinaryKey, /// so we can assume there are no other IndexKeyBytes that have `index_key` /// as a prefix. - pub fn split(&self, last_key: IndexKeyBytes, order: Order) -> (Self, Self) { + pub fn split_after(&self, last_key: IndexKeyBytes, order: Order) -> (Self, Self) { let last_key_binary = BinaryKey::from(last_key); match order { Order::Asc => ( @@ -137,6 +147,13 @@ impl Interval { ), } } + + pub fn split(&self, cursor: CursorPosition, order: Order) -> (Self, Self) { + match cursor { + CursorPosition::After(last_key) => self.split_after(last_key, order), + CursorPosition::End => (self.clone(), Interval::empty()), + } + } } impl RangeBounds<[u8]> for &Interval { diff --git a/crates/database/src/bootstrap_model/user_facing.rs b/crates/database/src/bootstrap_model/user_facing.rs index df96e6db..2826d173 100644 --- a/crates/database/src/bootstrap_model/user_facing.rs +++ b/crates/database/src/bootstrap_model/user_facing.rs @@ -11,7 +11,10 @@ use common::{ }, index::IndexKeyBytes, interval::Interval, - query::Order, + query::{ + CursorPosition, + Order, + }, runtime::Runtime, types::{ StableIndexName, @@ -319,10 +322,10 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> { version: Option, ) -> anyhow::Result<( Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>, - Interval, + CursorPosition, )> { if interval.is_empty() { - return Ok((vec![], Interval::empty())); + return Ok((vec![], CursorPosition::End)); } max_rows = cmp::min(max_rows, MAX_PAGE_SIZE); @@ -345,14 +348,14 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> { .await; }, StableIndexName::Missing => { - return Ok((vec![], Interval::empty())); + return Ok((vec![], CursorPosition::End)); }, }; let index_name = tablet_index_name 
.clone() .map_table(&self.tx.table_mapping().tablet_to_name())?; - let (results, interval_remaining) = self + let (results, cursor) = self .tx .index .range( @@ -373,6 +376,6 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> { anyhow::Ok((key, doc, ts)) }) .try_collect()?; - Ok((developer_results, interval_remaining)) + Ok((developer_results, cursor)) } } diff --git a/crates/database/src/query/index_range.rs b/crates/database/src/query/index_range.rs index b3a728df..a7721906 100644 --- a/crates/database/src/query/index_range.rs +++ b/crates/database/src/query/index_range.rs @@ -91,20 +91,19 @@ impl IndexRange { ) -> Self { // unfetched_interval = intersection of interval with cursor_interval let unfetched_interval = match &cursor_interval.curr_exclusive { - Some(CursorPosition::After(position)) => { - let (_, after_curr_cursor_position) = interval.split(position.clone(), order); + Some(cursor) => { + let (_, after_curr_cursor_position) = interval.split(cursor.clone(), order); after_curr_cursor_position }, - Some(CursorPosition::End) => Interval::empty(), None => interval.clone(), }; let unfetched_interval = match &cursor_interval.end_inclusive { - Some(CursorPosition::After(position)) => { + Some(cursor) => { let (up_to_end_cursor_position, _) = - unfetched_interval.split(position.clone(), order); + unfetched_interval.split(cursor.clone(), order); up_to_end_cursor_position }, - Some(CursorPosition::End) | None => unfetched_interval.clone(), + None => unfetched_interval.clone(), }; Self { @@ -204,7 +203,7 @@ impl IndexRange { } max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read); } - let (page, new_unfetched_interval) = T::index_range( + let (page, fetch_cursor) = T::index_range( tx, &self.stable_index_name, &self.unfetched_interval, @@ -213,6 +212,8 @@ impl IndexRange { self.version.clone(), ) .await?; + let (_, new_unfetched_interval) = + self.unfetched_interval.split(fetch_cursor, self.order); anyhow::ensure!(self.unfetched_interval != 
new_unfetched_interval); self.unfetched_interval = new_unfetched_interval; self.page_count += 1; diff --git a/crates/database/src/query/mod.rs b/crates/database/src/query/mod.rs index 692b6682..60aa1e26 100644 --- a/crates/database/src/query/mod.rs +++ b/crates/database/src/query/mod.rs @@ -113,7 +113,7 @@ pub trait QueryType { version: Option, ) -> anyhow::Result<( Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, - Interval, + CursorPosition, )>; async fn get_with_ts( @@ -150,7 +150,7 @@ impl QueryType for Resolved { _version: Option, ) -> anyhow::Result<( Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, - Interval, + CursorPosition, )> { tx.index_range(stable_index_name, interval, order, max_rows) .await @@ -193,7 +193,7 @@ impl QueryType for Developer { version: Option, ) -> anyhow::Result<( Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, - Interval, + CursorPosition, )> { UserFacingModel::new(tx) .index_range(stable_index_name, interval, order, max_rows, version) diff --git a/crates/database/src/transaction.rs b/crates/database/src/transaction.rs index cb81311f..e5000b68 100644 --- a/crates/database/src/transaction.rs +++ b/crates/database/src/transaction.rs @@ -43,6 +43,7 @@ use common::{ }, persistence::RetentionValidator, query::{ + CursorPosition, Order, Search, SearchVersion, @@ -775,12 +776,12 @@ impl Transaction { let mut batch_result = BTreeMap::new(); for (batch_key, (id, table_name)) in ids { let result: anyhow::Result<_> = try { - let (range_results, remaining) = + let (range_results, cursor) = results.remove(&batch_key).context("expected result")??; if range_results.len() > 1 { Err(anyhow::anyhow!("Got multiple values for id {id:?}"))?; } - if !remaining.is_empty() { + if !matches!(cursor, CursorPosition::End) { Err(anyhow::anyhow!( "Querying 2 items for a single id didn't exhaust interval for {id:?}" ))?; @@ -971,10 +972,10 @@ impl Transaction { mut max_rows: usize, ) -> anyhow::Result<( Vec<(IndexKeyBytes, ResolvedDocument, 
WriteTimestamp)>, - Interval, + CursorPosition, )> { if interval.is_empty() { - return Ok((vec![], Interval::empty())); + return Ok((vec![], CursorPosition::End)); } let tablet_index_name = match stable_index_name { StableIndexName::Physical(tablet_index_name) => tablet_index_name, @@ -985,7 +986,7 @@ impl Transaction { ); }, StableIndexName::Missing => { - return Ok((vec![], Interval::empty())); + return Ok((vec![], CursorPosition::End)); }, }; let index_name = tablet_index_name diff --git a/crates/database/src/transaction_index.rs b/crates/database/src/transaction_index.rs index ae2cda5d..872c1b83 100644 --- a/crates/database/src/transaction_index.rs +++ b/crates/database/src/transaction_index.rs @@ -29,6 +29,7 @@ use common::{ interval::Interval, knobs::TRANSACTION_MAX_READ_SIZE_BYTES, query::{ + CursorPosition, InternalSearch, Order, SearchVersion, @@ -130,7 +131,7 @@ impl TransactionIndex { BatchKey, anyhow::Result<( Vec<(IndexKeyBytes, ResolvedDocument, WriteTimestamp)>, - Interval, + CursorPosition, )>, > { let snapshot = &mut self.database_index_snapshot; @@ -146,7 +147,7 @@ impl TransactionIndex { for (batch_key, range_request) in ranges { let item_result: anyhow::Result<_> = try { - let (snapshot_result_vec, remaining_interval) = snapshot_results + let (snapshot_result_vec, cursor) = snapshot_results .remove(&batch_key) .context("batch_key missing")??; let mut snapshot_it = snapshot_result_vec.into_iter(); @@ -237,12 +238,13 @@ impl TransactionIndex { (None, None) => break, } } - if remaining_interval == range_request.interval { + if !range_request.interval.contains_cursor(&cursor) { Err(anyhow::anyhow!( - "query for {remaining_interval:?} did not shrink" + "query for {:?} not making progress", + range_request.interval ))?; } - (range_results, remaining_interval) + (range_results, cursor) }; assert!(results.insert(batch_key, item_result).is_none()); } @@ -291,7 +293,7 @@ impl TransactionIndex { BatchKey, anyhow::Result<( Vec<(IndexKeyBytes, 
ResolvedDocument, WriteTimestamp)>, - Interval, + CursorPosition, )>, > { let batch_size = ranges.len(); @@ -349,7 +351,7 @@ impl TransactionIndex { ) in ranges_to_fetch { let result: anyhow::Result<_> = try { - let (documents, interval_unfetched) = fetch_results + let (documents, fetch_cursor) = fetch_results .remove(&batch_key) .context("batch item missing")??; let mut total_bytes = 0; @@ -368,31 +370,31 @@ impl TransactionIndex { }) .collect(); - let mut interval_read = Interval::empty(); - let mut interval_unread = interval.clone(); - if out.len() < max_size && within_bytes_limit && interval_unfetched.is_empty() { - // If we exhaust the query before hitting any early-termination condition, - // put the entire range in the read set. - interval_read = interval.clone(); - interval_unread = Interval::empty(); - } else if let Some((last_key, ..)) = out.last() { - // If there is more in the query, split at the last key returned. - (interval_read, interval_unread) = interval.split(last_key.clone(), order); - } + let cursor = if let Some((last_key, ..)) = out.last() + && (out.len() >= max_size || !within_bytes_limit) + { + // We hit an early termination condition within this page. 
+ CursorPosition::After(last_key.clone()) + } else { + // Everything fetched will be returned, so the cursor + // of the page is the fetch cursor + fetch_cursor + }; let indexed_fields = indexed_fields_by_key .remove(&batch_key) .context("indexed_fields missing")?; + let (interval_read, _) = interval.split(cursor.clone(), order); reads.record_indexed_directly( index_name.clone(), indexed_fields.clone(), interval_read, )?; - if interval_unread == interval { + if !interval.contains_cursor(&cursor) { Err(anyhow::anyhow!( - "query for {interval_unread:?} did not shrink" + "query for {interval:?} not making progress" ))?; } - (out, interval_unread) + (out, cursor) }; assert!(results.insert(batch_key, result).is_none()); } @@ -411,7 +413,7 @@ impl TransactionIndex { range_request: RangeRequest, ) -> anyhow::Result<( Vec<(IndexKeyBytes, ResolvedDocument, WriteTimestamp)>, - Interval, + CursorPosition, )> { self.range_batch(reads, btreemap! {0 => range_request}) .await @@ -440,7 +442,7 @@ impl TransactionIndex { let mut remaining_interval = interval.clone(); let mut preloaded = BTreeMap::new(); while !remaining_interval.is_empty() { - let (documents, new_remaining_interval) = self + let (documents, cursor) = self .range_no_deps(btreemap! 
{ 0 => RangeRequest { index_name: tablet_index_name.clone(), printable_index_name: printable_index_name.clone(), @@ -451,7 +453,7 @@ impl TransactionIndex { .await .remove(&0) .context("batch_key missing")??; - remaining_interval = new_remaining_interval; + (_, remaining_interval) = interval.split(cursor, Order::Asc); for (_, document, _) in documents { let key = document.value().0.get_path(&indexed_field).cloned(); anyhow::ensure!( @@ -821,12 +823,7 @@ mod tests { ResolvedDocument, }, index::IndexKey, - interval::{ - BinaryKey, - End, - Interval, - Start, - }, + interval::Interval, persistence::{ now_ts, ConflictStrategy, @@ -834,7 +831,10 @@ mod tests { Persistence, RepeatablePersistence, }, - query::Order, + query::{ + CursorPosition, + Order, + }, testing::{ TestIdGenerator, TestPersistence, @@ -1089,7 +1089,7 @@ mod tests { ); // Query the missing table using table scan index. It should return no results. - let (results, remaining_interval) = index + let (results, cursor) = index .range( &mut reads, RangeRequest { @@ -1101,7 +1101,7 @@ mod tests { }, ) .await?; - assert!(remaining_interval.is_empty()); + assert!(matches!(cursor, CursorPosition::End)); assert!(results.is_empty()); // Query by any other index should return an error. @@ -1132,7 +1132,7 @@ mod tests { let by_id_index = gen_index_document(&mut id_generator, metadata.clone())?; index.begin_update(None, Some(by_id_index))?.apply(); - let (results, remaining_interval) = index + let (results, cursor) = index .range( &mut reads, RangeRequest { @@ -1144,7 +1144,7 @@ mod tests { }, ) .await?; - assert!(remaining_interval.is_empty()); + assert!(matches!(cursor, CursorPosition::End)); assert!(results.is_empty()); // Add a document and make sure we see it. 
@@ -1156,7 +1156,7 @@ mod tests { ), )?; index.begin_update(None, Some(doc.clone()))?.apply(); - let (result, remaining_interval) = index + let (result, cursor) = index .range( &mut reads, RangeRequest { @@ -1177,7 +1177,7 @@ mod tests { WriteTimestamp::Pending )], ); - assert!(remaining_interval.is_empty()); + assert!(matches!(cursor, CursorPosition::End)); Ok(()) } @@ -1303,7 +1303,7 @@ mod tests { index.begin_update(Some(bob), None)?.apply(); // Query by id - let (results, remaining_interval) = index + let (results, cursor) = index .range( &mut reads, RangeRequest { @@ -1315,7 +1315,7 @@ mod tests { }, ) .await?; - assert!(remaining_interval.is_empty()); + assert!(matches!(cursor, CursorPosition::End)); assert_eq!( results, vec![ @@ -1354,7 +1354,7 @@ mod tests { expected_reads.record_indexed_directly(by_id, IndexedFields::by_id(), Interval::all())?; assert_eq!(reads, expected_reads); // Query by name in ascending order - let (results, remaining_interval) = index + let (results, cursor) = index .range( &mut reads, RangeRequest { @@ -1366,7 +1366,7 @@ mod tests { }, ) .await?; - assert!(remaining_interval.is_empty()); + assert!(matches!(cursor, CursorPosition::End)); assert_eq!( results, vec![ @@ -1393,8 +1393,8 @@ mod tests { ] ); // Query by name in ascending order with limit=2. - // Returned remaining interval should be ("david", unbounded). - let (results, remaining_interval) = index + // Returned cursor should be After("david"). 
+ let (results, cursor) = index .range( &mut reads, RangeRequest { @@ -1407,18 +1407,13 @@ mod tests { ) .await?; assert_eq!( - remaining_interval.start, - Start::Included( - BinaryKey::from( - david - .index_key(&by_name_fields[..], persistence_version) - .into_bytes() - ) - .increment() - .unwrap() + cursor, + CursorPosition::After( + david + .index_key(&by_name_fields[..], persistence_version) + .into_bytes() ) ); - assert_eq!(remaining_interval.end, End::Unbounded); assert_eq!( results, vec![ @@ -1440,7 +1435,7 @@ mod tests { ); // Query by name in descending order - let (result, remaining_interval) = index + let (result, cursor) = index .range( &mut reads, RangeRequest { @@ -1452,7 +1447,7 @@ mod tests { }, ) .await?; - assert!(remaining_interval.is_empty()); + assert!(matches!(cursor, CursorPosition::End)); assert_eq!( result, vec![ diff --git a/crates/database/src/virtual_tables/mod.rs b/crates/database/src/virtual_tables/mod.rs index 2f5d2f44..00022ca3 100644 --- a/crates/database/src/virtual_tables/mod.rs +++ b/crates/database/src/virtual_tables/mod.rs @@ -9,7 +9,7 @@ use common::{ ResolvedDocument, }, index::IndexKeyBytes, - interval::Interval, + query::CursorPosition, runtime::Runtime, types::{ IndexName, @@ -80,12 +80,12 @@ impl<'a, RT: Runtime> VirtualTable<'a, RT> { version: Option, ) -> anyhow::Result<( Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>, - Interval, + CursorPosition, )> { let table_mapping = self.tx.table_mapping().clone(); let virtual_table_mapping = self.tx.virtual_table_mapping().clone(); - let (results, remaining_interval) = self + let (results, cursor) = self .tx .index .range(&mut self.tx.reads, range_request) @@ -102,7 +102,7 @@ impl<'a, RT: Runtime> VirtualTable<'a, RT> { anyhow::Ok((index_key, doc, ts)) }) .try_collect()?; - Ok((virtual_results, remaining_interval)) + Ok((virtual_results, cursor)) } } diff --git a/crates/indexing/src/backend_in_memory_indexes.rs b/crates/indexing/src/backend_in_memory_indexes.rs index 
b822f172..c93c33ab 100644 --- a/crates/indexing/src/backend_in_memory_indexes.rs +++ b/crates/indexing/src/backend_in_memory_indexes.rs @@ -28,7 +28,10 @@ use common::{ IntervalSet, }, persistence::PersistenceSnapshot, - query::Order, + query::{ + CursorPosition, + Order, + }, static_span, types::{ DatabaseIndexUpdate, @@ -356,7 +359,10 @@ impl DatabaseIndexSnapshot { pub async fn range( &mut self, range_request: RangeRequest, - ) -> anyhow::Result<(Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>, Interval)> { + ) -> anyhow::Result<( + Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>, + CursorPosition, + )> { let index = match self.index_registry.require_enabled( &range_request.index_name, &range_request.printable_index_name, @@ -368,7 +374,7 @@ impl DatabaseIndexSnapshot { != &self.index_registry.index_table().table_id && range_request.index_name.is_by_id_or_creation_time() => { - return Ok((vec![], Interval::empty())); + return Ok((vec![], CursorPosition::End)); }, Err(e) => anyhow::bail!(e), }; @@ -400,7 +406,7 @@ impl DatabaseIndexSnapshot { ) .await? { - return Ok((range, Interval::empty())); + return Ok((range, CursorPosition::End)); } // Next, try the transaction cache. 
@@ -408,8 +414,8 @@ impl DatabaseIndexSnapshot { self.cache .get(index.id(), &range_request.interval, range_request.order); - let (results, cache_miss_results, interval_read, interval_remaining) = self - .fetch_cache_misses(index.id(), range_request, cache_results) + let (results, cache_miss_results, cursor) = self + .fetch_cache_misses(index.id(), range_request.clone(), cache_results) .await?; for (ts, doc) in cache_miss_results.into_iter() { @@ -420,12 +426,15 @@ impl DatabaseIndexSnapshot { .populate(some_index.id(), index_key.into_bytes(), ts, doc.clone()); } } + let (interval_read, _) = range_request + .interval + .split(cursor.clone(), range_request.order); // After all documents in an index interval have been // added to the cache with `populate_cache`, record the entire interval as // being populated. self.cache .record_interval_populated(index.id(), interval_read); - Ok((results, interval_remaining)) + Ok((results, cursor)) } async fn fetch_cache_misses( @@ -436,8 +445,7 @@ impl DatabaseIndexSnapshot { ) -> anyhow::Result<( Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>, Vec<(Timestamp, ResolvedDocument)>, - Interval, - Interval, + CursorPosition, )> { let mut results = vec![]; let mut cache_miss_results = vec![]; @@ -475,22 +483,10 @@ impl DatabaseIndexSnapshot { .expect("should be at least one result") .0 .clone(); - let (interval_read, interval_remaining) = - range_request.interval.split(last_key, range_request.order); - return Ok(( - results, - cache_miss_results, - interval_read, - interval_remaining, - )); + return Ok((results, cache_miss_results, CursorPosition::After(last_key))); } } - Ok(( - results, - cache_miss_results, - range_request.interval, - Interval::empty(), - )) + Ok((results, cache_miss_results, CursorPosition::End)) } /// Lookup the latest value of a document by id. Returns the document and @@ -508,7 +504,7 @@ impl DatabaseIndexSnapshot { // We call next() twice due to the verification below. 
let max_size = 2; - let (stream, remaining_interval) = self + let (stream, cursor) = self .range(RangeRequest { index_name, printable_index_name, @@ -525,7 +521,7 @@ impl DatabaseIndexSnapshot { "Got multiple values for key {:?}", key ); - anyhow::ensure!(remaining_interval.is_empty()); + anyhow::ensure!(matches!(cursor, CursorPosition::End)); Ok(Some((doc, ts))) }, None => Ok(None), diff --git a/crates/isolate/src/tests/query.rs b/crates/isolate/src/tests/query.rs index 30e4010b..5ee71e1d 100644 --- a/crates/isolate/src/tests/query.rs +++ b/crates/isolate/src/tests/query.rs @@ -793,3 +793,20 @@ async fn test_query_order_filter(rt: TestRuntime) -> anyhow::Result<()> { Ok(()) } + +#[convex_macro::test_runtime] +async fn test_query_with_pending_deletes(rt: TestRuntime) -> anyhow::Result<()> { + let t = UdfTest::default(rt).await?; + for i in 0..10 { + t.mutation("query:insert", assert_obj!("number" => i)) + .await?; + } + + // Deletes 0 through 4, and returns the next which is 5. + let res = t + .mutation("query:firstAfterPendingDeletes", assert_obj!()) + .await?; + must_let!(let ConvexValue::Int64(first) = res); + assert_eq!(first, 5); + Ok(()) +} diff --git a/npm-packages/udf-tests/convex/query.ts b/npm-packages/udf-tests/convex/query.ts index 932d3360..6f7a6383 100644 --- a/npm-packages/udf-tests/convex/query.ts +++ b/npm-packages/udf-tests/convex/query.ts @@ -127,3 +127,12 @@ export const orderOrder = query(async ({ db }) => { const q: any = db.query("test").order("desc"); return q.order("desc").collect(); }); + +export const firstAfterPendingDeletes = mutation(async ({ db }) => { + const toDelete = await db.query("test").take(5); + for (const doc of toDelete) { + await db.delete(doc._id); + } + const firstDoc = await db.query("test").first(); + return firstDoc!.hello; +});