diff --git a/crates/tabby-common/src/index.rs b/crates/tabby-common/src/index.rs index a82984366296..733c0dbef733 100644 --- a/crates/tabby-common/src/index.rs +++ b/crates/tabby-common/src/index.rs @@ -19,6 +19,8 @@ pub struct CodeSearchSchema { pub schema: Schema, pub field_git_url: Field, pub field_filepath: Field, + /// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey + pub field_source_file_key: Field, pub field_language: Field, pub field_body: Field, } @@ -36,6 +38,7 @@ impl CodeSearchSchema { let field_git_url = builder.add_text_field("git_url", STRING | STORED); let field_filepath = builder.add_text_field("filepath", STRING | STORED); + let field_source_file_key = builder.add_text_field("file_id", STRING | STORED); let field_language = builder.add_text_field("language", STRING | STORED); let field_body = builder.add_text_field("body", code_options); let schema = builder.build(); @@ -44,6 +47,7 @@ impl CodeSearchSchema { schema, field_git_url, field_filepath, + field_source_file_key, field_language, field_body, } diff --git a/crates/tabby-scheduler/src/cache.rs b/crates/tabby-scheduler/src/cache.rs index cc62af762888..f68ddd5021cc 100644 --- a/crates/tabby-scheduler/src/cache.rs +++ b/crates/tabby-scheduler/src/cache.rs @@ -2,6 +2,7 @@ use std::{ fs::read_to_string, path::{Path, PathBuf}, process::Command, + str::FromStr, }; use anyhow::{bail, Context, Result}; @@ -13,24 +14,31 @@ use tracing::{info, warn}; use crate::code::CodeIntelligence; const SOURCE_FILE_BUCKET_KEY: &str = "source_files"; +const INDEX_BUCKET_KEY: &str = "indexed_files"; + +fn cmd_stdout(cmd: &str, args: &[&str]) -> Result { + Ok( + String::from_utf8(Command::new(cmd).args(args).output()?.stdout)? + .trim() + .to_string(), + ) +} fn get_git_hash(path: &Path) -> Result { - let path = path.display().to_string(); - let output = Command::new("git").args(["hash-object", &path]).output()?; - Ok(String::from_utf8(output.stdout)?.trim().to_string()) + cmd_stdout("git", &["hash-object", &path.display().to_string()]) } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Debug)] struct SourceFileKey { path: PathBuf, language: String, git_hash: String, } -impl TryFrom<&str> for SourceFileKey { - type Error = serde_json::Error; +impl FromStr for SourceFileKey { + type Err = serde_json::Error; - fn try_from(s: &str) -> Result { + fn from_str(s: &str) -> Result { serde_json::from_str(s) } } @@ -75,6 +83,65 @@ impl CacheStore { } } + fn index_bucket(&self) -> Bucket { + self.store + .bucket(Some(INDEX_BUCKET_KEY)) + .expect("Failed to access indexed files bucket") + } + + pub fn check_indexed(&self, path: &Path) -> (String, bool) { + let key = SourceFileKey::try_from(path) + .expect("Failed to create source file key") + .to_string(); + let indexed = self + .index_bucket() + .contains(&key) + .expect("Failed to read index bucket"); + (key, indexed) + } + + pub fn apply_indexed(&self, batch: Batch) { + self.index_bucket() + .batch(batch) + .expect("Failed to commit batched index update") + } + + #[must_use] + pub fn prepare_garbage_collection_for_indexed_files( + &self, + key_remover: impl Fn(&String), + ) -> impl FnOnce() + '_ { + info!("Started cleaning up 'indexed_files' bucket"); + let bucket = self.index_bucket(); + let mut batch = Batch::new(); + + let mut num_keep = 0; + let mut num_removed = 0; + + bucket + .iter() + .filter_map(|item| { + let item = item.expect("Failed to read item"); + let item_key: String = item.key().expect("Failed to get key"); + if is_item_key_matched(&item_key) { + num_keep += 1; + None + } else { + num_removed += 1; + Some(item_key) + } + }) + .inspect(key_remover) + .for_each(|key| batch.remove(&key).expect("Failed to remove key")); + + info!("Finished garbage collection for 'indexed_files': {num_keep} items kept, {num_removed} items removed"); + move || { + bucket + .batch(batch) + .expect("Failed to execute batched delete") + } + } + pub fn get_source_file( &mut self, config: &RepositoryConfig, @@ -103,8 +170,8 @@ impl CacheStore { } } - pub fn garbage_collection(&self) { - info!("Running garbage collection"); + pub fn garbage_collection_for_source_files(&self) { + info!("Started cleaning up 'source_files' bucket"); let bucket: Bucket> = self .store .bucket(Some(SOURCE_FILE_BUCKET_KEY)) @@ -124,13 +191,13 @@ impl CacheStore { None } else { num_removed += 1; - Some(item.key().expect("Failed to get key")) + Some(item_key) } }) .for_each(|key| batch.remove(&key).expect("Failed to remove key")); info!( - "Finished garbage collection: {} items kept, {} items removed", + "Finished garbage collection for 'source_files': {} items kept, {} items removed", num_keep, num_removed ); bucket.batch(batch).expect("to batch remove staled files"); @@ -138,7 +205,7 @@ impl CacheStore { } fn is_item_key_matched(item_key: &str) -> bool { - let Ok(key) = SourceFileKey::try_from(item_key) else { + let Ok(key) = item_key.parse::() else { return false; }; diff --git a/crates/tabby-scheduler/src/index.rs b/crates/tabby-scheduler/src/index.rs index 90f7d080ff0a..d45d2997c638 100644 --- a/crates/tabby-scheduler/src/index.rs +++ b/crates/tabby-scheduler/src/index.rs @@ -1,33 +1,41 @@ use std::{fs, io::IsTerminal, path::Path}; +use ignore::Walk; use kdam::BarExt; +use kv::Batch; use tabby_common::{ config::RepositoryConfig, index::{register_tokenizers, CodeSearchSchema}, path, SourceFile, }; -use tantivy::{directory::MmapDirectory, doc, Index}; +use tantivy::{directory::MmapDirectory, doc, Index, Term}; use tracing::{debug, warn}; -use crate::{code::CodeIntelligence, utils::tqdm}; +use crate::{cache::CacheStore, code::CodeIntelligence, utils::tqdm}; // Magic numbers static MAX_LINE_LENGTH_THRESHOLD: usize = 300; static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32; -pub fn index_repositories(_config: &[RepositoryConfig]) { - let code = CodeSearchSchema::new(); - +pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) { + let code = CodeSearchSchema::default(); let index = open_or_create_index(&code, &path::index_dir()); - register_tokenizers(&index); + add_changed_documents(cache, &code, config, &index); + remove_staled_documents(cache, &code, &index); +} + +fn add_changed_documents( + cache: &mut CacheStore, + code: &CodeSearchSchema, + config: &[RepositoryConfig], + index: &Index, +) { + register_tokenizers(index); // Initialize the search index writer with an initial arena size of 150 MB. let mut writer = index .writer(150_000_000) .expect("Failed to create index writer"); - writer - .delete_all_documents() - .expect("Failed to delete all documents"); let total_file_size: usize = SourceFile::all() .filter(is_valid_file) @@ -39,36 +47,90 @@ pub fn index_repositories(_config: &[RepositoryConfig]) { .then(|| tqdm(total_file_size)); let intelligence = CodeIntelligence::default(); - for file in SourceFile::all().filter(is_valid_file) { - let text = match file.read_content() { - Ok(content) => content, - Err(e) => { - warn!("Failed to read content of '{}': {}", file.filepath, e); + let mut indexed_files_batch = Batch::new(); + for repository in config { + for file in Walk::new(repository.dir()) { + let file = match file { + Ok(file) => file, + Err(e) => { + warn!("Failed to walk file tree for indexing: {e}"); + continue; + } + }; + let Some(source_file) = cache.get_source_file(repository, file.path()) else { + continue; + }; + if !is_valid_file(&source_file) { continue; } - }; - - for body in intelligence.chunks(&text) { - pb.as_mut() - .map(|b| b.update(body.len())) - .transpose() - .expect("Failed to update progress bar"); - - writer - .add_document(doc!( - code.field_git_url => file.git_url.clone(), - code.field_filepath => file.filepath.clone(), - code.field_language => file.language.clone(), - code.field_body => body, - )) - .expect("Failed to add document"); + let (file_id, indexed) = cache.check_indexed(file.path()); + + if indexed { + continue; + } + let text = match source_file.read_content() { + Ok(content) => content, + Err(e) => { + warn!( + "Failed to read content of '{}': {}", + source_file.filepath, e + ); + continue; + } + }; + + for body in intelligence.chunks(&text) { + pb.as_mut() + .map(|b| b.update(body.len())) + .transpose() + .expect("Failed to update progress bar"); + + writer + .add_document(doc! { + code.field_git_url => source_file.git_url.clone(), + code.field_source_file_key => file_id.to_string(), + code.field_filepath => source_file.filepath.clone(), + code.field_language => source_file.language.clone(), + code.field_body => body, + }) + .expect("Failed to add document"); + } + indexed_files_batch + .set(&file_id, &String::new()) + .expect("Failed to mark file as indexed"); } } + // Commit updating indexed documents writer.commit().expect("Failed to commit index"); writer .wait_merging_threads() .expect("Failed to wait for merging threads"); + + // Mark all indexed documents as indexed + cache.apply_indexed(indexed_files_batch); +} + +pub fn remove_staled_documents(cache: &mut CacheStore, code: &CodeSearchSchema, index: &Index) { + // Create a new writer to commit deletion of removed indexed files + let mut writer = index + .writer(150_000_000) + .expect("Failed to create index writer"); + + let gc_commit = cache.prepare_garbage_collection_for_indexed_files(|key| { + writer.delete_term(Term::from_field_text(code.field_source_file_key, key)); + }); + + // Commit garbage collection + writer + .commit() + .expect("Failed to commit garbage collection"); + + writer + .wait_merging_threads() + .expect("Failed to wait for merging threads on garbage collection"); + + gc_commit(); } fn is_valid_file(file: &SourceFile) -> bool { diff --git a/crates/tabby-scheduler/src/lib.rs b/crates/tabby-scheduler/src/lib.rs index 5210b8474390..42a74f703c69 100644 --- a/crates/tabby-scheduler/src/lib.rs +++ b/crates/tabby-scheduler/src/lib.rs @@ -22,9 +22,9 @@ pub async fn scheduler(now: bool, access: T) { .await .expect("Must be able to retrieve repositories for sync"); job_sync(&mut cache, &repositories); - job_index(&repositories); + job_index(&mut cache, &repositories); - cache.garbage_collection(); + cache.garbage_collection_for_source_files(); } else { let access = Arc::new(access); let scheduler = JobScheduler::new() @@ -52,8 +52,8 @@ pub async fn scheduler(now: bool, access: T) { .expect("Must be able to retrieve repositories for sync"); job_sync(&mut cache, &repositories); - job_index(&repositories); - cache.garbage_collection(); + job_index(&mut cache, &repositories); + cache.garbage_collection_for_source_files(); }) }) .expect("Failed to create job"), @@ -69,9 +69,9 @@ pub async fn scheduler(now: bool, access: T) { } } -fn job_index(repositories: &[RepositoryConfig]) { +fn job_index(cache: &mut CacheStore, repositories: &[RepositoryConfig]) { println!("Indexing repositories..."); - index::index_repositories(repositories); + index::index_repositories(cache, repositories); } fn job_sync(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {