diff --git a/Cargo.lock b/Cargo.lock
index ff773042..31d2e077 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3158,6 +3158,7 @@ dependencies = [
  "futures-async-stream",
  "imbl",
  "itertools 0.12.1",
+ "maplit",
  "metrics",
  "must-let",
  "runtime",
diff --git a/crates/common/src/knobs.rs b/crates/common/src/knobs.rs
index ec72b6f8..81c30baa 100644
--- a/crates/common/src/knobs.rs
+++ b/crates/common/src/knobs.rs
@@ -237,6 +237,12 @@ pub static SCHEDULED_JOB_RETENTION: LazyLock<Duration> = LazyLock::new(|| {
 pub static SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE: LazyLock<usize> =
     LazyLock::new(|| env_config("SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE", 1000));
 
+/// Maximum number of syscalls that can run in a batch together when
+/// awaited in parallel. Higher values improve latency, while lower ones
+/// keep a single isolate from hogging database connections.
+pub static MAX_SYSCALL_BATCH_SIZE: LazyLock<usize> =
+    LazyLock::new(|| env_config("MAX_SYSCALL_BATCH_SIZE", 16));
+
 /// Number of rows that can be read in a transaction.
 pub static TRANSACTION_MAX_READ_SIZE_ROWS: LazyLock<usize> =
     LazyLock::new(|| env_config("TRANSACTION_MAX_READ_SIZE_ROWS", 16384));
diff --git a/crates/database/src/transaction_index.rs b/crates/database/src/transaction_index.rs
index 872c1b83..ea3acdd4 100644
--- a/crates/database/src/transaction_index.rs
+++ b/crates/database/src/transaction_index.rs
@@ -135,12 +135,7 @@ impl TransactionIndex {
         )>,
     > {
         let snapshot = &mut self.database_index_snapshot;
-        let mut snapshot_results = BTreeMap::new();
-        for (batch_key, range_request) in ranges.clone() {
-            // TODO(lee) thread the batching down all the way to persistence.
-            // This is faux-batching for now, to establish the interface.
-            snapshot_results.insert(batch_key, snapshot.range(range_request).await);
-        }
+        let mut snapshot_results = snapshot.range_batch(ranges.clone()).await;
         let batch_size = ranges.len();
         let mut results = BTreeMap::new();
 
diff --git a/crates/indexing/Cargo.toml b/crates/indexing/Cargo.toml
index 44c35e33..3bf354da 100644
--- a/crates/indexing/Cargo.toml
+++ b/crates/indexing/Cargo.toml
@@ -17,6 +17,7 @@ futures = { workspace = true }
 futures-async-stream = { workspace = true }
 imbl = { workspace = true }
 itertools = { workspace = true }
+maplit = { workspace = true }
 metrics = { path = "../metrics" }
 tracing = { workspace = true }
 value = { path = "../value" }
diff --git a/crates/indexing/src/backend_in_memory_indexes.rs b/crates/indexing/src/backend_in_memory_indexes.rs
index c93c33ab..71392f94 100644
--- a/crates/indexing/src/backend_in_memory_indexes.rs
+++ b/crates/indexing/src/backend_in_memory_indexes.rs
@@ -50,6 +50,7 @@ use errors::ErrorMetadata;
 use futures::TryStreamExt;
 use imbl::OrdMap;
 use itertools::Itertools;
+use maplit::btreemap;
 use value::{
     ResolvedDocumentId,
     TableId,
@@ -355,26 +356,32 @@ impl DatabaseIndexSnapshot {
         }
     }
 
-    /// Query the given index at the snapshot.
-    pub async fn range(
-        &mut self,
+    async fn start_range_fetch(
+        &self,
         range_request: RangeRequest,
-    ) -> anyhow::Result<(
-        Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>,
-        CursorPosition,
-    )> {
+    ) -> anyhow::Result<
+        // Ok means we have a result immediately, Err means we need to fetch.
+        Result<
+            (
+                Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>,
+                CursorPosition,
+            ),
+            (IndexId, RangeRequest, Vec<CacheResult>),
+        >,
+    > {
         let index = match self.index_registry.require_enabled(
             &range_request.index_name,
             &range_request.printable_index_name,
         ) {
             Ok(index) => index,
-            // Allow default system defined indexes on all tables other than the _index table.
+            // Allow default system defined indexes on all tables other than the _index
+            // table.
             Err(_)
                 if range_request.index_name.table()
                     != &self.index_registry.index_table().table_id
                     && range_request.index_name.is_by_id_or_creation_time() =>
             {
-                return Ok((vec![], CursorPosition::End));
+                return Ok(Ok((vec![], CursorPosition::End)));
             },
             Err(e) => anyhow::bail!(e),
         };
@@ -406,35 +413,86 @@ impl DatabaseIndexSnapshot {
         )
         .await?
         {
-            return Ok((range, CursorPosition::End));
+            return Ok(Ok((range, CursorPosition::End)));
         }
 
         // Next, try the transaction cache.
         let cache_results =
             self.cache
                 .get(index.id(), &range_request.interval, range_request.order);
+        Ok(Err((index.id(), range_request, cache_results)))
+    }
 
-        let (results, cache_miss_results, cursor) = self
-            .fetch_cache_misses(index.id(), range_request.clone(), cache_results)
-            .await?;
+    /// Query the given index at the snapshot.
+    pub async fn range_batch(
+        &mut self,
+        range_requests: BTreeMap<BatchKey, RangeRequest>,
+    ) -> BTreeMap<
+        BatchKey,
+        anyhow::Result<(
+            Vec<(IndexKeyBytes, Timestamp, ResolvedDocument)>,
+            CursorPosition,
+        )>,
+    > {
+        let batch_size = range_requests.len();
+        let mut ranges_to_fetch = BTreeMap::new();
+        let mut results = BTreeMap::new();
+
+        for (batch_key, range_request) in range_requests {
+            let result = self.start_range_fetch(range_request).await;
+            match result {
+                Err(e) => {
+                    results.insert(batch_key, Err(e));
+                },
+                Ok(Ok(result)) => {
+                    results.insert(batch_key, Ok(result));
+                },
+                Ok(Err(to_fetch)) => {
+                    ranges_to_fetch.insert(batch_key, to_fetch);
+                },
+            }
+        }
 
-        for (ts, doc) in cache_miss_results.into_iter() {
-            // Populate all index point lookups that can result in the given
-            // document.
-            for (some_index, index_key) in self.index_registry.index_keys(&doc) {
+        let self_ = &*self;
+        let fetch_results = futures::future::join_all(ranges_to_fetch.into_iter().map(
+            |(batch_key, (index_id, range_request, cache_results))| async move {
+                let fetch_result = self_
+                    .fetch_cache_misses(index_id, range_request.clone(), cache_results)
+                    .await;
+                (batch_key, index_id, range_request, fetch_result)
+            },
+        ))
+        .await;
+
+        for (batch_key, index_id, range_request, fetch_result) in fetch_results {
+            let result: anyhow::Result<_> = try {
+                let (fetch_result_vec, cache_miss_results, cursor) = fetch_result?;
+                for (ts, doc) in cache_miss_results.into_iter() {
+                    // Populate all index point lookups that can result in the given
+                    // document.
+                    for (some_index, index_key) in self.index_registry.index_keys(&doc) {
+                        self.cache.populate(
+                            some_index.id(),
+                            index_key.into_bytes(),
+                            ts,
+                            doc.clone(),
+                        );
+                    }
+                }
+                let (interval_read, _) = range_request
+                    .interval
+                    .split(cursor.clone(), range_request.order);
+                // After all documents in an index interval have been
+                // added to the cache with `populate_cache`, record the entire interval as
+                // being populated.
                 self.cache
-                    .populate(some_index.id(), index_key.into_bytes(), ts, doc.clone());
-            }
+                    .record_interval_populated(index_id, interval_read);
+                (fetch_result_vec, cursor)
+            };
+            results.insert(batch_key, result);
         }
-        let (interval_read, _) = range_request
-            .interval
-            .split(cursor.clone(), range_request.order);
-        // After all documents in an index interval have been
-        // added to the cache with `populate_cache`, record the entire interval as
-        // being populated.
-        self.cache
-            .record_interval_populated(index.id(), interval_read);
-        Ok((results, cursor))
+        assert_eq!(results.len(), batch_size);
+        results
     }
 
     async fn fetch_cache_misses(
@@ -505,14 +563,16 @@ impl DatabaseIndexSnapshot {
         // We call next() twice due to the verification below.
         let max_size = 2;
         let (stream, cursor) = self
-            .range(RangeRequest {
+            .range_batch(btreemap! { 0 => RangeRequest {
                 index_name,
                 printable_index_name,
                 interval: range,
                 order: Order::Asc,
                 max_size,
-            })
-            .await?;
+            }})
+            .await
+            .remove(&0)
+            .context("batch_key missing")??;
         let mut stream = stream.into_iter();
         match stream.next() {
             Some((key, ts, doc)) => {
diff --git a/crates/isolate/src/environment/udf/async_syscall.rs b/crates/isolate/src/environment/udf/async_syscall.rs
index 8aca00bb..8d250dfb 100644
--- a/crates/isolate/src/environment/udf/async_syscall.rs
+++ b/crates/isolate/src/environment/udf/async_syscall.rs
@@ -11,6 +11,7 @@ use std::{
 use anyhow::Context;
 use common::{
     document::GenericDocument,
+    knobs::MAX_SYSCALL_BATCH_SIZE,
     query::{
         Cursor,
         CursorPosition,
@@ -120,8 +121,6 @@ pub enum AsyncSyscallBatch {
     Unbatched { name: String, args: JsonValue },
 }
 
-const MAX_SYSCALL_BATCH_SIZE: usize = 8;
-
 impl AsyncSyscallBatch {
     pub fn new(name: String, args: JsonValue) -> Self {
         match &*name {
@@ -131,7 +130,7 @@ impl AsyncSyscallBatch {
     }
 
     pub fn can_push(&self, name: &str, _args: &JsonValue) -> bool {
-        if self.len() >= MAX_SYSCALL_BATCH_SIZE {
+        if self.len() >= *MAX_SYSCALL_BATCH_SIZE {
             return false;
         }
         match (self, name) {
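
Note on the calling convention this patch establishes: `range` is gone, so even single-range callers go through `range_batch`, wrapping their one request under batch key `0` and pulling its result back out, as the `btreemap! { 0 => ... }` call site above does. Below is a minimal, self-contained sketch of that interface shape; `Snapshot`, the string-based `RangeRequest`, and the tokio runtime are illustrative stand-ins, not types from this codebase.

```rust
use std::collections::BTreeMap;

use anyhow::Context;
use futures::future::join_all;
use maplit::btreemap;

type BatchKey = usize;

// Stand-in for the real RangeRequest (index name, interval, order, ...).
struct RangeRequest {
    index_name: String,
    max_size: usize,
}

struct Snapshot;

impl Snapshot {
    // Fan out all requests concurrently and return results keyed by
    // BatchKey, mirroring the shape of DatabaseIndexSnapshot::range_batch:
    // every input key reappears in the output, each with its own Result.
    async fn range_batch(
        &self,
        range_requests: BTreeMap<BatchKey, RangeRequest>,
    ) -> BTreeMap<BatchKey, anyhow::Result<Vec<String>>> {
        let fetches = range_requests.into_iter().map(|(key, req)| async move {
            // A real implementation would consult the cache first, then hit
            // persistence for the misses.
            let rows = vec![format!("up to {} rows from {}", req.max_size, req.index_name)];
            (key, anyhow::Ok(rows))
        });
        join_all(fetches).await.into_iter().collect()
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let snapshot = Snapshot;
    // A single-range caller batches itself under key 0, like the patch does.
    let rows = snapshot
        .range_batch(btreemap! { 0 => RangeRequest {
            index_name: "by_id".to_string(),
            max_size: 2,
        }})
        .await
        .remove(&0)
        .context("batch_key missing")??;
    println!("{rows:?}");
    Ok(())
}
```

The batch cap itself is no longer the hard-coded `const MAX_SYSCALL_BATCH_SIZE: usize = 8`: it now defaults to 16 and can be tuned per deployment through the `MAX_SYSCALL_BATCH_SIZE` environment variable read by `env_config`.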