Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler): implement incremental indexing for tantivy #2062

Merged
merged 11 commits into from
May 7, 2024
4 changes: 4 additions & 0 deletions crates/tabby-common/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub struct CodeSearchSchema {
pub schema: Schema,
pub field_git_url: Field,
pub field_filepath: Field,
/// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey
pub field_source_file_key: Field,
pub field_language: Field,
pub field_body: Field,
}
Expand All @@ -36,6 +38,7 @@ impl CodeSearchSchema {

let field_git_url = builder.add_text_field("git_url", STRING | STORED);
let field_filepath = builder.add_text_field("filepath", STRING | STORED);
let field_source_file_key = builder.add_text_field("file_id", STRING | STORED);
let field_language = builder.add_text_field("language", STRING | STORED);
let field_body = builder.add_text_field("body", code_options);
let schema = builder.build();
Expand All @@ -44,6 +47,7 @@ impl CodeSearchSchema {
schema,
field_git_url,
field_filepath,
field_source_file_key,
field_language,
field_body,
}
Expand Down
91 changes: 79 additions & 12 deletions crates/tabby-scheduler/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
fs::read_to_string,
path::{Path, PathBuf},
process::Command,
str::FromStr,
};

use anyhow::{bail, Context, Result};
Expand All @@ -13,24 +14,31 @@
use crate::code::CodeIntelligence;

const SOURCE_FILE_BUCKET_KEY: &str = "source_files";
const INDEX_BUCKET_KEY: &str = "indexed_files";

fn cmd_stdout(cmd: &str, args: &[&str]) -> Result<String> {
Ok(
String::from_utf8(Command::new(cmd).args(args).output()?.stdout)?
.trim()
.to_string(),

Check warning on line 23 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L19-L23

Added lines #L19 - L23 were not covered by tests
)
}

Check warning on line 25 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L25

Added line #L25 was not covered by tests

fn get_git_hash(path: &Path) -> Result<String> {
let path = path.display().to_string();
let output = Command::new("git").args(["hash-object", &path]).output()?;
Ok(String::from_utf8(output.stdout)?.trim().to_string())
cmd_stdout("git", &["hash-object", &path.display().to_string()])

Check warning on line 28 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L28

Added line #L28 was not covered by tests
}

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Debug)]

Check warning on line 31 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L31

Added line #L31 was not covered by tests
struct SourceFileKey {
path: PathBuf,
language: String,
git_hash: String,
}

impl TryFrom<&str> for SourceFileKey {
type Error = serde_json::Error;
impl FromStr for SourceFileKey {
type Err = serde_json::Error;

fn try_from(s: &str) -> Result<Self, Self::Error> {
fn from_str(s: &str) -> Result<Self, Self::Err> {

Check warning on line 41 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L41

Added line #L41 was not covered by tests
serde_json::from_str(s)
}
}
Expand Down Expand Up @@ -75,6 +83,65 @@
}
}

fn index_bucket(&self) -> Bucket<String, String> {
self.store
.bucket(Some(INDEX_BUCKET_KEY))
.expect("Failed to access indexed files bucket")
}

Check warning on line 90 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L86-L90

Added lines #L86 - L90 were not covered by tests

pub fn check_indexed(&self, path: &Path) -> (String, bool) {
let key = SourceFileKey::try_from(path)
.expect("Failed to create source file key")
.to_string();
let indexed = self
.index_bucket()
.contains(&key)
.expect("Failed to read index bucket");
(key, indexed)
}

Check warning on line 101 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L92-L101

Added lines #L92 - L101 were not covered by tests

pub fn apply_indexed(&self, batch: Batch<String, String>) {
self.index_bucket()
.batch(batch)
.expect("Failed to commit batched index update")
}

Check warning on line 107 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L103-L107

Added lines #L103 - L107 were not covered by tests

#[must_use]
pub fn prepare_garbage_collection_for_indexed_files(
&self,
key_remover: impl Fn(&String),
) -> impl FnOnce() + '_ {
info!("Started cleaning up 'indexed_files' bucket");
let bucket = self.index_bucket();
let mut batch = Batch::new();

let mut num_keep = 0;
let mut num_removed = 0;

