[transaction] run fetches in parallel (#23520)
Using the same pattern as the rest of Transaction batching, we do a loop over the batch at the top of the function and another loop at the end. The middle is special:

We use `futures::future::join_all` to run the persistence fetches in parallel.

This is safe because `fetch_cache_misses` has no side effects: it takes in the requested range and the cached values, fills in the cache misses by fetching from persistence, and returns the results.
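
In miniature, the shape is: a sequential loop that answers what it can and queues the rest, one `join_all` over the misses, and a sequential tail loop that writes results (and cache updates) back. A sketch with hypothetical types, not the real Transaction machinery:

```rust
use std::collections::BTreeMap;

use futures::future::join_all;

// Hypothetical stand-in for the real snapshot/transaction state.
struct Snapshot {
    cache: BTreeMap<u32, String>,
}

impl Snapshot {
    // Phase 1 (sequential): either answer from cache, or describe the fetch.
    fn start(&self, key: u32) -> Result<String, u32> {
        self.cache.get(&key).cloned().ok_or(key)
    }

    // Pure fetch: no side effects, so concurrent calls cannot interfere.
    async fn fetch(&self, key: u32) -> String {
        format!("fetched-{key}")
    }

    async fn range_batch(&mut self, keys: Vec<u32>) -> BTreeMap<u32, String> {
        let mut results = BTreeMap::new();
        let mut to_fetch = Vec::new();
        for key in keys {
            match self.start(key) {
                Ok(hit) => {
                    results.insert(key, hit);
                },
                Err(miss) => to_fetch.push(miss),
            }
        }
        // Phase 2 (parallel): run all misses concurrently over a shared borrow.
        let self_ = &*self;
        let fetched = join_all(
            to_fetch
                .into_iter()
                .map(|key| async move { (key, self_.fetch(key).await) }),
        )
        .await;
        // Phase 3 (sequential): write results and cache entries back in order.
        for (key, value) in fetched {
            self.cache.insert(key, value.clone());
            results.insert(key, value);
        }
        results
    }
}
```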

🥳

GitOrigin-RevId: bd872649f98ea98a9affa3559ec12efb31f5e121
ldanilek authored and Convex, Inc. committed Mar 15, 2024
1 parent 3244f43 commit 45d0935
Showing 6 changed files with 102 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions crates/common/src/knobs.rs
@@ -237,6 +237,12 @@ pub static SCHEDULED_JOB_RETENTION: LazyLock<Duration> = LazyLock::new(|| {
 pub static SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE: LazyLock<usize> =
     LazyLock::new(|| env_config("SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE", 1000));
 
+/// Maximum number of syscalls that can run in a batch together when
+/// awaited in parallel. Higher values improve latency, while lower ones
+/// protect one isolate from hogging database connections.
+pub static MAX_SYSCALL_BATCH_SIZE: LazyLock<usize> =
+    LazyLock::new(|| env_config("MAX_SYSCALL_BATCH_SIZE", 16));
+
 /// Number of rows that can be read in a transaction.
 pub static TRANSACTION_MAX_READ_SIZE_ROWS: LazyLock<usize> =
     LazyLock::new(|| env_config("TRANSACTION_MAX_READ_SIZE_ROWS", 16384));
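
For context, knobs like this are lazily-initialized statics read from the environment once. A minimal sketch of the pattern, assuming the real `env_config` helper parses the variable and otherwise falls back to the default:

```rust
use std::sync::LazyLock;

// Hypothetical stand-in for Convex's `env_config` helper: parse the env var
// if set and valid, otherwise use the compiled-in default.
fn env_config<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

static MAX_SYSCALL_BATCH_SIZE: LazyLock<usize> =
    LazyLock::new(|| env_config("MAX_SYSCALL_BATCH_SIZE", 16));

fn main() {
    // LazyLock derefs to its value, hence `*MAX_SYSCALL_BATCH_SIZE` at use
    // sites (see the async_syscall.rs hunk below).
    println!("batch size: {}", *MAX_SYSCALL_BATCH_SIZE);
}
```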
7 changes: 1 addition & 6 deletions crates/database/src/transaction_index.rs
@@ -135,12 +135,7 @@ impl TransactionIndex {
         )>,
     > {
         let snapshot = &mut self.database_index_snapshot;
-        let mut snapshot_results = BTreeMap::new();
-        for (batch_key, range_request) in ranges.clone() {
-            // TODO(lee) thread the batching down all the way to persistence.
-            // This is faux-batching for now, to establish the interface.
-            snapshot_results.insert(batch_key, snapshot.range(range_request).await);
-        }
+        let mut snapshot_results = snapshot.range_batch(ranges.clone()).await;
 
         let batch_size = ranges.len();
         let mut results = BTreeMap::new();
1 change: 1 addition & 0 deletions crates/indexing/Cargo.toml
@@ -17,6 +17,7 @@ futures = { workspace = true }
 futures-async-stream = { workspace = true }
 imbl = { workspace = true }
 itertools = { workspace = true }
+maplit = { workspace = true }
 metrics = { path = "../metrics" }
 tracing = { workspace = true }
 value = { path = "../value" }
122 changes: 91 additions & 31 deletions crates/indexing/src/backend_in_memory_indexes.rs
@@ -50,6 +50,7 @@ use errors::ErrorMetadata;
 use futures::TryStreamExt;
 use imbl::OrdMap;
 use itertools::Itertools;
+use maplit::btreemap;
 use value::{
     ResolvedDocumentId,
     TableId,
@@ -355,26 +356,32 @@ impl DatabaseIndexSnapshot {
         }
     }
 
-    /// Query the given index at the snapshot.
-    pub async fn range(
-        &mut self,
+    async fn start_range_fetch(
+        &self,
         range_request: RangeRequest,
-    ) -> anyhow::Result<(
-        Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>,
-        CursorPosition,
-    )> {
+    ) -> anyhow::Result<
+        // Ok means we have a result immediately, Err means we need to fetch.
+        Result<
+            (
+                Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>,
+                CursorPosition,
+            ),
+            (IndexId, RangeRequest, Vec<DatabaseIndexSnapshotCacheResult>),
+        >,
+    > {
         let index = match self.index_registry.require_enabled(
             &range_request.index_name,
             &range_request.printable_index_name,
         ) {
             Ok(index) => index,
-            // Allow default system defined indexes on all tables other than the _index table.
+            // Allow default system defined indexes on all tables other than the _index
+            // table.
             Err(_)
                 if range_request.index_name.table()
                     != &self.index_registry.index_table().table_id
                     && range_request.index_name.is_by_id_or_creation_time() =>
            {
-                return Ok((vec![], CursorPosition::End));
+                return Ok(Ok((vec![], CursorPosition::End)));
            },
            Err(e) => anyhow::bail!(e),
        };
@@ -406,35 +413,86 @@ impl DatabaseIndexSnapshot {
             )
             .await?
         {
-            return Ok((range, CursorPosition::End));
+            return Ok(Ok((range, CursorPosition::End)));
         }
 
         // Next, try the transaction cache.
         let cache_results =
             self.cache
                 .get(index.id(), &range_request.interval, range_request.order);
+        Ok(Err((index.id(), range_request, cache_results)))
+    }
 
-        let (results, cache_miss_results, cursor) = self
-            .fetch_cache_misses(index.id(), range_request.clone(), cache_results)
-            .await?;
-
-        for (ts, doc) in cache_miss_results.into_iter() {
-            // Populate all index point lookups that can result in the given
-            // document.
-            for (some_index, index_key) in self.index_registry.index_keys(&doc) {
-                self.cache
-                    .populate(some_index.id(), index_key.into_bytes(), ts, doc.clone());
-            }
-        }
-        let (interval_read, _) = range_request
-            .interval
-            .split(cursor.clone(), range_request.order);
-        // After all documents in an index interval have been
-        // added to the cache with `populate_cache`, record the entire interval as
-        // being populated.
-        self.cache
-            .record_interval_populated(index.id(), interval_read);
-        Ok((results, cursor))
+    /// Query the given index at the snapshot.
+    pub async fn range_batch(
+        &mut self,
+        range_requests: BTreeMap<BatchKey, RangeRequest>,
+    ) -> BTreeMap<
+        BatchKey,
+        anyhow::Result<(
+            Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>,
+            CursorPosition,
+        )>,
+    > {
+        let batch_size = range_requests.len();
+        let mut ranges_to_fetch = BTreeMap::new();
+        let mut results = BTreeMap::new();
+
+        for (batch_key, range_request) in range_requests {
+            let result = self.start_range_fetch(range_request).await;
+            match result {
+                Err(e) => {
+                    results.insert(batch_key, Err(e));
+                },
+                Ok(Ok(result)) => {
+                    results.insert(batch_key, Ok(result));
+                },
+                Ok(Err(to_fetch)) => {
+                    ranges_to_fetch.insert(batch_key, to_fetch);
+                },
+            }
+        }
+
+        let self_ = &*self;
+        let fetch_results = futures::future::join_all(ranges_to_fetch.into_iter().map(
+            |(batch_key, (index_id, range_request, cache_results))| async move {
+                let fetch_result = self_
+                    .fetch_cache_misses(index_id, range_request.clone(), cache_results)
+                    .await;
+                (batch_key, index_id, range_request, fetch_result)
+            },
+        ))
+        .await;
+
+        for (batch_key, index_id, range_request, fetch_result) in fetch_results {
+            let result: anyhow::Result<_> = try {
+                let (fetch_result_vec, cache_miss_results, cursor) = fetch_result?;
+                for (ts, doc) in cache_miss_results.into_iter() {
+                    // Populate all index point lookups that can result in the given
+                    // document.
+                    for (some_index, index_key) in self.index_registry.index_keys(&doc) {
+                        self.cache.populate(
+                            some_index.id(),
+                            index_key.into_bytes(),
+                            ts,
+                            doc.clone(),
+                        );
+                    }
+                }
+                let (interval_read, _) = range_request
+                    .interval
+                    .split(cursor.clone(), range_request.order);
+                // After all documents in an index interval have been
+                // added to the cache with `populate_cache`, record the entire interval as
+                // being populated.
+                self.cache
+                    .record_interval_populated(index_id, interval_read);
+                (fetch_result_vec, cursor)
+            };
+            results.insert(batch_key, result);
+        }
+        assert_eq!(results.len(), batch_size);
+        results
     }
 
     async fn fetch_cache_misses(
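
A note on the `try` block above: `try { ... }` is a nightly feature (`try_blocks`) that scopes `?` to the block, so one batch item's failure becomes that item's `Err` entry instead of aborting the whole batch. A rough stable-Rust equivalent of the shape, for illustration only:

```rust
fn parse_double(input: &str) -> anyhow::Result<i64> {
    // An immediately-invoked closure gives `?` the same early-exit scope
    // that the nightly `try` block provides.
    let result: anyhow::Result<i64> = (|| {
        let n: i64 = input.parse()?; // short-circuits to the closure's Err
        Ok(n * 2)
    })();
    result
}
```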
@@ -505,14 +563,16 @@ impl DatabaseIndexSnapshot {
         // We call next() twice due to the verification below.
         let max_size = 2;
         let (stream, cursor) = self
-            .range(RangeRequest {
+            .range_batch(btreemap! { 0 => RangeRequest {
                 index_name,
                 printable_index_name,
                 interval: range,
                 order: Order::Asc,
                 max_size,
-            })
-            .await?;
+            }})
+            .await
+            .remove(&0)
+            .context("batch_key missing")??;
         let mut stream = stream.into_iter();
         match stream.next() {
             Some((key, ts, doc)) => {
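
The double `??` above deserves a gloss: `remove(&0)` yields an `Option` over the per-key result, `.context(...)` converts the missing-key case into an error, the first `?` unwraps that layer, and the second `?` propagates the range request's own error. The same shape in miniature, with a hypothetical simplified signature:

```rust
use anyhow::Context;
use std::collections::BTreeMap;

fn single(mut batch: BTreeMap<usize, anyhow::Result<String>>) -> anyhow::Result<String> {
    // First `?`: was batch key 0 present at all?
    // Second `?`: did that request itself succeed?
    let value = batch.remove(&0).context("batch_key missing")??;
    Ok(value)
}
```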
5 changes: 2 additions & 3 deletions crates/isolate/src/environment/udf/async_syscall.rs
@@ -11,6 +11,7 @@ use std::{
 use anyhow::Context;
 use common::{
     document::GenericDocument,
+    knobs::MAX_SYSCALL_BATCH_SIZE,
     query::{
         Cursor,
         CursorPosition,
@@ -120,8 +121,6 @@ pub enum AsyncSyscallBatch {
     Unbatched { name: String, args: JsonValue },
 }
 
-const MAX_SYSCALL_BATCH_SIZE: usize = 8;
-
 impl AsyncSyscallBatch {
     pub fn new(name: String, args: JsonValue) -> Self {
         match &*name {
@@ -131,7 +130,7 @@ impl AsyncSyscallBatch {
     }
 
     pub fn can_push(&self, name: &str, _args: &JsonValue) -> bool {
-        if self.len() >= MAX_SYSCALL_BATCH_SIZE {
+        if self.len() >= *MAX_SYSCALL_BATCH_SIZE {
             return false;
         }
         match (self, name) {