Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler): implement incremental indexing for tantivy #2062

Merged
merged 11 commits into from
May 7, 2024
4 changes: 4 additions & 0 deletions crates/tabby-common/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub struct CodeSearchSchema {
pub schema: Schema,
pub field_git_url: Field,
pub field_filepath: Field,
/// Indexed field uniquely identifying a file in a repository, stringified SourceFileKey
pub field_source_file_key: Field,
pub field_language: Field,
pub field_body: Field,
}
Expand All @@ -36,6 +38,7 @@ impl CodeSearchSchema {

let field_git_url = builder.add_text_field("git_url", STRING | STORED);
let field_filepath = builder.add_text_field("filepath", STRING | STORED);
let field_source_file_key = builder.add_text_field("file_id", STRING | STORED);
let field_language = builder.add_text_field("language", STRING | STORED);
let field_body = builder.add_text_field("body", code_options);
let schema = builder.build();
Expand All @@ -44,6 +47,7 @@ impl CodeSearchSchema {
schema,
field_git_url,
field_filepath,
field_source_file_key,
field_language,
field_body,
}
Expand Down
85 changes: 73 additions & 12 deletions crates/tabby-scheduler/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
fs::read_to_string,
path::{Path, PathBuf},
process::Command,
str::FromStr,
};

use anyhow::{bail, Context, Result};
Expand All @@ -13,24 +14,31 @@
use crate::code::CodeIntelligence;

const SOURCE_FILE_BUCKET_KEY: &str = "source_files";
const INDEX_BUCKET_KEY: &str = "indexed_files";

fn cmd_stdout(cmd: &str, args: &[&str]) -> Result<String> {
Ok(
String::from_utf8(Command::new(cmd).args(args).output()?.stdout)?
.trim()
.to_string(),

Check warning on line 23 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L19-L23

Added lines #L19 - L23 were not covered by tests
)
}

Check warning on line 25 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L25

Added line #L25 was not covered by tests

fn get_git_hash(path: &Path) -> Result<String> {
let path = path.display().to_string();
let output = Command::new("git").args(["hash-object", &path]).output()?;
Ok(String::from_utf8(output.stdout)?.trim().to_string())
cmd_stdout("git", &["hash-object", &path.display().to_string()])

Check warning on line 28 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L28

Added line #L28 was not covered by tests
}

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Debug)]

Check warning on line 31 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L31

Added line #L31 was not covered by tests
/// Composite key identifying one version of one source file in the cache.
/// Serialized as JSON for use as a string key (parsed back via the `FromStr`
/// impl below, which uses `serde_json::from_str`).
struct SourceFileKey {
    // Filesystem path of the source file — assumed stable between runs;
    // TODO confirm whether it is stored absolute or repo-relative.
    path: PathBuf,
    // Language label of the file — presumably detected elsewhere during
    // sync; verify against the producer of this struct.
    language: String,
    // `git hash-object` content hash; changes whenever file content changes,
    // so a key mismatch signals the cached/indexed copy is stale.
    git_hash: String,
}

impl TryFrom<&str> for SourceFileKey {
type Error = serde_json::Error;
impl FromStr for SourceFileKey {
type Err = serde_json::Error;

fn try_from(s: &str) -> Result<Self, Self::Error> {
fn from_str(s: &str) -> Result<Self, Self::Err> {

Check warning on line 41 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L41

Added line #L41 was not covered by tests
serde_json::from_str(s)
}
}
Expand Down Expand Up @@ -75,6 +83,59 @@
}
}

fn index_bucket(&self) -> Bucket<String, String> {
self.store
.bucket(Some(INDEX_BUCKET_KEY))
.expect("Failed to access indexed files bucket")
}

Check warning on line 90 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L86-L90

Added lines #L86 - L90 were not covered by tests

pub fn check_indexed(&self, path: &Path) -> (String, bool) {
let key = SourceFileKey::try_from(path)
.expect("Failed to create source file key")
.to_string();
let indexed = self
.index_bucket()
.contains(&key)
.expect("Failed to read index bucket");
(key, indexed)
}

Check warning on line 101 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L92-L101

Added lines #L92 - L101 were not covered by tests

pub fn set_indexed(&self, batch: Batch<String, String>) {
boxbeam marked this conversation as resolved.
Show resolved Hide resolved
self.index_bucket()
.batch(batch)
.expect("Failed to commit batched index update")
}