bucket
.iter()
.filter_map(|item| {
let item = item.expect("Failed to read item");
let item_key: String = item.key().expect("Failed to get key");
if is_item_key_matched(&item_key) {
num_keep += 1;
None

Check warning on line 128 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L110-L128

Added lines #L110 - L128 were not covered by tests
} else {
num_removed += 1;
Some(item_key)

Check warning on line 131 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L130-L131

Added lines #L130 - L131 were not covered by tests
}
})
.inspect(key_remover)
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!("Finished garbage collection for 'indexed_files': {num_keep} items kept, {num_removed} items removed");
move || {
bucket
.batch(batch)
.expect("Failed to execute batched delete")
}
}

Check warning on line 143 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L133-L143

Added lines #L133 - L143 were not covered by tests

pub fn get_source_file(
&mut self,
config: &RepositoryConfig,
Expand Down Expand Up @@ -103,8 +170,8 @@
}
}

pub fn garbage_collection(&self) {
info!("Running garbage collection");
pub fn garbage_collection_for_source_files(&self) {
info!("Started cleaning up 'source_files' bucket");

Check warning on line 174 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L173-L174

Added lines #L173 - L174 were not covered by tests
let bucket: Bucket<String, Json<SourceFile>> = self
.store
.bucket(Some(SOURCE_FILE_BUCKET_KEY))
Expand All @@ -124,21 +191,21 @@
None
} else {
num_removed += 1;
Some(item.key().expect("Failed to get key"))
Some(item_key)

Check warning on line 194 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L194

Added line #L194 was not covered by tests
}
})
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!(
"Finished garbage collection: {} items kept, {} items removed",
"Finished garbage collection for 'source_files': {} items kept, {} items removed",

Check warning on line 200 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L200

Added line #L200 was not covered by tests
num_keep, num_removed
);
bucket.batch(batch).expect("to batch remove staled files");
}
}

