Skip to content

Commit

Permalink
memory usage optimization for sparse vector (#1011)
Browse files Browse the repository at this point in the history
* sparse: remove raw data cache in sparse inverted index

To reduce memory usage, remove the raw data cache in sparse inverted index.
Note that the config param `drop_ratio_build` and GetVectorByIds() are also
removed.

Signed-off-by: Shawn Wang <[email protected]>

* sparse: quantize float to uint16_t for BM25 inverted index

To reduce the memory usage of the inverted index, when BM25 metric is used,
quantize the term frequency from float to uint16_t. All values that exceed the
maximum of uint16_t are clamped to the maximum of uint16_t.

Signed-off-by: Shawn Wang <[email protected]>

---------

Signed-off-by: Shawn Wang <[email protected]>
  • Loading branch information
sparknack authored Jan 7, 2025
1 parent 9a6a8df commit 8380a96
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 444 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ include(cmake/libs/libhnsw.cmake)

include_directories(thirdparty/faiss)

find_package(Boost REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})

find_package(OpenMP REQUIRED)

find_package(folly REQUIRED)
Expand Down Expand Up @@ -177,6 +180,7 @@ endif()
include_directories(src)
include_directories(include)

list(APPEND KNOWHERE_LINKER_LIBS Boost::boost)
list(APPEND KNOWHERE_LINKER_LIBS faiss)
list(APPEND KNOWHERE_LINKER_LIBS glog::glog)
list(APPEND KNOWHERE_LINKER_LIBS nlohmann_json::nlohmann_json)
Expand Down
97 changes: 95 additions & 2 deletions include/knowhere/sparse_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstddef>
#include <cstdint>
#include <cstring>
Expand Down Expand Up @@ -59,6 +60,33 @@ GetDocValueBM25Computer(float k1, float b, float avgdl) {
};
}

// A docid filter that tests whether a given id is in the list of docids, which is regarded as another form of BitSet.
// Note that all ids to be tested must be tested exactly once and in order.
class DocIdFilterByVector {
public:
DocIdFilterByVector(std::vector<table_t>&& docids) : docids_(std::move(docids)) {
std::sort(docids_.begin(), docids_.end());
}

[[nodiscard]] bool
test(const table_t id) {
// find the first id that is greater than or equal to the specific id
while (pos_ < docids_.size() && docids_[pos_] < id) {
++pos_;
}
return !(pos_ < docids_.size() && docids_[pos_] == id);
}

[[nodiscard]] bool
empty() const {
return docids_.empty();
}

private:
std::vector<table_t> docids_;
size_t pos_ = 0;
};

