diff --git a/crates/tabby-common/src/index.rs b/crates/tabby-common/src/index.rs
index d6f1b5736b4e..733c0dbef733 100644
--- a/crates/tabby-common/src/index.rs
+++ b/crates/tabby-common/src/index.rs
@@ -20,7 +20,7 @@ pub struct CodeSearchSchema {
     pub field_git_url: Field,
     pub field_filepath: Field,
     /// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey
-    pub field_file_id: Field,
+    pub field_source_file_key: Field,
     pub field_language: Field,
     pub field_body: Field,
 }
@@ -38,7 +38,7 @@ impl CodeSearchSchema {
 
         let field_git_url = builder.add_text_field("git_url", STRING | STORED);
         let field_filepath = builder.add_text_field("filepath", STRING | STORED);
-        let field_file_id = builder.add_text_field("file_id", STRING | STORED);
+        let field_source_file_key = builder.add_text_field("file_id", STRING | STORED);
         let field_language = builder.add_text_field("language", STRING | STORED);
         let field_body = builder.add_text_field("body", code_options);
         let schema = builder.build();
@@ -47,7 +47,7 @@ impl CodeSearchSchema {
             schema,
             field_git_url,
             field_filepath,
-            field_file_id,
+            field_source_file_key,
             field_language,
             field_body,
         }
diff --git a/crates/tabby-scheduler/src/cache.rs b/crates/tabby-scheduler/src/cache.rs
index ba46a387a4a3..873d6700b6c6 100644
--- a/crates/tabby-scheduler/src/cache.rs
+++ b/crates/tabby-scheduler/src/cache.rs
@@ -29,7 +29,7 @@ fn get_git_hash(path: &Path) -> Result<String> {
 }
 
 #[derive(Deserialize, Serialize, Debug)]
-pub(crate) struct SourceFileKey {
+struct SourceFileKey {
     path: PathBuf,
     language: String,
     git_hash: String,
@@ -89,20 +89,25 @@ impl CacheStore {
             .expect("Failed to access indexed files bucket")
     }
 
-    pub fn is_indexed(&self, key: &SourceFileKey) -> bool {
-        self.index_bucket()
-            .contains(&key.to_string())
-            .expect("Failed to read index bucket")
+    pub fn check_indexed(&self, path: &Path) -> (String, bool) {
+        let key = SourceFileKey::try_from(path)
+            .expect("Failed to create source file key")
+            .to_string();
+        let indexed = self
+            .index_bucket()
+            .contains(&key)
+            .expect("Failed to read index bucket");
+        (key, indexed)
     }
 
-    pub fn set_indexed(&self, key: &SourceFileKey) {
+    pub fn set_indexed(&self, batch: Batch<String, String>) {
         self.index_bucket()
-            .set(&key.to_string(), &String::new())
-            .expect("Failed to write to index bucket");
+            .batch(batch)
+            .expect("Failed to commit batched index update")
     }
 
-    pub fn cleanup_old_indexed_files(&self, key_remover: impl Fn(&String)) {
-        info!("Cleaning up indexed file cache");
+    pub fn garbage_collection_for_indexed_files(&self, key_remover: impl Fn(&String)) {
+        info!("Started cleaning up 'indexed_files' bucket");
         let bucket = self.index_bucket();
 
         let mut batch = Batch::new();
@@ -123,13 +128,9 @@ impl CacheStore {
                 }
             })
             .inspect(key_remover)
-            .for_each(|key| {
-                batch
-                    .remove(&key)
-                    .expect("Failed to remove indexed source file")
-            });
+            .for_each(|key| batch.remove(&key).expect("Failed to remove key"));
 
-        info!("Finished cleaning up indexed files: {num_keep} items kept, {num_removed} items removed");
+        info!("Finished garbage collection for 'indexed_files': {num_keep} items kept, {num_removed} items removed");
         bucket
             .batch(batch)
             .expect("Failed to execute batched delete");
@@ -163,8 +164,8 @@ impl CacheStore {
         }
     }
 
-    pub fn cleanup_old_source_files(&self) {
-        info!("Cleaning up synced file cache");
+    pub fn garbage_collection_for_source_files(&self) {
+        info!("Started cleaning up 'source_files' bucket");
         let bucket: Bucket<String, Json<SourceFile>> = self
             .store
             .bucket(Some(SOURCE_FILE_BUCKET_KEY))
@@ -190,7 +191,7 @@ impl CacheStore {
             .for_each(|key| batch.remove(&key).expect("Failed to remove key"));
 
         info!(
-            "Finished garbage collection: {} items kept, {} items removed",
+            "Finished garbage collection for 'source_files': {} items kept, {} items removed",
             num_keep, num_removed
         );
         bucket.batch(batch).expect("to batch remove staled files");
diff --git a/crates/tabby-scheduler/src/index.rs b/crates/tabby-scheduler/src/index.rs
index 88a06f85066a..0a63b3a89456 100644
--- a/crates/tabby-scheduler/src/index.rs
+++ b/crates/tabby-scheduler/src/index.rs
@@ -2,6 +2,7 @@ use std::{fs, io::IsTerminal, path::Path};
 
 use ignore::Walk;
 use kdam::BarExt;
+use kv::Batch;
 use tabby_common::{
     config::RepositoryConfig,
     index::{register_tokenizers, CodeSearchSchema},
@@ -10,11 +11,7 @@ use tabby_common::{
 use tantivy::{directory::MmapDirectory, doc, Index, Term};
 use tracing::{debug, warn};
 
-use crate::{
-    cache::{CacheStore, SourceFileKey},
-    code::CodeIntelligence,
-    utils::tqdm,
-};
+use crate::{cache::CacheStore, code::CodeIntelligence, utils::tqdm};
 
 // Magic numbers
 static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
@@ -41,6 +38,7 @@ pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {
         .then(|| tqdm(total_file_size));
 
     let intelligence = CodeIntelligence::default();
+    let mut indexed_files_batch = Batch::new();
     for repository in config {
         for file in Walk::new(repository.dir()) {
             let file = match file {
@@ -56,10 +54,9 @@ pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {
             if !is_valid_file(&source_file) {
                 continue;
             }
-            let file_id =
-                SourceFileKey::try_from(file.path()).expect("Failed to create source file key");
+            let (file_id, indexed) = cache.check_indexed(file.path());
 
-            if cache.is_indexed(&file_id) {
+            if indexed {
                 continue;
             }
             let text = match source_file.read_content() {
@@ -82,25 +79,45 @@ pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {
             writer
                 .add_document(doc! {
                     code.field_git_url => source_file.git_url.clone(),
-                    code.field_file_id => file_id.to_string(),
+                    code.field_source_file_key => file_id.to_string(),
                     code.field_filepath => source_file.filepath.clone(),
                     code.field_language => source_file.language.clone(),
                     code.field_body => body,
                 })
                 .expect("Failed to add document");
             }
-            cache.set_indexed(&file_id);
+            indexed_files_batch
+                .set(&file_id, &String::new())
+                .expect("Failed to mark file as indexed");
         }
     }
 
-    cache.cleanup_old_indexed_files(|key| {
-        writer.delete_term(Term::from_field_text(code.field_file_id, key));
-    });
-
+    // Commit updating indexed documents
     writer.commit().expect("Failed to commit index");
     writer
         .wait_merging_threads()
        .expect("Failed to wait for merging threads");
+
+    // Mark all indexed documents as indexed
+    cache.set_indexed(indexed_files_batch);
+
+    // Create a new writer to commit deletion of removed indexed files
+    let mut writer = index
+        .writer(150_000_000)
+        .expect("Failed to create index writer");
+
+    cache.garbage_collection_for_indexed_files(|key| {
+        writer.delete_term(Term::from_field_text(code.field_source_file_key, key));
+    });
+
+    // Commit garbage collection
+    writer
+        .commit()
+        .expect("Failed to commit garbage collection");
+
+    writer
+        .wait_merging_threads()
+        .expect("Failed to wait for merging threads on garbage collection");
 }
 
 fn is_valid_file(file: &SourceFile) -> bool {
diff --git a/crates/tabby-scheduler/src/lib.rs b/crates/tabby-scheduler/src/lib.rs
index 63a6904e3a85..42a74f703c69 100644
--- a/crates/tabby-scheduler/src/lib.rs
+++ b/crates/tabby-scheduler/src/lib.rs
@@ -24,7 +24,7 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
         job_sync(&mut cache, &repositories);
         job_index(&mut cache, &repositories);
 
-        cache.cleanup_old_source_files();
+        cache.garbage_collection_for_source_files();
     } else {
         let access = Arc::new(access);
         let scheduler = JobScheduler::new()
@@ -53,7 +53,7 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
                     job_sync(&mut cache, &repositories);
                     job_index(&mut cache, &repositories);
 
-                    cache.cleanup_old_source_files();
+                    cache.garbage_collection_for_source_files();
                 })
             })
             .expect("Failed to create job"),
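
For context on the behavioral change: cache.rs stops writing one indexed-file marker per key, and index.rs instead accumulates markers in a kv::Batch that set_indexed commits in a single call. The sketch below (not part of the diff) shows that accumulate-then-commit pattern in isolation, using only kv calls that already appear in the diff (Store, Bucket, Batch, set, batch, contains); the store path, bucket name, and keys are made up for illustration.

use kv::{Batch, Config, Store};

fn main() -> Result<(), kv::Error> {
    // Throwaway store; Tabby's CacheStore manages its own location and buckets.
    let store = Store::new(Config::new("/tmp/kv-batch-demo"))?;
    let bucket = store.bucket::<String, String>(Some("indexed_files"))?;

    // Accumulate marker writes in memory rather than hitting the store per file,
    // mirroring `indexed_files_batch.set(&file_id, &String::new())` in index.rs.
    let mut batch = Batch::<String, String>::new();
    for key in ["src/main.rs:rust:abc123", "src/lib.rs:rust:def456"] {
        batch.set(&key.to_string(), &String::new())?;
    }

    // One commit for the whole batch, as `set_indexed` now does via `bucket.batch(batch)`.
    bucket.batch(batch)?;
    assert!(bucket.contains(&"src/main.rs:rust:abc123".to_string())?);
    Ok(())
}

A side effect worth noting in index.rs: tantivy's wait_merging_threads takes the IndexWriter by value, so once the newly added documents are committed, the diff creates a second writer to apply the delete_term calls produced during garbage collection, then commits and waits on that writer separately.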