fn is_item_key_matched(item_key: &str) -> bool {
let Ok(key) = SourceFileKey::try_from(item_key) else {
let Ok(key) = item_key.parse::<SourceFileKey>() else {

Check warning on line 208 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L208

Added line #L208 was not covered by tests
return false;
};

Expand Down
122 changes: 92 additions & 30 deletions crates/tabby-scheduler/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
use std::{fs, io::IsTerminal, path::Path};

use ignore::Walk;
use kdam::BarExt;
use kv::Batch;
use tabby_common::{
config::RepositoryConfig,
index::{register_tokenizers, CodeSearchSchema},
path, SourceFile,
};
use tantivy::{directory::MmapDirectory, doc, Index};
use tantivy::{directory::MmapDirectory, doc, Index, Term};
use tracing::{debug, warn};

use crate::{code::CodeIntelligence, utils::tqdm};
use crate::{cache::CacheStore, code::CodeIntelligence, utils::tqdm};

// Magic numbers
static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32;

pub fn index_repositories(_config: &[RepositoryConfig]) {
let code = CodeSearchSchema::new();

pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {
boxbeam marked this conversation as resolved.
Show resolved Hide resolved
let code = CodeSearchSchema::default();

Check warning on line 21 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L20-L21

Added lines #L20 - L21 were not covered by tests
let index = open_or_create_index(&code, &path::index_dir());
register_tokenizers(&index);
add_changed_documents(cache, &code, config, &index);
remove_staled_documents(cache, &code, &index);
}

Check warning on line 25 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L23-L25

Added lines #L23 - L25 were not covered by tests

fn add_changed_documents(
cache: &mut CacheStore,
code: &CodeSearchSchema,
config: &[RepositoryConfig],
index: &Index,
) {
register_tokenizers(index);

Check warning on line 33 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L27-L33

Added lines #L27 - L33 were not covered by tests

// Initialize the search index writer with an initial arena size of 150 MB.
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");
writer
.delete_all_documents()
.expect("Failed to delete all documents");

let total_file_size: usize = SourceFile::all()
.filter(is_valid_file)
Expand All @@ -39,36 +47,90 @@
.then(|| tqdm(total_file_size));

let intelligence = CodeIntelligence::default();
for file in SourceFile::all().filter(is_valid_file) {
let text = match file.read_content() {
Ok(content) => content,
Err(e) => {
warn!("Failed to read content of '{}': {}", file.filepath, e);
let mut indexed_files_batch = Batch::new();
for repository in config {
for file in Walk::new(repository.dir()) {
let file = match file {
Ok(file) => file,
Err(e) => {
warn!("Failed to walk file tree for indexing: {e}");
continue;

Check warning on line 57 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L50-L57

Added lines #L50 - L57 were not covered by tests
}
};
let Some(source_file) = cache.get_source_file(repository, file.path()) else {
continue;

Check warning on line 61 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L60-L61

Added lines #L60 - L61 were not covered by tests
};
if !is_valid_file(&source_file) {

Check warning on line 63 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L63

Added line #L63 was not covered by tests
continue;
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc!(
code.field_git_url => file.git_url.clone(),
code.field_filepath => file.filepath.clone(),
code.field_language => file.language.clone(),
code.field_body => body,
))
.expect("Failed to add document");
let (file_id, indexed) = cache.check_indexed(file.path());

if indexed {
continue;
}
let text = match source_file.read_content() {
Ok(content) => content,
Err(e) => {
warn!(
"Failed to read content of '{}': {}",
source_file.filepath, e
);
continue;

Check warning on line 78 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L66-L78

Added lines #L66 - L78 were not covered by tests
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc! {
code.field_git_url => source_file.git_url.clone(),
code.field_source_file_key => file_id.to_string(),
code.field_filepath => source_file.filepath.clone(),
code.field_language => source_file.language.clone(),
code.field_body => body,
})
.expect("Failed to add document");
}
indexed_files_batch
.set(&file_id, &String::new())
.expect("Failed to mark file as indexed");

Check warning on line 100 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L82-L100

Added lines #L82 - L100 were not covered by tests
}
}

// Commit updating indexed documents
writer.commit().expect("Failed to commit index");
boxbeam marked this conversation as resolved.
Show resolved Hide resolved
writer
.wait_merging_threads()
.expect("Failed to wait for merging threads");

// Mark all indexed documents as indexed
cache.apply_indexed(indexed_files_batch);
}

Check warning on line 112 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L109-L112

Added lines #L109 - L112 were not covered by tests

pub fn remove_staled_documents(cache: &mut CacheStore, code: &CodeSearchSchema, index: &Index) {
// Create a new writer to commit deletion of removed indexed files
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");

let gc_commit = cache.prepare_garbage_collection_for_indexed_files(|key| {
writer.delete_term(Term::from_field_text(code.field_source_file_key, key));
});

// Commit garbage collection
writer
.commit()
.expect("Failed to commit garbage collection");
boxbeam marked this conversation as resolved.
Show resolved Hide resolved

writer
.wait_merging_threads()
.expect("Failed to wait for merging threads on garbage collection");

gc_commit();

Check warning on line 133 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L114-L133

Added lines #L114 - L133 were not covered by tests
}

fn is_valid_file(file: &SourceFile) -> bool {
Expand Down
12 changes: 6 additions & 6 deletions crates/tabby-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
.await
.expect("Must be able to retrieve repositories for sync");
job_sync(&mut cache, &repositories);
job_index(&repositories);
job_index(&mut cache, &repositories);

Check warning on line 25 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L25

Added line #L25 was not covered by tests

cache.garbage_collection();
cache.garbage_collection_for_source_files();

Check warning on line 27 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L27

Added line #L27 was not covered by tests
} else {
let access = Arc::new(access);
let scheduler = JobScheduler::new()
Expand Down Expand Up @@ -52,8 +52,8 @@
.expect("Must be able to retrieve repositories for sync");

job_sync(&mut cache, &repositories);
job_index(&repositories);
cache.garbage_collection();
job_index(&mut cache, &repositories);
cache.garbage_collection_for_source_files();

Check warning on line 56 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L55-L56

Added lines #L55 - L56 were not covered by tests
})
})
.expect("Failed to create job"),
Expand All @@ -69,9 +69,9 @@
}
}

fn job_index(repositories: &[RepositoryConfig]) {
fn job_index(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {

Check warning on line 72 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L72

Added line #L72 was not covered by tests
println!("Indexing repositories...");
index::index_repositories(repositories);
index::index_repositories(cache, repositories);

Check warning on line 74 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L74

Added line #L74 was not covered by tests
}

fn job_sync(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
Expand Down
Loading