Skip to content

Commit

Permalink
fix Interval not advancing (#23462)
Browse files Browse the repository at this point in the history
Pending deletes can make an `index_range` call return no results, which is fine. But even in these cases we want to shrink the interval being read for the next page; otherwise we end up in an infinite loop.

I considered just fixing `interval_unread`, but we actually need the `interval_read` as well, to record the read set (in this case where there's an empty page). And we can't compute `interval_read` with the information available. So instead of passing `interval_unread` (i.e. `remaining_interval`) as a return value, we should return a `CursorPosition` from which we can easily derive both `interval_unread` and `interval_read`.

Added a regression test which fails on main.

GitOrigin-RevId: 5a37f0b9ef035dd59df88742afd3c56db0b9b420
  • Loading branch information
ldanilek authored and Convex, Inc. committed Mar 14, 2024
1 parent fce2558 commit 345c543
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 107 deletions.
21 changes: 19 additions & 2 deletions crates/common/src/interval/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub use self::{
};
use crate::{
index::IndexKeyBytes,
query::Order,
query::{
CursorPosition,
Order,
},
};

#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
Expand Down Expand Up @@ -88,6 +91,13 @@ impl Interval {
after_start && before_end
}

/// Reports whether `cursor` is considered to lie within this interval.
///
/// A `CursorPosition::After(key)` cursor is checked by passing `key` to
/// `contains`. A `CursorPosition::End` cursor is always reported as
/// contained, regardless of the interval's bounds.
pub fn contains_cursor(&self, cursor: &CursorPosition) -> bool {
    match cursor {
        // The end-of-range cursor carries no key to test, so it is accepted
        // unconditionally.
        CursorPosition::End => true,
        CursorPosition::After(key) => self.contains(key),
    }
}

pub fn is_disjoint(&self, other: &Self) -> bool {
self.is_empty()
|| other.is_empty()
Expand All @@ -109,7 +119,7 @@ impl Interval {
/// Note last_key must be a full IndexKeyBytes, not an arbitrary BinaryKey,
/// so we can assume there are no other IndexKeyBytes that have `index_key`
/// as a prefix.
pub fn split(&self, last_key: IndexKeyBytes, order: Order) -> (Self, Self) {
pub fn split_after(&self, last_key: IndexKeyBytes, order: Order) -> (Self, Self) {
let last_key_binary = BinaryKey::from(last_key);
match order {
Order::Asc => (
Expand Down Expand Up @@ -137,6 +147,13 @@ impl Interval {
),
}
}

pub fn split(&self, cursor: CursorPosition, order: Order) -> (Self, Self) {
match cursor {
CursorPosition::After(last_key) => self.split_after(last_key, order),
CursorPosition::End => (self.clone(), Interval::empty()),
}
}
}

impl RangeBounds<[u8]> for &Interval {
Expand Down
15 changes: 9 additions & 6 deletions crates/database/src/bootstrap_model/user_facing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use common::{
},
index::IndexKeyBytes,
interval::Interval,
query::Order,
query::{
CursorPosition,
Order,
},
runtime::Runtime,
types::{
StableIndexName,
Expand Down Expand Up @@ -319,10 +322,10 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
version: Option<Version>,
) -> anyhow::Result<(
Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
Interval,
CursorPosition,
)> {
if interval.is_empty() {
return Ok((vec![], Interval::empty()));
return Ok((vec![], CursorPosition::End));
}

max_rows = cmp::min(max_rows, MAX_PAGE_SIZE);
Expand All @@ -345,14 +348,14 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
.await;
},
StableIndexName::Missing => {
return Ok((vec![], Interval::empty()));
return Ok((vec![], CursorPosition::End));
},
};
let index_name = tablet_index_name
.clone()
.map_table(&self.tx.table_mapping().tablet_to_name())?;

let (results, interval_remaining) = self
let (results, cursor) = self
.tx
.index
.range(
Expand All @@ -373,6 +376,6 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
anyhow::Ok((key, doc, ts))
})
.try_collect()?;
Ok((developer_results, interval_remaining))
Ok((developer_results, cursor))
}
}
15 changes: 8 additions & 7 deletions crates/database/src/query/index_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,19 @@ impl<T: QueryType> IndexRange<T> {
) -> Self {
// unfetched_interval = intersection of interval with cursor_interval
let unfetched_interval = match &cursor_interval.curr_exclusive {
Some(CursorPosition::After(position)) => {
let (_, after_curr_cursor_position) = interval.split(position.clone(), order);
Some(cursor) => {
let (_, after_curr_cursor_position) = interval.split(cursor.clone(), order);
after_curr_cursor_position
},
Some(CursorPosition::End) => Interval::empty(),
None => interval.clone(),
};
let unfetched_interval = match &cursor_interval.end_inclusive {
Some(CursorPosition::After(position)) => {
Some(cursor) => {
let (up_to_end_cursor_position, _) =
unfetched_interval.split(position.clone(), order);
unfetched_interval.split(cursor.clone(), order);
up_to_end_cursor_position
},
Some(CursorPosition::End) | None => unfetched_interval.clone(),
None => unfetched_interval.clone(),
};

Self {
Expand Down Expand Up @@ -204,7 +203,7 @@ impl<T: QueryType> IndexRange<T> {
}
max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read);
}
let (page, new_unfetched_interval) = T::index_range(
let (page, fetch_cursor) = T::index_range(
tx,
&self.stable_index_name,
&self.unfetched_interval,
Expand All @@ -213,6 +212,8 @@ impl<T: QueryType> IndexRange<T> {
self.version.clone(),
)
.await?;
let (_, new_unfetched_interval) =
self.unfetched_interval.split(fetch_cursor, self.order);
anyhow::ensure!(self.unfetched_interval != new_unfetched_interval);
self.unfetched_interval = new_unfetched_interval;
self.page_count += 1;
Expand Down
6 changes: 3 additions & 3 deletions crates/database/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub trait QueryType {
version: Option<Version>,
) -> anyhow::Result<(
Vec<(IndexKeyBytes, GenericDocument<Self::T>, WriteTimestamp)>,
Interval,
CursorPosition,
)>;

async fn get_with_ts<RT: Runtime>(
Expand Down Expand Up @@ -150,7 +150,7 @@ impl QueryType for Resolved {
_version: Option<Version>,
) -> anyhow::Result<(
Vec<(IndexKeyBytes, GenericDocument<Self::T>, WriteTimestamp)>,
Interval,
CursorPosition,
)> {
tx.index_range(stable_index_name, interval, order, max_rows)
.await
Expand Down Expand Up @@ -193,7 +193,7 @@ impl QueryType for Developer {
version: Option<Version>,
) -> anyhow::Result<(
Vec<(IndexKeyBytes, GenericDocument<Self::T>, WriteTimestamp)>,
Interval,
CursorPosition,
)> {
UserFacingModel::new(tx)
.index_range(stable_index_name, interval, order, max_rows, version)
Expand Down
11 changes: 6 additions & 5 deletions crates/database/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use common::{
},
persistence::RetentionValidator,
query::{
CursorPosition,
Order,
Search,
SearchVersion,
Expand Down Expand Up @@ -775,12 +776,12 @@ impl<RT: Runtime> Transaction<RT> {
let mut batch_result = BTreeMap::new();
for (batch_key, (id, table_name)) in ids {
let result: anyhow::Result<_> = try {
let (range_results, remaining) =
let (range_results, cursor) =
results.remove(&batch_key).context("expected result")??;
if range_results.len() > 1 {
Err(anyhow::anyhow!("Got multiple values for id {id:?}"))?;
}
if !remaining.is_empty() {
if !matches!(cursor, CursorPosition::End) {
Err(anyhow::anyhow!(
"Querying 2 items for a single id didn't exhaust interval for {id:?}"
))?;
Expand Down Expand Up @@ -971,10 +972,10 @@ impl<RT: Runtime> Transaction<RT> {
mut max_rows: usize,
) -> anyhow::Result<(
Vec<(IndexKeyBytes, ResolvedDocument, WriteTimestamp)>,
Interval,
CursorPosition,
)> {
if interval.is_empty() {
return Ok((vec![], Interval::empty()));
return Ok((vec![], CursorPosition::End));
}
let tablet_index_name = match stable_index_name {
StableIndexName::Physical(tablet_index_name) => tablet_index_name,
Expand All @@ -985,7 +986,7 @@ impl<RT: Runtime> Transaction<RT> {
);
},
StableIndexName::Missing => {
return Ok((vec![], Interval::empty()));
return Ok((vec![], CursorPosition::End));
},
};
let index_name = tablet_index_name
Expand Down
Loading

0 comments on commit 345c543

Please sign in to comment.