From f1083d6d6893d8ef793eccf5fe15b55ad0639484 Mon Sep 17 00:00:00 2001 From: boxbeam Date: Tue, 7 May 2024 16:30:13 -0400 Subject: [PATCH] Redesign incremental indexing --- crates/tabby-common/src/index.rs | 2 +- crates/tabby-common/src/lib.rs | 1 + crates/tabby-scheduler/src/cache.rs | 126 +++++++++--------- crates/tabby-scheduler/src/index.rs | 200 ++++++++-------------------- crates/tabby-scheduler/src/lib.rs | 4 +- 5 files changed, 126 insertions(+), 207 deletions(-) diff --git a/crates/tabby-common/src/index.rs b/crates/tabby-common/src/index.rs index 9e44e2211cb5..d6f1b5736b4e 100644 --- a/crates/tabby-common/src/index.rs +++ b/crates/tabby-common/src/index.rs @@ -19,7 +19,7 @@ pub struct CodeSearchSchema { pub schema: Schema, pub field_git_url: Field, pub field_filepath: Field, - /// Indexed field uniquely identifying a file in a repository, format is `git_url:filepath` + /// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey pub field_file_id: Field, pub field_language: Field, pub field_body: Field, diff --git a/crates/tabby-common/src/lib.rs b/crates/tabby-common/src/lib.rs index 747126fd4df7..9d2cdb370964 100644 --- a/crates/tabby-common/src/lib.rs +++ b/crates/tabby-common/src/lib.rs @@ -23,6 +23,7 @@ use serde_jsonlines::JsonLinesReader; #[derive(Serialize, Deserialize, Clone)] pub struct SourceFile { + pub git_url: String, pub basedir: String, pub filepath: String, pub language: String, diff --git a/crates/tabby-scheduler/src/cache.rs b/crates/tabby-scheduler/src/cache.rs index 848036dd4b6b..b6c111c82a17 100644 --- a/crates/tabby-scheduler/src/cache.rs +++ b/crates/tabby-scheduler/src/cache.rs @@ -2,6 +2,7 @@ use std::{ fs::read_to_string, path::{Path, PathBuf}, process::Command, + str::FromStr, }; use anyhow::{bail, Context, Result}; @@ -13,54 +14,34 @@ use tracing::{info, warn}; use crate::code::CodeIntelligence; const SOURCE_FILE_BUCKET_KEY: &str = "source_files"; -const LAST_INDEX_COMMIT_BUCKET: &str = "last_index_commit"; - -fn cmd_stdout(path: &Path, cmd: &str, args: &[&str]) -> Result { - Ok(String::from_utf8( - Command::new(cmd) - .current_dir(path) - .args(args) - .output()? - .stdout, - )? - .trim() - .to_string()) +const INDEX_BUCKET_KEY: &str = "indexed_files"; + +fn cmd_stdout(cmd: &str, args: &[&str]) -> Result { + Ok( + String::from_utf8(Command::new(cmd).args(args).output()?.stdout)? + .trim() + .to_string(), + ) } fn get_git_hash(path: &Path) -> Result { Ok(cmd_stdout( - path, "git", &["hash-object", &path.display().to_string()], )?) } -pub fn get_current_commit_hash(path: &Path) -> Result { - cmd_stdout(path, "git", &["rev-parse", "HEAD"]) -} - -pub fn get_changed_files(path: &Path, since_commit: &str) -> Result> { - Ok(cmd_stdout( - path, - "git", - &["diff", "--no-renames", "--name-only", since_commit], - )? - .lines() - .map(|line| line.to_owned()) - .collect()) -} - -#[derive(Deserialize, Serialize)] -struct SourceFileKey { +#[derive(Deserialize, Serialize, Debug)] +pub(crate) struct SourceFileKey { path: PathBuf, language: String, git_hash: String, } -impl TryFrom<&str> for SourceFileKey { - type Error = serde_json::Error; +impl FromStr for SourceFileKey { + type Err = serde_json::Error; - fn try_from(s: &str) -> Result { + fn from_str(s: &str) -> Result { serde_json::from_str(s) } } @@ -105,38 +86,56 @@ impl CacheStore { } } - pub fn get_last_index_commit(&self, repository: &RepositoryConfig) -> Option { + fn index_bucket(&self) -> Bucket { self.store - .bucket(Some(LAST_INDEX_COMMIT_BUCKET)) - .expect("Failed to access meta bucket") - .get(&repository.canonical_git_url()) - .expect("Failed to read last index commit") + .bucket(Some(INDEX_BUCKET_KEY)) + .expect("Failed to access indexed files bucket") } - pub fn set_last_index_commit(&self, repository: &RepositoryConfig, commit: Option) { - let bucket = self - .store - .bucket(Some(LAST_INDEX_COMMIT_BUCKET)) - .expect("Failed to access meta bucket"); - if let Some(commit) = commit { - bucket - .set(&repository.canonical_git_url(), &commit) - .expect("Failed to write last index commit"); - } else { - bucket - .remove(&repository.git_url) - .expect("Failed to remove last index commit"); - } + pub fn is_indexed(&self, key: &SourceFileKey) -> bool { + self.index_bucket() + .contains(&key.to_string()) + .expect("Failed to read index bucket") } - pub fn list_indexed_repositories(&self) -> Vec { - self.store - .bucket::(Some(LAST_INDEX_COMMIT_BUCKET)) - .expect("Failed to read meta bucket") + pub fn set_indexed(&self, key: &SourceFileKey) { + self.index_bucket() + .set(&key.to_string(), &String::new()) + .expect("Failed to write to index bucket"); + } + + pub fn cleanup_old_indexed_files(&self, key_remover: impl Fn(&String)) { + info!("Cleaning up indexed file cache"); + let bucket = self.index_bucket(); + let mut batch = Batch::new(); + + let mut num_keep = 0; + let mut num_removed = 0; + + bucket .iter() - .map(|item| item.unwrap().key().unwrap()) - .map(|git_url| RepositoryConfig::new(git_url)) - .collect() + .filter_map(|item| { + let item = item.expect("Failed to read item"); + let item_key: String = item.key().expect("Failed to get key"); + if is_item_key_matched(&item_key) { + num_keep += 1; + None + } else { + num_removed += 1; + Some(item_key) + } + }) + .inspect(key_remover) + .for_each(|key| { + batch + .remove(&key) + .expect("Failed to remove indexed source file") + }); + + info!("Finished cleaning up indexed files: {num_keep} items kept, {num_removed} items removed"); + bucket + .batch(batch) + .expect("Failed to execute batched delete"); } pub fn get_source_file( @@ -167,8 +166,8 @@ impl CacheStore { } } - pub fn garbage_collection(&self) { - info!("Running garbage collection"); + pub fn cleanup_old_source_files(&self) { + info!("Cleaning up synced file cache"); let bucket: Bucket> = self .store .bucket(Some(SOURCE_FILE_BUCKET_KEY)) @@ -188,7 +187,7 @@ impl CacheStore { None } else { num_removed += 1; - Some(item.key().expect("Failed to get key")) + Some(item_key) } }) .for_each(|key| batch.remove(&key).expect("Failed to remove key")); @@ -202,7 +201,7 @@ impl CacheStore { } fn is_item_key_matched(item_key: &str) -> bool { - let Ok(key) = SourceFileKey::try_from(item_key) else { + let Ok(key) = item_key.parse::() else { return false; }; @@ -244,6 +243,7 @@ fn create_source_file( } }; let source_file = SourceFile { + git_url: config.canonical_git_url(), basedir: config.dir().display().to_string(), filepath: relative_path.display().to_string(), max_line_length: metrics::max_line_length(&contents), diff --git a/crates/tabby-scheduler/src/index.rs b/crates/tabby-scheduler/src/index.rs index 22ea53668c1e..88a06f85066a 100644 --- a/crates/tabby-scheduler/src/index.rs +++ b/crates/tabby-scheduler/src/index.rs @@ -5,14 +5,13 @@ use kdam::BarExt; use tabby_common::{ config::RepositoryConfig, index::{register_tokenizers, CodeSearchSchema}, - languages::get_language_by_ext, path, SourceFile, }; -use tantivy::{directory::MmapDirectory, doc, Index, IndexWriter, Term}; +use tantivy::{directory::MmapDirectory, doc, Index, Term}; use tracing::{debug, warn}; use crate::{ - cache::{get_changed_files, get_current_commit_hash, CacheStore}, + cache::{CacheStore, SourceFileKey}, code::CodeIntelligence, utils::tqdm, }; @@ -32,158 +31,81 @@ pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) { .writer(150_000_000) .expect("Failed to create index writer"); - let intelligence = CodeIntelligence::default(); + let total_file_size: usize = SourceFile::all() + .filter(is_valid_file) + .map(|x| x.read_file_size()) + .sum(); - writer - .delete_all_documents() - .expect("Failed to delete all documents"); + let mut pb = std::io::stdout() + .is_terminal() + .then(|| tqdm(total_file_size)); + let intelligence = CodeIntelligence::default(); for repository in config { - let Some(commit) = cache.get_last_index_commit(repository) else { - index_repository_from_scratch(repository, &writer, &code, &intelligence, cache); - continue; - }; - let dir = repository.dir(); - let changed_files = get_changed_files(&dir, &commit).expect("Failed read file diff"); - for file in changed_files { - let path = dir.join(&file); - let file_id = create_file_id(&repository.git_url, &file); - delete_document(&writer, &code, file_id.clone()); - if !path.exists() { - continue; - } - let Some(source_file) = cache.get_source_file(repository, &path) else { + for file in Walk::new(repository.dir()) { + let file = match file { + Ok(file) => file, + Err(e) => { + warn!("Failed to walk file tree for indexing: {e}"); + continue; + } + }; + let Some(source_file) = cache.get_source_file(repository, file.path()) else { continue; }; if !is_valid_file(&source_file) { continue; } - add_document( - &writer, - repository, - &source_file, - file_id, - &code, - &intelligence, - ); - } - cache.set_last_index_commit( - repository, - Some(get_current_commit_hash(&dir).expect("Failed to read current commit hash")), - ); - } + let file_id = + SourceFileKey::try_from(file.path()).expect("Failed to create source file key"); - for indexed_repository in cache.list_indexed_repositories() { - if !indexed_repository.dir().exists() { - cache.set_last_index_commit(&indexed_repository, None); - delete_all_documents(&writer, &code, &indexed_repository.canonical_git_url()); + if cache.is_indexed(&file_id) { + continue; + } + let text = match source_file.read_content() { + Ok(content) => content, + Err(e) => { + warn!( + "Failed to read content of '{}': {}", + source_file.filepath, e + ); + continue; + } + }; + + for body in intelligence.chunks(&text) { + pb.as_mut() + .map(|b| b.update(body.len())) + .transpose() + .expect("Failed to update progress bar"); + + writer + .add_document(doc! { + code.field_git_url => source_file.git_url.clone(), + code.field_file_id => file_id.to_string(), + code.field_filepath => source_file.filepath.clone(), + code.field_language => source_file.language.clone(), + code.field_body => body, + }) + .expect("Failed to add document"); + } + cache.set_indexed(&file_id); } } + cache.cleanup_old_indexed_files(|key| { + writer.delete_term(Term::from_field_text(code.field_file_id, key)); + }); + writer.commit().expect("Failed to commit index"); writer .wait_merging_threads() .expect("Failed to wait for merging threads"); } -fn index_repository_from_scratch( - repository: &RepositoryConfig, - writer: &IndexWriter, - code: &CodeSearchSchema, - intelligence: &CodeIntelligence, - cache: &mut CacheStore, -) { - let mut pb = std::io::stdout().is_terminal().then(|| { - let total_file_size: usize = Walk::new(repository.dir()) - .filter_map(|f| f.ok()) - .map(|f| f.path().to_owned()) - .filter(|f| { - f.extension() - .is_some_and(|ext| get_language_by_ext(ext).is_some()) - }) - .map(|f| read_file_size(&f)) - .sum(); - tqdm(total_file_size) - }); - - for file in Walk::new(repository.dir()) { - let file = file.expect("Failed to read file listing"); - let Some(source_file) = cache.get_source_file(repository, file.path()) else { - continue; - }; - if !is_valid_file(&source_file) { - continue; - } - let file_id = create_file_id(&repository.git_url, &source_file.filepath); - add_document( - writer, - repository, - &source_file, - file_id, - code, - intelligence, - ); - pb.as_mut().map(|pb| { - pb.update(source_file.read_file_size()) - .expect("Failed to update progress bar") - }); - } - cache.set_last_index_commit( - repository, - Some( - get_current_commit_hash(&repository.dir()).expect("Failed to read current commit hash"), - ), - ); -} - -fn read_file_size(path: &Path) -> usize { - std::fs::metadata(path) - .map(|meta| meta.len()) - .unwrap_or_default() as usize -} - -fn is_valid_file(source_file: &SourceFile) -> bool { - source_file.max_line_length <= MAX_LINE_LENGTH_THRESHOLD - && source_file.avg_line_length <= AVG_LINE_LENGTH_THRESHOLD -} - -pub fn delete_document(writer: &IndexWriter, code: &CodeSearchSchema, file_id: String) { - let term = Term::from_field_text(code.field_file_id.clone(), &file_id); - writer.delete_term(term); -} - -pub fn delete_all_documents(writer: &IndexWriter, code: &CodeSearchSchema, git_url: &str) { - let term = Term::from_field_text(code.field_git_url, git_url); - writer.delete_term(term); -} - -pub fn add_document( - writer: &IndexWriter, - repository: &RepositoryConfig, - file: &SourceFile, - file_id: String, - code: &CodeSearchSchema, - intelligence: &CodeIntelligence, -) -> usize { - let text = match file.read_content() { - Ok(content) => content, - Err(e) => { - warn!("Failed to read content of '{}': {}", file.filepath, e); - return 0; - } - }; - for body in intelligence.chunks(&text) { - writer - .add_document(doc!( - code.field_git_url => repository.canonical_git_url(), - code.field_filepath => file.filepath.clone(), - code.field_file_id => file_id.clone(), - code.field_language => file.language.clone(), - code.field_body => body, - )) - .expect("Failed to add document"); - } - text.len() +fn is_valid_file(file: &SourceFile) -> bool { + file.max_line_length <= MAX_LINE_LENGTH_THRESHOLD + && file.avg_line_length <= AVG_LINE_LENGTH_THRESHOLD } fn open_or_create_index(code: &CodeSearchSchema, path: &Path) -> Index { @@ -208,7 +130,3 @@ fn open_or_create_index_impl(code: &CodeSearchSchema, path: &Path) -> tantivy::R let directory = MmapDirectory::open(path).expect("Failed to open index directory"); Index::open_or_create(directory, code.schema.clone()) } - -pub fn create_file_id(git_url: &str, filepath: &str) -> String { - format!("{}:{}", git_url, filepath) -} diff --git a/crates/tabby-scheduler/src/lib.rs b/crates/tabby-scheduler/src/lib.rs index b0cedd696865..63a6904e3a85 100644 --- a/crates/tabby-scheduler/src/lib.rs +++ b/crates/tabby-scheduler/src/lib.rs @@ -24,7 +24,7 @@ pub async fn scheduler(now: bool, access: T) { job_sync(&mut cache, &repositories); job_index(&mut cache, &repositories); - cache.garbage_collection(); + cache.cleanup_old_source_files(); } else { let access = Arc::new(access); let scheduler = JobScheduler::new() @@ -53,7 +53,7 @@ pub async fn scheduler(now: bool, access: T) { job_sync(&mut cache, &repositories); job_index(&mut cache, &repositories); - cache.garbage_collection(); + cache.cleanup_old_source_files(); }) }) .expect("Failed to create job"),