template <typename T>
class SparseRow {
static_assert(std::is_same_v<T, fp32>, "SparseRow supports float only");
Expand All @@ -72,6 +100,15 @@ class SparseRow {
SparseRow(size_t count, uint8_t* data, bool own_data) : data_(data), count_(count), own_data_(own_data) {
}

// Constructs an owning SparseRow from a list of (index, value) pairs.
// Allocates a packed byte buffer of count_ elements and copies each pair
// into its slot through the ElementProxy overlay.
SparseRow(const std::vector<std::pair<table_t, T>>& data) : count_(data.size()), own_data_(true) {
data_ = new uint8_t[count_ * element_size()];
for (size_t i = 0; i < count_; ++i) {
// ElementProxy reinterprets the raw buffer as an array of {index, value} slots
auto* elem = reinterpret_cast<ElementProxy*>(data_) + i;
elem->index = data[i].first;
elem->value = data[i].second;
}
}

// copy constructor and copy assignment operator perform deep copy
SparseRow(const SparseRow<T>& other) : SparseRow(other.count_) {
std::memcpy(data_, other.data_, data_byte_size());
Expand Down Expand Up @@ -147,6 +184,9 @@ class SparseRow {

// Writes element i of the row as (index, value).
// Throws std::out_of_range when i is outside [0, count_), guarding the raw
// buffer write below against out-of-bounds access.
void
set_at(size_t i, table_t index, T value) {
if (i >= count_) {
throw std::out_of_range("set_at on a SparseRow with invalid index");
}
// ElementProxy overlays the i-th {index, value} slot of the packed buffer
auto* elem = reinterpret_cast<ElementProxy*>(data_) + i;
elem->index = index;
elem->value = value;
Expand Down Expand Up @@ -300,12 +340,12 @@ class GrowableVectorView {
mmap_element_count_ = 0;
}

size_type
[[nodiscard]] size_type
capacity() const {
return mmap_byte_size_ / sizeof(T);
}

size_type
[[nodiscard]] size_type
size() const {
return mmap_element_count_;
}
Expand Down Expand Up @@ -346,6 +386,59 @@ class GrowableVectorView {
return reinterpret_cast<const T*>(mmap_data_)[i];
}

// Random-access iterator over the view's element storage. The complete
// iterator interface (operator++/--/+=/-/==/*, etc.) is synthesized by
// boost::iterator_facade from the primitive hooks defined below.
class iterator : public boost::iterator_facade<iterator, T, boost::random_access_traversal_tag, T&> {
 public:
    iterator() = default;
    explicit iterator(T* ptr) : cur_(ptr) {
    }

    friend class GrowableVectorView;
    friend class boost::iterator_core_access;

    // Two iterators compare equal iff they reference the same element.
    bool
    equal(const iterator& rhs) const {
        return cur_ == rhs.cur_;
    }

    // Signed element distance from *this to rhs.
    std::ptrdiff_t
    distance_to(const iterator& rhs) const {
        return rhs.cur_ - cur_;
    }

    // Step one element forward / backward.
    void
    increment() {
        ++cur_;
    }

    void
    decrement() {
        --cur_;
    }

    // Jump by an arbitrary (possibly negative) offset.
    void
    advance(std::ptrdiff_t n) {
        cur_ += n;
    }

    T&
    dereference() const {
        return *cur_;
    }

 private:
    T* cur_ = nullptr;  // current element; nullptr for a default-constructed iterator
};

// Iterator to the first element of the mmapped storage.
iterator
begin() const {
    auto* first = reinterpret_cast<T*>(mmap_data_);
    return iterator(first);
}

// Past-the-end iterator: first element plus the current element count.
iterator
end() const {
    auto* first = reinterpret_cast<T*>(mmap_data_);
    return iterator(first + mmap_element_count_);
}

private:
void* mmap_data_ = nullptr;
size_type mmap_byte_size_ = 0;
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_readme():
get_numpy_include(),
os.path.join("..", "include"),
os.path.join("..", "thirdparty"),
get_thirdparty_prefix("boost-headers") + "/include",
get_thirdparty_prefix("nlohmann_json") + "/include",
get_thirdparty_prefix("libglog") + "/include",
get_thirdparty_prefix("gflags") + "/include"
Expand Down
89 changes: 31 additions & 58 deletions src/index/sparse/sparse_index_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "knowhere/config.h"
#include "knowhere/dataset.h"
#include "knowhere/expected.h"
#include "knowhere/feature.h"
#include "knowhere/index/index_factory.h"
#include "knowhere/index/index_node.h"
#include "knowhere/log.h"
Expand Down Expand Up @@ -54,14 +53,12 @@ class SparseInvertedIndexNode : public IndexNode {
LOG_KNOWHERE_ERROR_ << Type() << " only support metric_type IP or BM25";
return Status::invalid_metric_type;
}
auto drop_ratio_build = cfg.drop_ratio_build.value_or(0.0f);
auto index_or = CreateIndex</*mmapped=*/false>(cfg);
if (!index_or.has_value()) {
return index_or.error();
}
auto index = index_or.value();
index->Train(static_cast<const sparse::SparseRow<T>*>(dataset->GetTensor()), dataset->GetRows(),
drop_ratio_build);
index->Train(static_cast<const sparse::SparseRow<T>*>(dataset->GetTensor()), dataset->GetRows());
if (index_ != nullptr) {
LOG_KNOWHERE_WARNING_ << Type() << " has already been created, deleting old";
DeleteExistingIndex();
Expand Down Expand Up @@ -127,7 +124,7 @@ class SparseInvertedIndexNode : public IndexNode {
public:
RefineIterator(const sparse::BaseInvertedIndex<T>* index, sparse::SparseRow<T>&& query,
std::shared_ptr<PrecomputedDistanceIterator> precomputed_it,
const sparse::DocValueComputer<T>& computer, bool use_knowhere_search_pool = true,
const sparse::DocValueComputer<float>& computer, bool use_knowhere_search_pool = true,
const float refine_ratio = 0.5f)
: IndexIterator(true, use_knowhere_search_pool, refine_ratio),
index_(index),
Expand Down Expand Up @@ -158,7 +155,7 @@ class SparseInvertedIndexNode : public IndexNode {
private:
const sparse::BaseInvertedIndex<T>* index_;
sparse::SparseRow<T> query_;
const sparse::DocValueComputer<T> computer_;
const sparse::DocValueComputer<float> computer_;
std::shared_ptr<PrecomputedDistanceIterator> precomputed_it_;
bool first_return_ = true;
};
Expand All @@ -185,7 +182,7 @@ class SparseInvertedIndexNode : public IndexNode {
auto computer = computer_or.value();
auto drop_ratio_search = cfg.drop_ratio_search.value_or(0.0f);

const bool approximated = index_->IsApproximated() || drop_ratio_search > 0;
const bool approximated = drop_ratio_search > 0;

auto vec = std::vector<std::shared_ptr<IndexNode::iterator>>(nq, nullptr);
try {
Expand Down Expand Up @@ -228,37 +225,17 @@ class SparseInvertedIndexNode : public IndexNode {

[[nodiscard]] expected<DataSetPtr>
GetVectorByIds(const DataSetPtr dataset) const override {
if (!index_) {
return expected<DataSetPtr>::Err(Status::empty_index, "index not loaded");
}

auto rows = dataset->GetRows();
auto ids = dataset->GetIds();

auto data = std::make_unique<sparse::SparseRow<T>[]>(rows);
int64_t dim = 0;
try {
for (int64_t i = 0; i < rows; ++i) {
auto& target = data[i];
index_->GetVectorById(ids[i], target);
dim = std::max(dim, target.dim());
}
} catch (std::exception& e) {
return expected<DataSetPtr>::Err(Status::invalid_args, "GetVectorByIds failed");
}
auto res = GenResultDataSet(rows, dim, data.release());
res->SetIsSparse(true);
return res;
return expected<DataSetPtr>::Err(Status::not_implemented, "GetVectorByIds not implemented");
}

[[nodiscard]] bool
HasRawData(const std::string& metric_type) const override {
return true;
return false;
}

[[nodiscard]] expected<DataSetPtr>
GetIndexMeta(std::unique_ptr<Config> cfg) const override {
throw std::runtime_error("GetIndexMeta not supported for current index type");
return expected<DataSetPtr>::Err(Status::not_implemented, "GetIndexMeta not supported for current index type");
}

Status
Expand Down Expand Up @@ -292,7 +269,7 @@ class SparseInvertedIndexNode : public IndexNode {
return index_or.error();
}
index_ = index_or.value();
return index_->Load(reader);
return index_->Load(reader, 0, "");
}

Status
Expand All @@ -301,29 +278,36 @@ class SparseInvertedIndexNode : public IndexNode {
LOG_KNOWHERE_WARNING_ << Type() << " has already been created, deleting old";
DeleteExistingIndex();
}

auto cfg = static_cast<const knowhere::SparseInvertedIndexConfig&>(*config);
auto index_or = CreateIndex</*mmapped=*/true>(cfg);
if (!index_or.has_value()) {
return index_or.error();
}
index_ = index_or.value();

auto reader = knowhere::FileReader(filename);
map_size_ = reader.size();
size_t map_size = reader.size();
int map_flags = MAP_SHARED;
#ifdef MAP_POPULATE
if (cfg.enable_mmap_pop.has_value() && cfg.enable_mmap_pop.value()) {
map_flags |= MAP_POPULATE;
}
#endif
map_ = static_cast<char*>(mmap(nullptr, map_size_, PROT_READ, map_flags, reader.descriptor(), 0));
if (map_ == MAP_FAILED) {
LOG_KNOWHERE_ERROR_ << "Failed to mmap file: " << strerror(errno);
void* mapped_memory = mmap(nullptr, map_size, PROT_READ, map_flags, reader.descriptor(), 0);
if (mapped_memory == MAP_FAILED) {
LOG_KNOWHERE_ERROR_ << "Failed to mmap file " << filename << ": " << strerror(errno);
return Status::disk_file_error;
}
if (madvise(map_, map_size_, MADV_RANDOM) != 0) {
LOG_KNOWHERE_WARNING_ << "Failed to madvise file: " << strerror(errno);
}
auto index_or = CreateIndex</*mmapped=*/true>(cfg);
if (!index_or.has_value()) {
return index_or.error();
}
index_ = index_or.value();
MemoryIOReader map_reader((uint8_t*)map_, map_size_);

auto cleanup_mmap = [map_size, filename](void* map_addr) {
if (munmap(map_addr, map_size) != 0) {
LOG_KNOWHERE_ERROR_ << "Failed to munmap file " << filename << ": " << strerror(errno);
}
};
std::unique_ptr<void, decltype(cleanup_mmap)> mmap_guard(mapped_memory, cleanup_mmap);

MemoryIOReader map_reader(reinterpret_cast<uint8_t*>(mapped_memory), map_size);
auto supplement_target_filename = filename + ".knowhere_sparse_index_supplement";
return index_->Load(map_reader, map_flags, supplement_target_filename);
}
Expand Down Expand Up @@ -364,7 +348,8 @@ class SparseInvertedIndexNode : public IndexNode {
expected<sparse::BaseInvertedIndex<T>*>
CreateIndex(const SparseInvertedIndexConfig& cfg) const {
if (IsMetricType(cfg.metric_type.value(), metric::BM25)) {
auto idx = new sparse::InvertedIndex<T, use_wand, true, mmapped>();
// quantize float to uint16_t when BM25 metric type is used.
auto idx = new sparse::InvertedIndex<T, uint16_t, use_wand, true, mmapped>();
if (!cfg.bm25_k1.has_value() || !cfg.bm25_b.has_value() || !cfg.bm25_avgdl.has_value()) {
return expected<sparse::BaseInvertedIndex<T>*>::Err(
Status::invalid_args, "BM25 parameters k1, b, and avgdl must be set when building/loading");
Expand All @@ -376,7 +361,7 @@ class SparseInvertedIndexNode : public IndexNode {
idx->SetBM25Params(k1, b, avgdl, max_score_ratio);
return idx;
} else {
return new sparse::InvertedIndex<T, use_wand, false, mmapped>();
return new sparse::InvertedIndex<T, T, use_wand, false, mmapped>();
}
}

Expand All @@ -386,23 +371,11 @@ class SparseInvertedIndexNode : public IndexNode {
delete index_;
index_ = nullptr;
}
if (map_ != nullptr) {
auto res = munmap(map_, map_size_);
if (res != 0) {
LOG_KNOWHERE_ERROR_ << "Failed to munmap when trying to delete index: " << strerror(errno);
}
map_ = nullptr;
map_size_ = 0;
}
}

sparse::BaseInvertedIndex<T>* index_{};
std::shared_ptr<ThreadPool> search_pool_;
std::shared_ptr<ThreadPool> build_pool_;

// if map_ is not nullptr, it means the index is mmapped from disk.
char* map_ = nullptr;
size_t map_size_ = 0;
}; // class SparseInvertedIndexNode

// Concurrent version of SparseInvertedIndexNode
Expand Down
Loading

0 comments on commit 8380a96

Please sign in to comment.