Check warning on line 107 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L103-L107

Added lines #L103 - L107 were not covered by tests

pub fn garbage_collection_for_indexed_files(&self, key_remover: impl Fn(&String)) {
info!("Started cleaning up 'indexed_files' bucket");
let bucket = self.index_bucket();
let mut batch = Batch::new();

let mut num_keep = 0;
let mut num_removed = 0;

bucket
.iter()
.filter_map(|item| {
let item = item.expect("Failed to read item");
let item_key: String = item.key().expect("Failed to get key");
if is_item_key_matched(&item_key) {
num_keep += 1;
None

Check warning on line 124 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L109-L124

Added lines #L109 - L124 were not covered by tests
} else {
num_removed += 1;
Some(item_key)

Check warning on line 127 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L126-L127

Added lines #L126 - L127 were not covered by tests
}
})
.inspect(key_remover)
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!("Finished garbage collection for 'indexed_files': {num_keep} items kept, {num_removed} items removed");
bucket
.batch(batch)
.expect("Failed to execute batched delete");
}

Check warning on line 137 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L129-L137

Added lines #L129 - L137 were not covered by tests

pub fn get_source_file(
&mut self,
config: &RepositoryConfig,
Expand Down Expand Up @@ -103,8 +164,8 @@
}
}

pub fn garbage_collection(&self) {
info!("Running garbage collection");
pub fn garbage_collection_for_source_files(&self) {
info!("Started cleaning up 'source_files' bucket");

Check warning on line 168 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L167-L168

Added lines #L167 - L168 were not covered by tests
let bucket: Bucket<String, Json<SourceFile>> = self
.store
.bucket(Some(SOURCE_FILE_BUCKET_KEY))
Expand All @@ -124,21 +185,21 @@
None
} else {
num_removed += 1;
Some(item.key().expect("Failed to get key"))
Some(item_key)

Check warning on line 188 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L188

Added line #L188 was not covered by tests
}
})
.for_each(|key| batch.remove(&key).expect("Failed to remove key"));

info!(
"Finished garbage collection: {} items kept, {} items removed",
"Finished garbage collection for 'source_files': {} items kept, {} items removed",

Check warning on line 194 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L194

Added line #L194 was not covered by tests
num_keep, num_removed
);
bucket.batch(batch).expect("to batch remove staled files");
}
}

