feat(scheduler): implement incremental indexing for tantivy
boxbeam committed May 7, 2024
1 parent 2c052f8 commit c1e516f
Showing 4 changed files with 186 additions and 42 deletions.
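
In short: previous scheduler runs deleted every document and re-indexed all repositories from scratch. With this change, the code search schema gains a `file_id` field (the git URL and file path joined into a single key) so the chunks of one file can be deleted by term, and the scheduler records the last-indexed commit of each repository in its cache. On later runs it asks git for the files changed since that commit and only deletes and re-adds those; repositories with no recorded commit are indexed from scratch, and repositories whose checkout has disappeared are purged from both the index and the cache.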
3 changes: 3 additions & 0 deletions crates/tabby-common/src/index.rs
@@ -19,6 +19,7 @@ pub struct CodeSearchSchema {
    pub schema: Schema,
    pub field_git_url: Field,
    pub field_filepath: Field,
    pub field_file_id: Field,
    pub field_language: Field,
    pub field_body: Field,
}
@@ -36,6 +37,7 @@ impl CodeSearchSchema {

        let field_git_url = builder.add_text_field("git_url", STRING | STORED);
        let field_filepath = builder.add_text_field("filepath", STRING | STORED);
        let field_file_id = builder.add_text_field("file_id", STRING | STORED);
        let field_language = builder.add_text_field("language", STRING | STORED);
        let field_body = builder.add_text_field("body", code_options);
        let schema = builder.build();
@@ -44,6 +46,7 @@ impl CodeSearchSchema {
            schema,
            field_git_url,
            field_filepath,
            field_file_id,
            field_language,
            field_body,
        }
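
The new `field_file_id` is registered with `STRING | STORED`, i.e. indexed as a single untokenized term, which is exactly what delete-by-term requires. A self-contained sketch of that mechanism against a toy schema (illustrative names and values, not this crate's code):

```rust
use tantivy::{
    doc,
    schema::{Schema, STORED, STRING, TEXT},
    Index, Term,
};

fn main() -> tantivy::Result<()> {
    let mut builder = Schema::builder();
    // STRING indexes the value as one exact, untokenized term; STORED keeps it retrievable.
    let file_id = builder.add_text_field("file_id", STRING | STORED);
    let body = builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(builder.build());

    let mut writer = index.writer(15_000_000)?;
    writer.add_document(doc!(file_id => "repo:src/main.rs", body => "fn main() {}"))?;
    writer.commit()?;

    // The file changed: drop every chunk sharing its file_id...
    writer.delete_term(Term::from_field_text(file_id, "repo:src/main.rs"));
    // ...then queue the fresh contents before committing again.
    writer.add_document(doc!(file_id => "repo:src/main.rs", body => "fn main() { /* new */ }"))?;
    writer.commit()?;
    Ok(())
}
```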
8 changes: 8 additions & 0 deletions crates/tabby-common/src/lib.rs
@@ -34,6 +34,14 @@ pub struct SourceFile {
}

impl SourceFile {
    pub fn create_file_id(git_url: &str, filepath: &str) -> String {
        format!("{}:{}", git_url, filepath)
    }

    pub fn file_id(&self) -> String {
        Self::create_file_id(&self.git_url, &self.filepath)
    }

    pub fn files_jsonl() -> PathBuf {
        dataset_dir().join("files.jsonl")
    }
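
The file ID is simply the git URL and the repository-relative path joined with a colon. With hypothetical values:

```rust
let id = SourceFile::create_file_id("https://github.com/TabbyML/tabby", "crates/tabby-common/src/lib.rs");
assert_eq!(id, "https://github.com/TabbyML/tabby:crates/tabby-common/src/lib.rs");
```

The ID is only ever used as an opaque per-file key for delete-by-term, so it does not matter that colons also occur inside the URL itself.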
46 changes: 40 additions & 6 deletions crates/tabby-scheduler/src/cache.rs
@@ -13,11 +13,7 @@ use tracing::{info, warn};
use crate::code::CodeIntelligence;

const SOURCE_FILE_BUCKET_KEY: &str = "source_files";
const META_BUCKET_KEY: &str = "meta";

struct RepositoryMeta {
    last_sync_commit: Option<String>,
}
const LAST_INDEX_COMMIT_BUCKET: &str = "last_index_commit";

fn cmd_stdout(path: &Path, cmd: &str, args: &[&str]) -> Result<String> {
    Ok(String::from_utf8(
@@ -39,7 +35,11 @@ fn get_git_hash(path: &Path) -> Result<String> {
    )?)
}

fn get_changed_files(path: &Path, since_commit: &str) -> Result<Vec<String>> {
pub fn get_current_commit_hash(path: &Path) -> Result<String> {
    cmd_stdout(path, "git", &["rev-parse", "HEAD"])
}

pub fn get_changed_files(path: &Path, since_commit: &str) -> Result<Vec<String>> {
    Ok(cmd_stdout(
        path,
        "git",
@@ -105,6 +105,40 @@ impl CacheStore {
        }
    }

    pub fn get_last_index_commit(&self, repository: &RepositoryConfig) -> Option<String> {
        self.store
            .bucket(Some(LAST_INDEX_COMMIT_BUCKET))
            .expect("Failed to access meta bucket")
            .get(&repository.canonical_git_url())
            .expect("Failed to read last index commit")
    }

    pub fn set_last_index_commit(&self, repository: &RepositoryConfig, commit: Option<String>) {
        let bucket = self
            .store
            .bucket(Some(LAST_INDEX_COMMIT_BUCKET))
            .expect("Failed to access meta bucket");
        if let Some(commit) = commit {
            bucket
                .set(&repository.canonical_git_url(), &commit)
                .expect("Failed to write last index commit");
        } else {
            bucket
                .remove(&repository.canonical_git_url())
.expect("Failed to remove last index commit");
}
}

pub fn list_indexed_repositories(&self) -> Vec<RepositoryConfig> {
self.store
.bucket::<String, String>(Some(LAST_INDEX_COMMIT_BUCKET))
.expect("Failed to read meta bucket")
.iter()
.map(|item| item.unwrap().key().unwrap())
.map(|git_url| RepositoryConfig::new(git_url))
.collect()
}

pub fn get_source_file(
&mut self,
config: &RepositoryConfig,
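
Together these three methods give the scheduler a per-repository high-water mark. A minimal sketch of the intended call pattern (assumed call sites, error handling reduced to `expect`):

```rust
use tabby_common::config::RepositoryConfig;

use crate::cache::{get_changed_files, get_current_commit_hash, CacheStore};

fn plan_sync(cache: &CacheStore, repo: &RepositoryConfig) {
    if let Some(commit) = cache.get_last_index_commit(repo) {
        // Incremental path: only files touched since `commit` need re-indexing.
        let _changed = get_changed_files(&repo.dir(), &commit).expect("Failed to diff");
    } else {
        // No recorded commit yet: the repository must be indexed from scratch.
    }

    // After indexing, record the new high-water mark for the next run.
    let head = get_current_commit_hash(&repo.dir()).expect("Failed to read HEAD");
    cache.set_last_index_commit(repo, Some(head));
}
```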
171 changes: 135 additions & 36 deletions crates/tabby-scheduler/src/index.rs
@@ -1,22 +1,29 @@
use std::{fs, io::IsTerminal, path::Path};

use ignore::Walk;
use kdam::BarExt;
use tabby_common::{
    config::RepositoryConfig,
    index::{register_tokenizers, CodeSearchSchema},
    languages::get_language_by_ext,
    path, SourceFile,
};
use tantivy::{directory::MmapDirectory, doc, Index};
use tantivy::{directory::MmapDirectory, doc, Index, IndexWriter, Term};
use tracing::{debug, warn};

use crate::{code::CodeIntelligence, utils::tqdm};
use crate::{
    cache::{get_changed_files, get_current_commit_hash, CacheStore},
    code::CodeIntelligence,
    utils::tqdm,
};

// Magic numbers
static MAX_LINE_LENGTH_THRESHOLD: usize = 300;
static AVG_LINE_LENGTH_THRESHOLD: f32 = 150f32;

pub fn index_repositories(_config: &[RepositoryConfig]) {
pub fn index_repositories(config: &[RepositoryConfig]) {
    let code = CodeSearchSchema::new();
    let mut cache = CacheStore::new(tabby_common::path::cache_dir());

    let index = open_or_create_index(&code, &path::index_dir());
    register_tokenizers(&index);
@@ -25,43 +32,44 @@ pub fn index_repositories(_config: &[RepositoryConfig]) {
    let mut writer = index
        .writer(150_000_000)
        .expect("Failed to create index writer");

    let intelligence = CodeIntelligence::default();

    writer
        .delete_all_documents()
        .expect("Failed to delete all documents");

    let total_file_size: usize = SourceFile::all()
        .filter(is_valid_file)
        .map(|x| x.read_file_size())
        .sum();

    let mut pb = std::io::stdout()
        .is_terminal()
        .then(|| tqdm(total_file_size));

    let intelligence = CodeIntelligence::default();
    for file in SourceFile::all().filter(is_valid_file) {
        let text = match file.read_content() {
            Ok(content) => content,
            Err(e) => {
                warn!("Failed to read content of '{}': {}", file.filepath, e);
    for repository in config {
        let Some(commit) = cache.get_last_index_commit(repository) else {
            index_repository_from_scratch(repository, &writer, &code, &intelligence, &mut cache);
            continue;
        };
        let dir = repository.dir();
        let changed_files = get_changed_files(&dir, &commit).expect("Failed to read file diff");
        for file in changed_files {
            let path = dir.join(&file);
            delete_indexed_source_file(&writer, &code, &repository.git_url, &file);
            if !path.exists() {
                continue;
            }
        };
            let Some(source_file) = cache.get_source_file(repository, &path) else {
                continue;
            };
            if !is_valid_file(&source_file) {
                continue;
            }
            add_indexed_source_file(&writer, &source_file, &code, &intelligence);
        }
        cache.set_last_index_commit(
            repository,
            Some(get_current_commit_hash(&dir).expect("Failed to read current commit hash")),
        );
    }

        for body in intelligence.chunks(&text) {
            pb.as_mut()
                .map(|b| b.update(body.len()))
                .transpose()
                .expect("Failed to update progress bar");

            writer
                .add_document(doc!(
                    code.field_git_url => file.git_url.clone(),
                    code.field_filepath => file.filepath.clone(),
                    code.field_language => file.language.clone(),
                    code.field_body => body,
                ))
                .expect("Failed to add document");
    for indexed_repository in cache.list_indexed_repositories() {
        if !indexed_repository.dir().exists() {
            cache.set_last_index_commit(&indexed_repository, None);
            delete_all_indexed_files(&writer, &code, &indexed_repository.canonical_git_url());
        }
    }

@@ -71,9 +79,100 @@ pub fn index_repositories(_config: &[RepositoryConfig]) {
.expect("Failed to wait for merging threads");
}

fn is_valid_file(file: &SourceFile) -> bool {
    file.max_line_length <= MAX_LINE_LENGTH_THRESHOLD
        && file.avg_line_length <= AVG_LINE_LENGTH_THRESHOLD
fn index_repository_from_scratch(
    repository: &RepositoryConfig,
    writer: &IndexWriter,
    code: &CodeSearchSchema,
    intelligence: &CodeIntelligence,
    cache: &mut CacheStore,
) {
    let mut pb = std::io::stdout().is_terminal().then(|| {
        let total_file_size: usize = Walk::new(repository.dir())
            .filter_map(|f| f.ok())
            .map(|f| f.path().to_owned())
            .filter(|f| {
                f.extension()
                    .is_some_and(|ext| get_language_by_ext(ext).is_some())
            })
            .map(|f| read_file_size(&f))
            .sum();
        tqdm(total_file_size)
    });

    for file in Walk::new(repository.dir()) {
        let file = file.expect("Failed to read file listing");
        let Some(source_file) = cache.get_source_file(repository, file.path()) else {
            continue;
        };
        if !is_valid_file(&source_file) {
            continue;
        }
        add_indexed_source_file(writer, &source_file, code, intelligence);
        pb.as_mut().map(|pb| {
            pb.update(source_file.read_file_size())
                .expect("Failed to update progress bar")
        });
    }
    cache.set_last_index_commit(
        repository,
        Some(
            get_current_commit_hash(&repository.dir()).expect("Failed to read current commit hash"),
        ),
    );
}

fn read_file_size(path: &Path) -> usize {
    std::fs::metadata(path)
        .map(|meta| meta.len())
        .unwrap_or_default() as usize
}

fn is_valid_file(source_file: &SourceFile) -> bool {
    source_file.max_line_length <= MAX_LINE_LENGTH_THRESHOLD
        && source_file.avg_line_length <= AVG_LINE_LENGTH_THRESHOLD
}

pub fn delete_indexed_source_file(
    writer: &IndexWriter,
    code: &CodeSearchSchema,
    git_url: &str,
    filepath: &str,
) {
    let file_id = SourceFile::create_file_id(git_url, filepath);
    let term = Term::from_field_text(code.field_file_id, &file_id);
    writer.delete_term(term);
}

pub fn delete_all_indexed_files(writer: &IndexWriter, code: &CodeSearchSchema, git_url: &str) {
    let term = Term::from_field_text(code.field_git_url, git_url);
    writer.delete_term(term);
}

pub fn add_indexed_source_file(
    writer: &IndexWriter,
    file: &SourceFile,
    code: &CodeSearchSchema,
    intelligence: &CodeIntelligence,
) -> usize {
    let text = match file.read_content() {
        Ok(content) => content,
        Err(e) => {
            warn!("Failed to read content of '{}': {}", file.filepath, e);
            return 0;
        }
    };
    for body in intelligence.chunks(&text) {
        writer
            .add_document(doc!(
                code.field_git_url => file.git_url.clone(),
                code.field_filepath => file.filepath.clone(),
                code.field_file_id => file.file_id(),
                code.field_language => file.language.clone(),
                code.field_body => body,
            ))
            .expect("Failed to add document");
    }
    text.len()
}

fn open_or_create_index(code: &CodeSearchSchema, path: &Path) -> Index {
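
Worth noting: the per-file delete and the re-add are queued on the same `IndexWriter`, so a changed file's stale chunks and fresh chunks swap over together when the writer commits. A condensed sketch of the cycle that `delete_indexed_source_file` and `add_indexed_source_file` implement between them (assuming the crate's `anyhow`-style `Result`; an illustration, not the module's literal control flow):

```rust
use anyhow::Result;
use tabby_common::{index::CodeSearchSchema, SourceFile};
use tantivy::{doc, IndexWriter, Term};

use crate::code::CodeIntelligence;

// Delete-then-re-add for one changed file, keyed by its file_id.
fn reindex_file(
    writer: &IndexWriter,
    code: &CodeSearchSchema,
    intelligence: &CodeIntelligence,
    file: &SourceFile,
) -> Result<()> {
    let file_id = file.file_id();
    // Queue deletion of every chunk previously indexed under this file_id.
    writer.delete_term(Term::from_field_text(code.field_file_id, &file_id));
    // Queue the freshly chunked contents under the same file_id.
    for body in intelligence.chunks(&file.read_content()?) {
        writer.add_document(doc!(
            code.field_git_url => file.git_url.clone(),
            code.field_filepath => file.filepath.clone(),
            code.field_file_id => file_id.clone(),
            code.field_language => file.language.clone(),
            code.field_body => body,
        ))?;
    }
    // Neither operation is visible to searchers until the writer commits.
    Ok(())
}
```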
