feat(scheduler): implement incremental indexing for tantivy (#2062)
* Begin reimplementing incremental indexing

* feat(scheduler): implement incremental indexing for tantivy

* Get git_url from RepositoryConfig

* Apply suggestions

* Redesign incremental indexing

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* Apply suggestions

* Apply suggestions

* Apply nits

* Update crates/tabby-scheduler/src/index.rs

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Meng Zhang <[email protected]>
3 people authored May 7, 2024
1 parent 24fa4c0 commit 7d0343e
Showing 4 changed files with 181 additions and 48 deletions.
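
At a high level, the commit makes indexing incremental: each source file is identified by a stringified SourceFileKey (path, language, git hash), and files whose key is already recorded in a new `indexed_files` KV bucket are skipped. Below is a dependency-free sketch of that per-file decision, using hypothetical stand-ins for the cache bucket and the tantivy writer; the real implementation is in the diffs that follow.

```rust
use std::collections::HashSet;

// `indexed_files` stands in for the `indexed_files` kv bucket, and
// `add_document` for writer.add_document(doc! { ... }).
fn index_incrementally(
    indexed_files: &mut HashSet<String>,
    files: &[(String, String)], // (stringified SourceFileKey, file body)
    mut add_document: impl FnMut(&str, &str),
) {
    for (file_id, body) in files {
        // check_indexed(): already indexed under the same path/language/git hash?
        if indexed_files.contains(file_id) {
            continue;
        }
        add_document(file_id, body);
        // apply_indexed(): record the key so the next run skips this file.
        indexed_files.insert(file_id.clone());
    }
}

fn main() {
    let mut indexed = HashSet::from(["old-key".to_string()]);
    let files = vec![
        ("old-key".to_string(), "unchanged file".to_string()),
        ("new-key".to_string(), "new or modified file".to_string()),
    ];

    let mut added = Vec::new();
    index_incrementally(&mut indexed, &files, |id: &str, _body: &str| {
        added.push(id.to_string())
    });

    // Only the file whose key was not yet recorded gets re-indexed.
    assert_eq!(added, vec!["new-key".to_string()]);
    assert!(indexed.contains("new-key"));
}
```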
4 changes: 4 additions & 0 deletions crates/tabby-common/src/index.rs
@@ -19,6 +19,8 @@ pub struct CodeSearchSchema {
pub schema: Schema,
pub field_git_url: Field,
pub field_filepath: Field,
/// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey
pub field_source_file_key: Field,
pub field_language: Field,
pub field_body: Field,
}
@@ -36,6 +38,7 @@ impl CodeSearchSchema {

let field_git_url = builder.add_text_field("git_url", STRING | STORED);
let field_filepath = builder.add_text_field("filepath", STRING | STORED);
let field_source_file_key = builder.add_text_field("file_id", STRING | STORED);
let field_language = builder.add_text_field("language", STRING | STORED);
let field_body = builder.add_text_field("body", code_options);
let schema = builder.build();
@@ -44,6 +47,7 @@
schema,
field_git_url,
field_filepath,
field_source_file_key,
field_language,
field_body,
}
91 changes: 79 additions & 12 deletions crates/tabby-scheduler/src/cache.rs
@@ -2,6 +2,7 @@ use std::{
fs::read_to_string,
path::{Path, PathBuf},
process::Command,
str::FromStr,
};

use anyhow::{bail, Context, Result};
@@ -13,24 +14,31 @@ use tracing::{info, warn};
use crate::code::CodeIntelligence;

const SOURCE_FILE_BUCKET_KEY: &str = "source_files";
const INDEX_BUCKET_KEY: &str = "indexed_files";

fn cmd_stdout(cmd: &str, args: &[&str]) -> Result<String> {
Ok(
String::from_utf8(Command::new(cmd).args(args).output()?.stdout)?
.trim()
.to_string(),
)
}

fn get_git_hash(path: &Path) -> Result<String> {
let path = path.display().to_string();
let output = Command::new("git").args(["hash-object", &path]).output()?;
Ok(String::from_utf8(output.stdout)?.trim().to_string())
cmd_stdout("git", &["hash-object", &path.display().to_string()])
}

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Debug)]
struct SourceFileKey {
path: PathBuf,
language: String,
git_hash: String,
}

impl TryFrom<&str> for SourceFileKey {
type Error = serde_json::Error;
impl FromStr for SourceFileKey {
type Err = serde_json::Error;

fn try_from(s: &str) -> Result<Self, Self::Error> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
@@ -75,6 +83,65 @@ impl CacheStore {
}
}

fn index_bucket(&self) -> Bucket<String, String> {
self.store
.bucket(Some(INDEX_BUCKET_KEY))
.expect("Failed to access indexed files bucket")
}

pub fn check_indexed(&self, path: &Path) -> (String, bool) {
let key = SourceFileKey::try_from(path)
.expect("Failed to create source file key")
.to_string();
let indexed = self
.index_bucket()
.contains(&key)
.expect("Failed to read index bucket");
(key, indexed)
}

pub fn apply_indexed(&self, batch: Batch<String, String>) {
self.index_bucket()
.batch(batch)
.expect("Failed to commit batched index update")
}

#[must_use]
pub fn prepare_garbage_collection_for_indexed_files(
&self,
key_remover: impl Fn(&String),
) -> impl FnOnce() + '_ {
info!("Started cleaning up 'indexed_files' bucket");
let bucket = self.index_bucket();
let mut batch = Batch::new();

let mut num_keep = 0;
let mut num_removed = 0;

bucket
.iter()
.filter_map(|item| {
let item = item.expect("Failed to read item");
let item_key: String = item.key().expect("Failed to get key");
if is_item_key_matched(&item_key) {
num_keep += 1;
None
} else {
num_removed += 1;
Some(item_key)
}
})
.inspect(key_remover)
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!("Finished garbage collection for 'indexed_files': {num_keep} items kept, {num_removed} items removed");
move || {
bucket
.batch(batch)
.expect("Failed to execute batched delete")
}
}

pub fn get_source_file(
&mut self,
config: &RepositoryConfig,
@@ -103,8 +170,8 @@ impl CacheStore {
}
}

pub fn garbage_collection(&self) {
info!("Running garbage collection");
pub fn garbage_collection_for_source_files(&self) {
info!("Started cleaning up 'source_files' bucket");
let bucket: Bucket<String, Json<SourceFile>> = self
.store
.bucket(Some(SOURCE_FILE_BUCKET_KEY))
@@ -124,21 +191,21 @@
None
} else {
num_removed += 1;
Some(item.key().expect("Failed to get key"))
Some(item_key)
}
})
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!(
"Finished garbage collection: {} items kept, {} items removed",
"Finished garbage collection for 'source_files': {} items kept, {} items removed",
num_keep, num_removed
);
bucket.batch(batch).expect("to batch remove staled files");
}
}

fn is_item_key_matched(item_key: &str) -> bool {
let Ok(key) = SourceFileKey::try_from(item_key) else {
let Ok(key) = item_key.parse::<SourceFileKey>() else {
return false;
};

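
For context, the bucket keys above are the JSON-serialized form of SourceFileKey, and a key that fails to parse is rejected by is_item_key_matched and garbage-collected. A minimal round-trip sketch, assuming the Display/ToString impl (in a collapsed part of this diff) simply serializes the key back to JSON:

```rust
use std::{path::PathBuf, str::FromStr};

use serde::{Deserialize, Serialize};

// Shape of the key type copied from cache.rs above.
#[derive(Deserialize, Serialize, Debug)]
struct SourceFileKey {
    path: PathBuf,
    language: String,
    git_hash: String,
}

impl FromStr for SourceFileKey {
    type Err = serde_json::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        serde_json::from_str(s)
    }
}

// Assumed inverse of FromStr: check_indexed() calls `.to_string()` on the key,
// so an impl along these lines presumably exists in the collapsed lines.
impl std::fmt::Display for SourceFileKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let json = serde_json::to_string(self).map_err(|_| std::fmt::Error)?;
        write!(f, "{json}")
    }
}

fn main() {
    let key = SourceFileKey {
        path: PathBuf::from("crates/tabby/src/main.rs"),
        language: "rust".into(),
        git_hash: "9b2f0d7c".into(),
    };

    // This string is what gets stored in the `indexed_files` bucket and in the
    // `file_id` field of the search index.
    let stored = key.to_string();

    // The same string parses back into a SourceFileKey...
    assert!(stored.parse::<SourceFileKey>().is_ok());
    // ...while anything that is not valid JSON does not, which is the first
    // check `is_item_key_matched` performs during garbage collection.
    assert!("not a key".parse::<SourceFileKey>().is_err());
}
```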
122 changes: 92 additions & 30 deletions crates/tabby-scheduler/src/index.rs
@@ -1,33 +1,41 @@
use std::{fs, io::IsTerminal, path::Path};

use ignore::Walk;
use kdam::BarExt;
use kv::Batch;
use tabby_common::{
config::RepositoryConfig,
index::{register_tokenizers, CodeSearchSchema},
path, SourceFile,
};
use tantivy::{directory::MmapDirectory, doc, Index};
use tantivy::{directory::MmapDirectory, doc, Index, Term};
use tracing::{debug, warn};

use crate::{code::CodeIntelligence, utils::tqdm};
use crate::{cache::CacheStore, code::CodeIntelligence, utils::tqdm};

// Magic numbers
static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32;

pub fn index_repositories(_config: &[RepositoryConfig]) {
let code = CodeSearchSchema::new();

pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {
let code = CodeSearchSchema::default();
let index = open_or_create_index(&code, &path::index_dir());
register_tokenizers(&index);
add_changed_documents(cache, &code, config, &index);
remove_staled_documents(cache, &code, &index);
}

fn add_changed_documents(
cache: &mut CacheStore,
code: &CodeSearchSchema,
config: &[RepositoryConfig],
index: &Index,
) {
register_tokenizers(index);

// Initialize the search index writer with an initial arena size of 150 MB.
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");
writer
.delete_all_documents()
.expect("Failed to delete all documents");

let total_file_size: usize = SourceFile::all()
.filter(is_valid_file)
@@ -39,36 +47,90 @@ pub fn index_repositories(_config: &[RepositoryConfig]) {
.then(|| tqdm(total_file_size));

let intelligence = CodeIntelligence::default();
for file in SourceFile::all().filter(is_valid_file) {
let text = match file.read_content() {
Ok(content) => content,
Err(e) => {
warn!("Failed to read content of '{}': {}", file.filepath, e);
let mut indexed_files_batch = Batch::new();
for repository in config {
for file in Walk::new(repository.dir()) {
let file = match file {
Ok(file) => file,
Err(e) => {
warn!("Failed to walk file tree for indexing: {e}");
continue;
}
};
let Some(source_file) = cache.get_source_file(repository, file.path()) else {
continue;
};
if !is_valid_file(&source_file) {
continue;
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc!(
code.field_git_url => file.git_url.clone(),
code.field_filepath => file.filepath.clone(),
code.field_language => file.language.clone(),
code.field_body => body,
))
.expect("Failed to add document");
let (file_id, indexed) = cache.check_indexed(file.path());

if indexed {
continue;
}
let text = match source_file.read_content() {
Ok(content) => content,
Err(e) => {
warn!(
"Failed to read content of '{}': {}",
source_file.filepath, e
);
continue;
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc! {
code.field_git_url => source_file.git_url.clone(),
code.field_source_file_key => file_id.to_string(),
code.field_filepath => source_file.filepath.clone(),
code.field_language => source_file.language.clone(),
code.field_body => body,
})
.expect("Failed to add document");
}
indexed_files_batch
.set(&file_id, &String::new())
.expect("Failed to mark file as indexed");
}
}

// Commit updating indexed documents
writer.commit().expect("Failed to commit index");
writer
.wait_merging_threads()
.expect("Failed to wait for merging threads");

// Mark all indexed documents as indexed
cache.apply_indexed(indexed_files_batch);
}

pub fn remove_staled_documents(cache: &mut CacheStore, code: &CodeSearchSchema, index: &Index) {
// Create a new writer to commit deletion of removed indexed files
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");

let gc_commit = cache.prepare_garbage_collection_for_indexed_files(|key| {
writer.delete_term(Term::from_field_text(code.field_source_file_key, key));
});

// Commit garbage collection
writer
.commit()
.expect("Failed to commit garbage collection");

writer
.wait_merging_threads()
.expect("Failed to wait for merging threads on garbage collection");

gc_commit();
}

fn is_valid_file(file: &SourceFile) -> bool {
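
The removal path above relies on tantivy's delete-by-term: every chunk of a file carries the same field_source_file_key value, so deleting that term drops all of the file's documents in one commit. A self-contained sketch of that mechanism, using a hypothetical two-field schema in place of CodeSearchSchema (tantivy API assumed roughly as of this commit):

```rust
use tantivy::{
    doc,
    schema::{Schema, STORED, STRING, TEXT},
    Index, IndexWriter, Term,
};

fn main() -> tantivy::Result<()> {
    // Hypothetical two-field schema standing in for CodeSearchSchema.
    let mut builder = Schema::builder();
    let file_id = builder.add_text_field("file_id", STRING | STORED);
    let body = builder.add_text_field("body", TEXT | STORED);
    let schema = builder.build();

    let index = Index::create_in_ram(schema);
    let mut writer: IndexWriter = index.writer(50_000_000)?;

    // Two chunks of the same source file share one file_id, just like
    // field_source_file_key in this commit.
    writer.add_document(doc!(file_id => "key-1", body => "fn a() {}"))?;
    writer.add_document(doc!(file_id => "key-1", body => "fn b() {}"))?;
    writer.commit()?;

    // Removing a stale file: deleting by term drops every chunk at once.
    writer.delete_term(Term::from_field_text(file_id, "key-1"));
    writer.commit()?;

    let reader = index.reader()?;
    assert_eq!(reader.searcher().num_docs(), 0);
    Ok(())
}
```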
12 changes: 6 additions & 6 deletions crates/tabby-scheduler/src/lib.rs
@@ -22,9 +22,9 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
.await
.expect("Must be able to retrieve repositories for sync");
job_sync(&mut cache, &repositories);
job_index(&repositories);
job_index(&mut cache, &repositories);

cache.garbage_collection();
cache.garbage_collection_for_source_files();
} else {
let access = Arc::new(access);
let scheduler = JobScheduler::new()
@@ -52,8 +52,8 @@ pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) {
.expect("Must be able to retrieve repositories for sync");

job_sync(&mut cache, &repositories);
job_index(&repositories);
cache.garbage_collection();
job_index(&mut cache, &repositories);
cache.garbage_collection_for_source_files();
})
})
.expect("Failed to create job"),
@@ -69,9 +69,9 @@
}
}

fn job_index(repositories: &[RepositoryConfig]) {
fn job_index(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
println!("Indexing repositories...");
index::index_repositories(repositories);
index::index_repositories(cache, repositories);
}

fn job_sync(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
