feat(scheduler): implement incremental indexing for tantivy #2062

Merged
merged 11 commits on May 7, 2024
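
Summary of the change: instead of wiping and rebuilding the tantivy index on every scheduler run, each source file is now keyed by a SourceFileKey (path, language, git blob hash), already-indexed keys are tracked in a new indexed_files bucket of the kv cache, files whose key is already present are skipped, and documents belonging to keys that are no longer live are deleted from the index through the new file_id field. The sketch below is only a condensed illustration of that check, with a HashSet standing in for the kv bucket; it is not code from the diff.

use std::{collections::HashSet, path::PathBuf};

use serde::{Deserialize, Serialize};

// A file is identified by path, detected language, and git blob hash, so any
// content change yields a new key and forces re-indexing of that file.
#[derive(Serialize, Deserialize)]
struct SourceFileKey {
    path: PathBuf,
    language: String,
    git_hash: String,
}

// The HashSet stands in for the `indexed_files` bucket used by the real code.
fn needs_indexing(indexed: &HashSet<String>, key: &SourceFileKey) -> bool {
    let key = serde_json::to_string(key).expect("SourceFileKey always serializes");
    !indexed.contains(&key)
}
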
4 changes: 4 additions & 0 deletions crates/tabby-common/src/index.rs
@@ -19,6 +19,8 @@ pub struct CodeSearchSchema {
pub schema: Schema,
pub field_git_url: Field,
pub field_filepath: Field,
/// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey
pub field_source_file_key: Field,
pub field_language: Field,
pub field_body: Field,
}
@@ -36,6 +38,7 @@ impl CodeSearchSchema {

let field_git_url = builder.add_text_field("git_url", STRING | STORED);
let field_filepath = builder.add_text_field("filepath", STRING | STORED);
let field_source_file_key = builder.add_text_field("file_id", STRING | STORED);
let field_language = builder.add_text_field("language", STRING | STORED);
let field_body = builder.add_text_field("body", code_options);
let schema = builder.build();
@@ -44,6 +47,7 @@
schema,
field_git_url,
field_filepath,
field_source_file_key,
field_language,
field_body,
}
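
The only schema change is the new field_source_file_key, registered under the tantivy field name "file_id". It is declared STRING | STORED rather than as a tokenized text field: STRING indexes the whole stringified SourceFileKey as a single raw token, which is what lets the scheduler later delete every chunk of a file with one exact-match term, and STORED keeps the value readable from search hits. A minimal, self-contained sketch of just that field, assuming only the tantivy schema API already used above:

use tantivy::schema::{Schema, STORED, STRING};

fn main() {
    // One raw-token field per document; exact Term matches on it select all
    // chunk documents that were added for a given source file.
    let mut builder = Schema::builder();
    let _file_id = builder.add_text_field("file_id", STRING | STORED);
    let _schema = builder.build();
}
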
91 changes: 79 additions & 12 deletions crates/tabby-scheduler/src/cache.rs
@@ -2,6 +2,7 @@ use std::{
fs::read_to_string,
path::{Path, PathBuf},
process::Command,
str::FromStr,
};

use anyhow::{bail, Context, Result};
@@ -13,24 +14,31 @@ use tracing::{info, warn};
use crate::code::CodeIntelligence;

const SOURCE_FILE_BUCKET_KEY: &str = "source_files";
const INDEX_BUCKET_KEY: &str = "indexed_files";

fn cmd_stdout(cmd: &str, args: &[&str]) -> Result<String> {
Ok(
String::from_utf8(Command::new(cmd).args(args).output()?.stdout)?
.trim()
.to_string(),
)
}

fn get_git_hash(path: &Path) -> Result<String> {
let path = path.display().to_string();
let output = Command::new("git").args(["hash-object", &path]).output()?;
Ok(String::from_utf8(output.stdout)?.trim().to_string())
cmd_stdout("git", &["hash-object", &path.display().to_string()])
}

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Debug)]
struct SourceFileKey {
path: PathBuf,
language: String,
git_hash: String,
}

impl TryFrom<&str> for SourceFileKey {
type Error = serde_json::Error;
impl FromStr for SourceFileKey {
type Err = serde_json::Error;

fn try_from(s: &str) -> Result<Self, Self::Error> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
@@ -75,6 +83,65 @@ impl CacheStore {
}
}

fn index_bucket(&self) -> Bucket<String, String> {
self.store
.bucket(Some(INDEX_BUCKET_KEY))
.expect("Failed to access indexed files bucket")
}

pub fn check_indexed(&self, path: &Path) -> (String, bool) {
let key = SourceFileKey::try_from(path)
.expect("Failed to create source file key")
.to_string();
let indexed = self
.index_bucket()
.contains(&key)
.expect("Failed to read index bucket");
(key, indexed)
}

pub fn apply_indexed(&self, batch: Batch<String, String>) {
self.index_bucket()
.batch(batch)
.expect("Failed to commit batched index update")
}

#[must_use]
pub fn prepare_garbage_collection_for_indexed_files(
&self,
key_remover: impl Fn(&String),
) -> impl FnOnce() + '_ {
info!("Started cleaning up 'indexed_files' bucket");
let bucket = self.index_bucket();
let mut batch = Batch::new();

let mut num_keep = 0;
let mut num_removed = 0;

bucket
.iter()
.filter_map(|item| {
let item = item.expect("Failed to read item");
let item_key: String = item.key().expect("Failed to get key");
if is_item_key_matched(&item_key) {
num_keep += 1;
None
} else {
num_removed += 1;
Some(item_key)
}
})
.inspect(key_remover)
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!("Finished garbage collection for 'indexed_files': {num_keep} items kept, {num_removed} items removed");
move || {
bucket
.batch(batch)
.expect("Failed to execute batched delete")
}
}

pub fn get_source_file(
&mut self,
config: &RepositoryConfig,
@@ -103,8 +170,8 @@ impl CacheStore {
}
}

pub fn garbage_collection(&self) {
info!("Running garbage collection");
pub fn garbage_collection_for_source_files(&self) {
info!("Started cleaning up 'source_files' bucket");
let bucket: Bucket<String, Json<SourceFile>> = self
.store
.bucket(Some(SOURCE_FILE_BUCKET_KEY))
@@ -124,21 +191,21 @@
None
} else {
num_removed += 1;
Some(item.key().expect("Failed to get key"))
Some(item_key)
}
})
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!(
"Finished garbage collection: {} items kept, {} items removed",
"Finished garbage collection for 'source_files': {} items kept, {} items removed",
num_keep, num_removed
);
bucket.batch(batch).expect("to batch remove staled files");
}
}

fn is_item_key_matched(item_key: &str) -> bool {
let Ok(key) = SourceFileKey::try_from(item_key) else {
let Ok(key) = item_key.parse::<SourceFileKey>() else {
return false;
};

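
Two details tie this file together. The bucket key for indexed_files is simply the JSON-serialized SourceFileKey: check_indexed builds it from the file on disk and asks the bucket whether it is present, apply_indexed writes a whole kv Batch of such keys at once, and prepare_garbage_collection_for_indexed_files returns a closure so the bucket-side removals are committed only after the caller has committed the matching index deletions. Switching from TryFrom<&str> to FromStr is what allows item_key.parse::<SourceFileKey>() in is_item_key_matched. A self-contained round-trip sketch; the Display impl used by check_indexed is not visible in these hunks, so serde_json::to_string stands in for it here:

use std::{path::PathBuf, str::FromStr};

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct SourceFileKey {
    path: PathBuf,
    language: String,
    git_hash: String,
}

impl FromStr for SourceFileKey {
    type Err = serde_json::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        serde_json::from_str(s)
    }
}

fn main() -> Result<(), serde_json::Error> {
    let key = SourceFileKey {
        path: PathBuf::from("src/lib.rs"),
        language: "rust".into(),
        git_hash: "0123abcd".into(),
    };
    // What gets written as the bucket key...
    let stored = serde_json::to_string(&key)?;
    // ...and how is_item_key_matched turns it back into a struct.
    let parsed: SourceFileKey = stored.parse()?;
    assert_eq!(parsed, key);
    Ok(())
}
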
122 changes: 92 additions & 30 deletions crates/tabby-scheduler/src/index.rs
@@ -1,33 +1,41 @@
use std::{fs, io::IsTerminal, path::Path};

use ignore::Walk;
use kdam::BarExt;
use kv::Batch;
use tabby_common::{
config::RepositoryConfig,
index::{register_tokenizers, CodeSearchSchema},
path, SourceFile,
};
use tantivy::{directory::MmapDirectory, doc, Index};
use tantivy::{directory::MmapDirectory, doc, Index, Term};
use tracing::{debug, warn};

use crate::{code::CodeIntelligence, utils::tqdm};
use crate::{cache::CacheStore, code::CodeIntelligence, utils::tqdm};

// Magic numbers
static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32;

pub fn index_repositories(_config: &[RepositoryConfig]) {
let code = CodeSearchSchema::new();

pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {
let code = CodeSearchSchema::default();
let index = open_or_create_index(&code, &path::index_dir());
register_tokenizers(&index);
add_changed_documents(cache, &code, config, &index);
remove_staled_documents(cache, &code, &index);
}

fn add_changed_documents(
cache: &mut CacheStore,
code: &CodeSearchSchema,
config: &[RepositoryConfig],
index: &Index,
) {
register_tokenizers(index);

// Initialize the search index writer with an initial arena size of 150 MB.
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");
writer
.delete_all_documents()
.expect("Failed to delete all documents");

let total_file_size: usize = SourceFile::all()
.filter(is_valid_file)
@@ -39,36 +47,90 @@ pub fn index_repositories(_config: &[RepositoryConfig]) {
.then(|| tqdm(total_file_size));

let intelligence = CodeIntelligence::default();
for file in SourceFile::all().filter(is_valid_file) {
let text = match file.read_content() {
Ok(content) => content,
Err(e) => {
warn!("Failed to read content of '{}': {}", file.filepath, e);
let mut indexed_files_batch = Batch::new();
for repository in config {
for file in Walk::new(repository.dir()) {
let file = match file {
Ok(file) => file,
Err(e) => {
warn!("Failed to walk file tree for indexing: {e}");
continue;
}
};
let Some(source_file) = cache.get_source_file(repository, file.path()) else {
continue;
};
if !is_valid_file(&source_file) {
continue;
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc!(
code.field_git_url => file.git_url.clone(),
code.field_filepath => file.filepath.clone(),
code.field_language => file.language.clone(),
code.field_body => body,
))
.expect("Failed to add document");
let (file_id, indexed) = cache.check_indexed(file.path());

if indexed {
continue;
}
let text = match source_file.read_content() {
Ok(content) => content,
Err(e) => {
warn!(
"Failed to read content of '{}': {}",
source_file.filepath, e
);
continue;
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc! {
code.field_git_url => source_file.git_url.clone(),
code.field_source_file_key => file_id.to_string(),
code.field_filepath => source_file.filepath.clone(),
code.field_language => source_file.language.clone(),
code.field_body => body,
})
.expect("Failed to add document");
}
indexed_files_batch
.set(&file_id, &String::new())
.expect("Failed to mark file as indexed");
}
}

// Commit updating indexed documents
writer.commit().expect("Failed to commit index");
writer
.wait_merging_threads()
.expect("Failed to wait for merging threads");

// Mark all indexed documents as indexed
cache.apply_indexed(indexed_files_batch);
}

pub fn remove_staled_documents(cache: &mut CacheStore, code: &CodeSearchSchema, index: &Index) {
// Create a new writer to commit deletion of removed indexed files
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");

let gc_commit = cache.prepare_garbage_collection_for_indexed_files(|key| {
writer.delete_term(Term::from_field_text(code.field_source_file_key, key));
});

// Commit garbage collection
writer
.commit()
.expect("Failed to commit garbage collection");

gc_commit();

writer
.wait_merging_threads()
.expect("Failed to wait for merging threads on garbage collection");
}

fn is_valid_file(file: &SourceFile) -> bool {
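
The indexing pass is now split in two. add_changed_documents walks each configured repository with ignore::Walk, skips any file whose SourceFileKey is already recorded, writes one tantivy document per chunk (each carrying the stringified key in file_id), and marks the keys as indexed only after the writer has committed. remove_staled_documents then opens a second writer, queues one delete per stale key through the garbage-collection callback, commits, and finally runs the returned closure so the cache forgets those keys only after the index has. A small sketch of that deletion step, assuming only the tantivy calls already used in the hunk above:

use tantivy::{schema::Field, IndexWriter, Term};

// Every stale cache key maps to an exact-match delete on the raw-token
// `file_id` field, removing all chunk documents added for that file.
// `delete_term` takes &self, which is why the PR can pass a Fn closure.
fn queue_deletions(writer: &IndexWriter, field_source_file_key: Field, stale_keys: &[String]) {
    for key in stale_keys {
        writer.delete_term(Term::from_field_text(field_source_file_key, key));
    }
    // The caller commits the writer afterwards, then lets the cache drop the keys.
}
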
12 changes: 6 additions & 6 deletions crates/tabby-scheduler/src/lib.rs
@@ -22,9 +22,9 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
.await
.expect("Must be able to retrieve repositories for sync");
job_sync(&mut cache, &repositories);
job_index(&repositories);
job_index(&mut cache, &repositories);

cache.garbage_collection();
cache.garbage_collection_for_source_files();
} else {
let access = Arc::new(access);
let scheduler = JobScheduler::new()
@@ -52,8 +52,8 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
.expect("Must be able to retrieve repositories for sync");

job_sync(&mut cache, &repositories);
job_index(&repositories);
cache.garbage_collection();
job_index(&mut cache, &repositories);
cache.garbage_collection_for_source_files();
})
})
.expect("Failed to create job"),
@@ -69,9 +69,9 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
}
}

fn job_index(repositories: &[RepositoryConfig]) {
fn job_index(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
println!("Indexing repositories...");
index::index_repositories(repositories);
index::index_repositories(cache, repositories);
}

fn job_sync(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
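
The scheduler changes are plumbing: job_index now receives the CacheStore, and the old behaviour of clearing the whole index at the start of every run (delete_all_documents in the removed code in index.rs above) is gone, so a run that finds no changed files leaves tantivy untouched. The order of operations per run is the same in the --now path and in the cron closure; run_once below is a hypothetical helper, not part of the PR, written only to spell that order out:

// Hypothetical helper; the PR inlines this body at both call sites.
fn run_once(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
    job_sync(cache, repositories);                  // refresh the source_files cache
    job_index(cache, repositories);                 // incremental index + stale-doc cleanup
    cache.garbage_collection_for_source_files();    // drop cache entries for deleted files
}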