diff --git a/crates/database/src/committer.rs b/crates/database/src/committer.rs index cd804dbb..24242496 100644 --- a/crates/database/src/committer.rs +++ b/crates/database/src/committer.rs @@ -35,8 +35,6 @@ use common::{ RetentionValidator, TimestampRange, }, - persistence_helpers::stream_revision_pairs, - query::Order, runtime::{ Runtime, RuntimeInstant, @@ -80,7 +78,6 @@ use futures::{ use indexing::index_registry::IndexRegistry; use parking_lot::Mutex; use prometheus::VMHistogram; -use search::SearchIndexManager; use usage_tracking::{ DocInVectorIndex, FunctionUsageTracker, @@ -94,7 +91,6 @@ use value::{ TableMapping, TableName, }; -use vector::VectorIndexManager; use crate::{ bootstrap_model::defaults::BootstrapTableIds, @@ -102,8 +98,16 @@ use crate::{ ConflictingReadWithWriteSource, ShutdownSignal, }, - metrics, + metrics::{ + self, + bootstrap_update_timer, + finish_bootstrap_update, + }, reads::ReadSet, + search_and_vector_bootstrap::{ + stream_revision_pairs_for_indexes, + BootstrappedSearchAndVectorIndexes, + }, snapshot_manager::SnapshotManager, transaction::FinalTransaction, write_log::{ @@ -270,12 +274,12 @@ impl Committer { self.bump_max_repeatable_ts(result); }, Some(CommitterMessage::FinishSearchAndVectorBootstrap { - search_index_manager, - vector_index_manager, bootstrap_ts, result, + bootstrapped_indexes, + bootstrap_ts, + result, }) => { self.finish_search_and_vector_bootstrap( - search_index_manager, - vector_index_manager, + bootstrapped_indexes, bootstrap_ts, result ).await; @@ -293,12 +297,16 @@ impl Committer { } async fn update_indexes_since_bootstrap( - search_index_manager: &mut SearchIndexManager, - vector_index_manager: &mut VectorIndexManager, + BootstrappedSearchAndVectorIndexes { + search_index_manager, + vector_index_manager, + tables_with_indexes, + }: &mut BootstrappedSearchAndVectorIndexes, bootstrap_ts: Timestamp, persistence: RepeatablePersistence, registry: &IndexRegistry, ) -> anyhow::Result<()> { + let _timer = bootstrap_update_timer(); anyhow::ensure!( !search_index_manager.is_bootstrapping(), "Trying to update search index while it's still bootstrapping" @@ -307,13 +315,17 @@ impl Committer { !vector_index_manager.is_bootstrapping(), "Trying to update vector index while it's still bootstrapping" ); - let range = (Bound::Excluded(bootstrap_ts), Bound::Unbounded); + let range = TimestampRange::new((Bound::Excluded(bootstrap_ts), Bound::Unbounded))?; - let document_stream = persistence.load_documents(TimestampRange::new(range)?, Order::Asc); - let revision_stream = stream_revision_pairs(document_stream, &persistence); + let revision_stream = + stream_revision_pairs_for_indexes(tables_with_indexes, &persistence, range); futures::pin_mut!(revision_stream); + let mut num_revisions = 0; + let mut total_size = 0; while let Some(revision_pair) = revision_stream.try_next().await? { + num_revisions += 1; + total_size += revision_pair.document().map(|d| d.size()).unwrap_or(0); search_index_manager.update( registry, revision_pair.prev_document(), @@ -327,13 +339,13 @@ impl Committer { WriteTimestamp::Committed(revision_pair.ts()), )?; } + finish_bootstrap_update(num_revisions, total_size); Ok(()) } async fn finish_search_and_vector_bootstrap( &mut self, - mut search_index_manager: SearchIndexManager, - mut vector_index_manager: VectorIndexManager, + mut bootstrapped_indexes: BootstrappedSearchAndVectorIndexes, bootstrap_ts: RepeatableTimestamp, result: oneshot::Sender>, ) { @@ -352,8 +364,7 @@ impl Committer { ); let res = Self::update_indexes_since_bootstrap( - &mut search_index_manager, - &mut vector_index_manager, + &mut bootstrapped_indexes, *bootstrap_ts, repeatable_persistence, &last_snapshot.index_registry, @@ -371,8 +382,8 @@ impl Committer { panic!("Snapshots were changed concurrently during commit?"); } snapshot_manager.overwrite_last_snapshot_search_and_vector_indexes( - search_index_manager, - vector_index_manager, + bootstrapped_indexes.search_index_manager, + bootstrapped_indexes.vector_index_manager, ); tracing::info!("Committed backfilled vector indexes"); let _ = result.send(Ok(())); @@ -814,14 +825,12 @@ impl Clone for CommitterClient { impl CommitterClient { pub async fn finish_search_and_vector_bootstrap( &self, - search_index_manager: SearchIndexManager, - vector_index_manager: VectorIndexManager, + bootstrapped_indexes: BootstrappedSearchAndVectorIndexes, bootstrap_ts: RepeatableTimestamp, ) -> anyhow::Result<()> { let (tx, rx) = oneshot::channel(); let message = CommitterMessage::FinishSearchAndVectorBootstrap { - search_index_manager, - vector_index_manager, + bootstrapped_indexes, bootstrap_ts, result: tx, }; @@ -977,8 +986,7 @@ enum CommitterMessage { result: oneshot::Sender>, }, FinishSearchAndVectorBootstrap { - search_index_manager: SearchIndexManager, - vector_index_manager: VectorIndexManager, + bootstrapped_indexes: BootstrappedSearchAndVectorIndexes, bootstrap_ts: RepeatableTimestamp, result: oneshot::Sender>, }, diff --git a/crates/database/src/metrics.rs b/crates/database/src/metrics.rs index aaa9a6e2..c1488de3 100644 --- a/crates/database/src/metrics.rs +++ b/crates/database/src/metrics.rs @@ -586,13 +586,40 @@ pub fn bootstrap_timer() -> StatusTimer { StatusTimer::new(&SEARCH_AND_VECTOR_BOOTSTRAP_SECONDS) } +register_convex_histogram!( + SEARCH_AND_VECTOR_BOOTSTRAP_COMMITTER_UPDATE_SECONDS, + "Time to update search and vector index bootstrap in the committer" +); +pub fn bootstrap_update_timer() -> Timer { + Timer::new(&SEARCH_AND_VECTOR_BOOTSTRAP_COMMITTER_UPDATE_SECONDS) +} +register_convex_counter!( + SEARCH_AND_VECTOR_BOOTSTRAP_COMMITTER_UPDATE_REVISIONS_TOTAL, + "Number of revisions loaded during search and vector bootstrap updates in the committer" +); +register_convex_counter!( + SEARCH_AND_VECTOR_BOOTSTRAP_COMMITTER_UPDATE_REVISIONS_BYTES, + "Total size of revisions loaded during search and vector bootstrap updates in the committer" +); + +pub fn finish_bootstrap_update(num_revisions: usize, bytes: usize) { + log_counter( + &SEARCH_AND_VECTOR_BOOTSTRAP_COMMITTER_UPDATE_REVISIONS_TOTAL, + num_revisions as u64, + ); + log_counter( + &SEARCH_AND_VECTOR_BOOTSTRAP_COMMITTER_UPDATE_REVISIONS_BYTES, + bytes as u64, + ); +} + register_convex_counter!( SEARCH_AND_VECTOR_BOOTSTRAP_REVISIONS_TOTAL, - "Number of revisions loaded during vector bootstrap" + "Number of revisions loaded during search and vector bootstrap" ); register_convex_counter!( SEARCH_AND_VECTOR_BOOTSTRAP_REVISIONS_BYTES, - "Total size of revisions loaded during vector bootstrap" + "Total size of revisions loaded during search and vector bootstrap" ); pub fn finish_bootstrap(num_revisions: usize, bytes: usize, timer: StatusTimer) { log_counter( @@ -675,14 +702,11 @@ pub fn search_and_vector_bootstrap_timer() -> StatusTimer { } register_convex_counter!( - DATABASE_SEARCH_AND_VECTOR_BOOTSTRAP_DOCUMENTS_SKIPPED_TOTAL, + SEARCH_AND_VECTOR_BOOTSTRAP_DOCUMENTS_SKIPPED_TOTAL, "Number of documents skipped during vector and search index bootstrap", ); pub fn log_document_skipped() { - log_counter( - &DATABASE_SEARCH_AND_VECTOR_BOOTSTRAP_DOCUMENTS_SKIPPED_TOTAL, - 1, - ); + log_counter(&SEARCH_AND_VECTOR_BOOTSTRAP_DOCUMENTS_SKIPPED_TOTAL, 1); } pub mod vector { diff --git a/crates/database/src/search_and_vector_bootstrap.rs b/crates/database/src/search_and_vector_bootstrap.rs index 8d154275..6c6d3e66 100644 --- a/crates/database/src/search_and_vector_bootstrap.rs +++ b/crates/database/src/search_and_vector_bootstrap.rs @@ -3,7 +3,10 @@ use std::{ max, min, }, - collections::BTreeMap, + collections::{ + BTreeMap, + BTreeSet, + }, ops::Bound, time::Duration, }; @@ -41,6 +44,8 @@ use common::{ }; use errors::ErrorMetadataAnyhowExt; use futures::{ + future, + Stream, StreamExt, TryStreamExt, }; @@ -100,11 +105,23 @@ struct IndexesToBootstrap { oldest_index_ts: Timestamp, } +pub struct BootstrappedSearchAndVectorIndexes { + pub search_index_manager: SearchIndexManager, + pub vector_index_manager: VectorIndexManager, + pub tables_with_indexes: BTreeSet, +} + impl IndexesToBootstrap { - fn into_search_and_vector_index_managers( - self, - persistence_version: PersistenceVersion, - ) -> (SearchIndexManager, VectorIndexManager) { + fn tables_with_indexes(&self) -> BTreeSet { + self.table_to_search_indexes + .keys() + .chain(self.table_to_vector_indexes.keys()) + .copied() + .collect() + } + + fn finish(self, persistence_version: PersistenceVersion) -> BootstrappedSearchAndVectorIndexes { + let tables_with_indexes = self.tables_with_indexes(); let search_index_manager = SearchIndexManager::new( SearchIndexManagerState::Ready( self.table_to_search_indexes @@ -146,7 +163,11 @@ impl IndexesToBootstrap { .collect(), ); let vector_index_manager = VectorIndexManager { indexes }; - (search_index_manager, vector_index_manager) + BootstrappedSearchAndVectorIndexes { + search_index_manager, + vector_index_manager, + tables_with_indexes, + } } } @@ -238,6 +259,24 @@ impl VectorIndexBootstrapData { } } +/// Streams revision pairs for documents in the indexed tables. +pub fn stream_revision_pairs_for_indexes<'a>( + tables_with_indexes: &'a BTreeSet, + persistence: &'a RepeatablePersistence, + range: TimestampRange, +) -> impl Stream> + 'a { + let document_stream = persistence + .load_documents(range, Order::Asc) + .try_filter(|(_, id, _)| { + let is_in_indexed_table = tables_with_indexes.contains(id.table()); + if !is_in_indexed_table { + log_document_skipped(); + } + future::ready(tables_with_indexes.contains(id.table())) + }); + stream_revision_pairs(document_stream, persistence) +} + impl SearchAndVectorIndexBootstrapWorker { pub(crate) fn new( runtime: RT, @@ -291,13 +330,17 @@ impl SearchAndVectorIndexBootstrapWorker { } async fn run(&mut self) -> anyhow::Result<()> { - let (search_index_manager, vector_index_manager) = self.bootstrap_manager().await?; + let bootstrapped_indexes = self.bootstrap().await?; self.pause_client.wait(FINISHED_BOOTSTRAP_UPDATES).await; - self.finish_bootstrap(search_index_manager, vector_index_manager) + self.committer_client + .finish_search_and_vector_bootstrap( + bootstrapped_indexes, + self.persistence.upper_bound(), + ) .await } - async fn bootstrap_manager(&self) -> anyhow::Result<(SearchIndexManager, VectorIndexManager)> { + async fn bootstrap(&self) -> anyhow::Result { // Load all of the fast forward timestamps first to ensure that we stay within // the comparatively short valid time for the persistence snapshot let snapshot = self @@ -326,7 +369,7 @@ impl SearchAndVectorIndexBootstrapWorker { indexes_with_fast_forward_ts, )?; - Self::bootstrap(&self.persistence, indexes_to_bootstrap).await + Self::bootstrap_inner(&self.persistence, indexes_to_bootstrap).await } fn indexes_to_bootstrap( @@ -448,33 +491,32 @@ impl SearchAndVectorIndexBootstrapWorker { }) } - async fn bootstrap( + async fn bootstrap_inner( persistence: &RepeatablePersistence, mut indexes_to_bootstrap: IndexesToBootstrap, - ) -> anyhow::Result<(SearchIndexManager, VectorIndexManager)> { + ) -> anyhow::Result { let _status = log_worker_starting("SearchAndVectorBootstrap"); let timer = crate::metrics::bootstrap_timer(); let upper_bound = persistence.upper_bound(); let mut num_revisions = 0; let mut total_size = 0; - let range = ( + let range = TimestampRange::new(( Bound::Excluded(indexes_to_bootstrap.oldest_index_ts), Bound::Included(*upper_bound), - ); - let document_stream = persistence.load_documents(TimestampRange::new(range)?, Order::Asc); - let revision_stream = stream_revision_pairs(document_stream, persistence); + ))?; + let tables_with_indexes = indexes_to_bootstrap.tables_with_indexes(); + let revision_stream = + stream_revision_pairs_for_indexes(&tables_with_indexes, persistence, range); futures::pin_mut!(revision_stream); while let Some(revision_pair) = revision_stream.try_next().await? { num_revisions += 1; total_size += revision_pair.document().map(|d| d.size()).unwrap_or(0); - let mut revision_not_in_indexes = true; if let Some(vector_indexes_to_update) = indexes_to_bootstrap .table_to_vector_indexes .get_mut(revision_pair.id.table()) { - revision_not_in_indexes = false; for vector_index in vector_indexes_to_update { vector_index.update(&revision_pair)?; } @@ -483,14 +525,10 @@ impl SearchAndVectorIndexBootstrapWorker { .table_to_search_indexes .get_mut(revision_pair.id.table()) { - revision_not_in_indexes = false; for search_index in search_indexes_to_update { search_index.update(&revision_pair)?; } } - if revision_not_in_indexes { - log_document_skipped() - } } tracing::info!( @@ -499,21 +537,7 @@ impl SearchAndVectorIndexBootstrapWorker { ); crate::metrics::finish_bootstrap(num_revisions, total_size, timer); - Ok(indexes_to_bootstrap.into_search_and_vector_index_managers(persistence.version())) - } - - async fn finish_bootstrap( - &self, - search_index_manager: SearchIndexManager, - vector_index_manager: VectorIndexManager, - ) -> anyhow::Result<()> { - self.committer_client - .finish_search_and_vector_bootstrap( - search_index_manager, - vector_index_manager, - self.persistence.upper_bound(), - ) - .await + Ok(indexes_to_bootstrap.finish(persistence.version())) } } @@ -725,10 +749,14 @@ mod tests { let db = reopen_db(&rt, &fixtures).await?; let worker = db.new_search_and_vector_bootstrap_worker_for_testing(); - let (search_index_manager, vector_index_manager) = worker.bootstrap_manager().await?; + let bootstrapped_indexes = worker.bootstrap().await?; let vector_id = add_vector(&db, &index_metadata, [3f32, 4f32]).await?; worker - .finish_bootstrap(search_index_manager, vector_index_manager) + .committer_client + .finish_search_and_vector_bootstrap( + bootstrapped_indexes, + worker.persistence.upper_bound(), + ) .await?; let result = query_vectors(&db, &index_metadata).await?;