fn is_item_key_matched(item_key: &str) -> bool {
let Ok(key) = SourceFileKey::try_from(item_key) else {
let Ok(key) = item_key.parse::<SourceFileKey>() else {

Check warning on line 202 in crates/tabby-scheduler/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/cache.rs#L202

Added line #L202 was not covered by tests
return false;
};

Expand Down
103 changes: 76 additions & 27 deletions crates/tabby-scheduler/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use std::{fs, io::IsTerminal, path::Path};

use ignore::Walk;
use kdam::BarExt;
use kv::Batch;
use tabby_common::{
config::RepositoryConfig,
index::{register_tokenizers, CodeSearchSchema},
path, SourceFile,
};
use tantivy::{directory::MmapDirectory, doc, Index};
use tantivy::{directory::MmapDirectory, doc, Index, Term};
use tracing::{debug, warn};

use crate::{code::CodeIntelligence, utils::tqdm};
use crate::{cache::CacheStore, code::CodeIntelligence, utils::tqdm};

// Magic numbers
static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32;

pub fn index_repositories(_config: &[RepositoryConfig]) {
pub fn index_repositories(cache: &mut CacheStore, config: &[RepositoryConfig]) {

Check warning on line 20 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L20

Added line #L20 was not covered by tests
boxbeam marked this conversation as resolved.
Show resolved Hide resolved
let code = CodeSearchSchema::new();

let index = open_or_create_index(&code, &path::index_dir());
Expand All @@ -25,9 +27,6 @@
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");
writer
.delete_all_documents()
.expect("Failed to delete all documents");

let total_file_size: usize = SourceFile::all()
.filter(is_valid_file)
Expand All @@ -39,36 +38,86 @@
.then(|| tqdm(total_file_size));

let intelligence = CodeIntelligence::default();
for file in SourceFile::all().filter(is_valid_file) {
let text = match file.read_content() {
Ok(content) => content,
Err(e) => {
warn!("Failed to read content of '{}': {}", file.filepath, e);
let mut indexed_files_batch = Batch::new();
for repository in config {
for file in Walk::new(repository.dir()) {
let file = match file {
Ok(file) => file,
Err(e) => {
warn!("Failed to walk file tree for indexing: {e}");
continue;

Check warning on line 48 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L41-L48

Added lines #L41 - L48 were not covered by tests
}
};
let Some(source_file) = cache.get_source_file(repository, file.path()) else {
continue;

Check warning on line 52 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
};
if !is_valid_file(&source_file) {
continue;
}
let (file_id, indexed) = cache.check_indexed(file.path());

if indexed {

Check warning on line 59 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L54-L59

Added lines #L54 - L59 were not covered by tests
continue;
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc!(
code.field_git_url => file.git_url.clone(),
code.field_filepath => file.filepath.clone(),
code.field_language => file.language.clone(),
code.field_body => body,
))
.expect("Failed to add document");
let text = match source_file.read_content() {
Ok(content) => content,
Err(e) => {
warn!(
"Failed to read content of '{}': {}",
source_file.filepath, e
);
continue;

Check warning on line 69 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L62-L69

Added lines #L62 - L69 were not covered by tests
}
};

for body in intelligence.chunks(&text) {
pb.as_mut()
.map(|b| b.update(body.len()))
.transpose()
.expect("Failed to update progress bar");

writer
.add_document(doc! {
code.field_git_url => source_file.git_url.clone(),
code.field_source_file_key => file_id.to_string(),
code.field_filepath => source_file.filepath.clone(),
code.field_language => source_file.language.clone(),
code.field_body => body,
})
.expect("Failed to add document");
}
indexed_files_batch
.set(&file_id, &String::new())
.expect("Failed to mark file as indexed");

Check warning on line 91 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L73-L91

Added lines #L73 - L91 were not covered by tests
}
}

// Commit updating indexed documents
writer.commit().expect("Failed to commit index");
boxbeam marked this conversation as resolved.
Show resolved Hide resolved
writer
.wait_merging_threads()
.expect("Failed to wait for merging threads");

// Mark all indexed documents as indexed
cache.set_indexed(indexed_files_batch);

// Create a new writer to commit deletion of removed indexed files
let mut writer = index
.writer(150_000_000)
.expect("Failed to create index writer");

cache.garbage_collection_for_indexed_files(|key| {
writer.delete_term(Term::from_field_text(code.field_source_file_key, key));
});

// Commit garbage collection
writer
.commit()
.expect("Failed to commit garbage collection");
boxbeam marked this conversation as resolved.
Show resolved Hide resolved

writer
.wait_merging_threads()
.expect("Failed to wait for merging threads on garbage collection");

Check warning on line 120 in crates/tabby-scheduler/src/index.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/index.rs#L100-L120

Added lines #L100 - L120 were not covered by tests
}

fn is_valid_file(file: &SourceFile) -> bool {
Expand Down
12 changes: 6 additions & 6 deletions crates/tabby-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
.await
.expect("Must be able to retrieve repositories for sync");
job_sync(&mut cache, &repositories);
job_index(&repositories);
job_index(&mut cache, &repositories);

Check warning on line 25 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L25

Added line #L25 was not covered by tests

cache.garbage_collection();
cache.garbage_collection_for_source_files();

Check warning on line 27 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L27

Added line #L27 was not covered by tests
} else {
let access = Arc::new(access);
let scheduler = JobScheduler::new()
Expand Down Expand Up @@ -52,8 +52,8 @@
.expect("Must be able to retrieve repositories for sync");

job_sync(&mut cache, &repositories);
job_index(&repositories);
cache.garbage_collection();
job_index(&mut cache, &repositories);
cache.garbage_collection_for_source_files();

Check warning on line 56 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L55-L56

Added lines #L55 - L56 were not covered by tests
})
})
.expect("Failed to create job"),
Expand All @@ -69,9 +69,9 @@
}
}

fn job_index(repositories: &[RepositoryConfig]) {
fn job_index(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {

Check warning on line 72 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L72

Added line #L72 was not covered by tests
println!("Indexing repositories...");
index::index_repositories(repositories);
index::index_repositories(cache, repositories);

Check warning on line 74 in crates/tabby-scheduler/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/tabby-scheduler/src/lib.rs#L74

Added line #L74 was not covered by tests
}

fn job_sync(cache: &mut CacheStore, repositories: &[RepositoryConfig]) {
Expand Down
Loading