From 8883c9cbde85d3a38d18d720ea6c5a5f3ce0069a Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Tue, 3 Sep 2024 19:28:56 +0800 Subject: [PATCH] enhance: optimize kv metadata and fix first large row group (#142) issue: #127 Signed-off-by: shaoting-huang --- .../format/parquet/file_writer.h | 1 + cpp/include/milvus-storage/packed/reader.h | 1 + .../packed/splitter/size_based_splitter.h | 7 +- cpp/src/format/parquet/file_writer.cpp | 10 ++- cpp/src/packed/reader.cpp | 17 +++- .../packed/splitter/size_based_splitter.cpp | 83 +++++++++---------- cpp/src/packed/writer.cpp | 6 +- 7 files changed, 69 insertions(+), 56 deletions(-) diff --git a/cpp/include/milvus-storage/format/parquet/file_writer.h b/cpp/include/milvus-storage/format/parquet/file_writer.h index 3d8dd02..f010f4d 100644 --- a/cpp/include/milvus-storage/format/parquet/file_writer.h +++ b/cpp/include/milvus-storage/format/parquet/file_writer.h @@ -58,5 +58,6 @@ class ParquetFileWriter : public FileWriter { parquet::WriterProperties props_; int64_t count_ = 0; int row_group_num_ = 0; + std::vector row_group_sizes_; }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index 9b630c6..04cb49b 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -74,6 +74,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader { std::unique_ptr chunk_manager_; int64_t absolute_row_position_; std::vector needed_column_offsets_; + std::vector> row_group_sizes_; int read_count_; }; diff --git a/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h b/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h index 2cebe1b..da8bd30 100644 --- a/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h +++ b/cpp/include/milvus-storage/packed/splitter/size_based_splitter.h @@ -24,15 +24,10 @@ class SizeBasedSplitter : 
public SplitterPlugin { void Init() override; - Result> SplitRecordBatches(const std::vector>& batches); + std::vector SplitRecordBatches(const std::vector>& batches); std::vector Split(const std::shared_ptr& record) override; - private: - void AddColumnGroup(const std::shared_ptr& record, - std::vector& column_groups, - std::vector& indices, - GroupId& group_id); size_t max_group_size_; static constexpr size_t SPLIT_THRESHOLD = 1024; // 1K }; diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp index 767c517..9de74c0 100644 --- a/cpp/src/format/parquet/file_writer.cpp +++ b/cpp/src/format/parquet/file_writer.cpp @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "common/log.h" #include "common/macro.h" #include "format/parquet/file_writer.h" #include #include #include #include "common/fs_util.h" +#include namespace milvus_storage { @@ -63,7 +63,7 @@ Status ParquetFileWriter::WriteTable(const arrow::Table& table) { Status ParquetFileWriter::WriteRecordBatches(const std::vector>& batches, const std::vector& batch_memory_sizes) { auto WriteRowGroup = [&](const std::vector>& batch, size_t group_size) -> Status { - kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(group_size)); + row_group_sizes_.push_back(group_size); ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(batch)); RETURN_ARROW_NOT_OK(writer_->WriteTable(*table)); return Status::OK(); @@ -83,13 +83,17 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vectorAddKeyValueMetadata(kv_metadata_)); return Status::OK(); } int64_t ParquetFileWriter::count() { return count_; } Status ParquetFileWriter::Close() { + std::stringstream ss; + copy(row_group_sizes_.begin(), row_group_sizes_.end(), std::ostream_iterator(ss, ",")); + std::string value = ss.str(); + kv_metadata_->Append("row_group_sizes", value.substr(0, value.length() - 1)); + 
RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_)); RETURN_ARROW_NOT_OK(writer_->Close()); return Status::OK(); } diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index c0e2086..6ecbb9a 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -48,6 +48,21 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs, file_readers_.emplace_back(std::move(result.value())); } + auto parse_size = [&](const std::string& input) -> std::vector { + std::vector sizes; + std::string token; + std::stringstream ss(input); + + while (std::getline(ss, token, ',')) { + sizes.push_back(std::stoll(token)); + } + return sizes; + }; + for (int i = 0; i < file_readers_.size(); ++i) { + row_group_sizes_.push_back( + parse_size(file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->value(0))); + LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size(); + } // Initialize table states and chunk manager column_group_states_.resize(file_readers_.size(), ColumnGroupState(0, -1, 0)); chunk_manager_ = std::make_unique(needed_column_offsets_, 0); @@ -70,7 +85,7 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() { LOG_STORAGE_DEBUG_ << "No more row groups in file " << i; return -1; } - int64_t rg_size = std::stoll(reader->parquet_reader()->metadata()->key_value_metadata()->value(rg)); + int64_t rg_size = row_group_sizes_[i][rg]; if (plan_buffer_size + rg_size >= buffer_available_) { LOG_STORAGE_DEBUG_ << "buffer is full now " << i; return -1; diff --git a/cpp/src/packed/splitter/size_based_splitter.cpp b/cpp/src/packed/splitter/size_based_splitter.cpp index 2530dd9..afc0622 100644 --- a/cpp/src/packed/splitter/size_based_splitter.cpp +++ b/cpp/src/packed/splitter/size_based_splitter.cpp @@ -27,65 +27,60 @@ SizeBasedSplitter::SizeBasedSplitter(size_t max_group_size) : max_group_size_(ma void SizeBasedSplitter::Init() {} -Result> 
SizeBasedSplitter::SplitRecordBatches( +std::vector SizeBasedSplitter::SplitRecordBatches( const std::vector>& batches) { - auto schema = batches[0]->schema(); - - ASSIGN_OR_RETURN_ARROW_NOT_OK(auto merged_table, arrow::Table::FromRecordBatches(schema, batches)); - - std::vector> arrays; - for (const auto& column : merged_table->columns()) { - // Concatenate all chunks of the current column - ASSIGN_OR_RETURN_ARROW_NOT_OK(auto concatenated_array, - arrow::Concatenate(column->chunks(), arrow::default_memory_pool())); - arrays.push_back(concatenated_array); - } - std::shared_ptr batch = - RecordBatch::Make(merged_table->schema(), merged_table->num_rows(), std::move(arrays)); - LOG_STORAGE_INFO_ << "split record batch: " << merged_table->num_rows(); - return Split(batch); -} - -std::vector SizeBasedSplitter::Split(const std::shared_ptr& record) { - if (!record) { - throw std::invalid_argument("RecordBatch is null"); + if (batches.empty()) { + return {}; } - std::vector column_groups; - std::vector small_group_indices; - GroupId group_id = 0; - for (int i = 0; i < record->num_columns(); ++i) { - std::shared_ptr column = record->column(i); - if (!column) { - throw std::runtime_error("Column is null"); + // calculate column sizes and rows + std::vector column_sizes(batches[0]->num_columns(), 0); + std::vector column_rows(batches[0]->num_columns(), 0); + for (const auto& record : batches) { + for (int i = 0; i < record->num_columns(); ++i) { + std::shared_ptr column = record->column(i); + if (!column) { + throw std::runtime_error("Column is null"); + } + column_sizes[i] += GetArrowArrayMemorySize(column); + column_rows[i] += record->num_rows(); } - size_t avg_size = GetArrowArrayMemorySize(column) / record->num_rows(); + } + // split column indices into small and large groups + std::vector> group_indices; + std::vector small_group_indices; + for (int i = 0; i < column_sizes.size(); ++i) { + size_t avg_size = column_sizes[i] / column_rows[i]; if (small_group_indices.size() 
>= max_group_size_) { - AddColumnGroup(record, column_groups, small_group_indices, group_id); + group_indices.push_back(small_group_indices); + small_group_indices.clear(); } - if (avg_size >= SPLIT_THRESHOLD) { - std::vector indices = {i}; - AddColumnGroup(record, column_groups, indices, group_id); + group_indices.push_back({i}); } else { small_group_indices.push_back(i); } } + group_indices.push_back(small_group_indices); + small_group_indices.clear(); - AddColumnGroup(record, column_groups, small_group_indices, group_id); + // create column groups + std::vector column_groups; + for (auto& record : batches) { + for (GroupId group_id = 0; group_id < group_indices.size(); ++group_id) { + auto batch = record->SelectColumns(group_indices[group_id]).ValueOrDie(); + if (column_groups.size() < group_indices.size()) { + column_groups.push_back(ColumnGroup(group_id, group_indices[group_id], batch)); + } else { + column_groups[group_id].AddRecordBatch(batch); + } + } + } return column_groups; } -void SizeBasedSplitter::AddColumnGroup(const std::shared_ptr& record, - std::vector& column_groups, - std::vector& indices, - GroupId& group_id) { - if (indices.empty() || !record) { - return; - } - auto batch = record->SelectColumns(indices).ValueOrDie(); - column_groups.push_back(ColumnGroup(group_id++, indices, batch)); - indices.clear(); +std::vector SizeBasedSplitter::Split(const std::shared_ptr& record) { + return SplitRecordBatches({record}); } } // namespace milvus_storage diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index a10b6ef..e4853a8 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -60,7 +60,7 @@ Status PackedRecordBatchWriter::Write(const std::shared_ptr& Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { std::vector groups = - SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_).value(); + 
      SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_); std::vector> group_indices; for (GroupId i = 0; i < groups.size(); ++i) { auto& group = groups[i]; @@ -68,7 +68,9 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { auto writer = std::make_unique(i, group.Schema(), fs_, group_path, props_, group.GetOriginColumnIndices()); RETURN_NOT_OK(writer->Init()); - RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0))); + for (auto& batch : group.GetRecordBatches()) { + RETURN_NOT_OK(writer->Write(batch)); + } max_heap_.emplace(i, group.GetMemoryUsage()); group_indices.emplace_back(group.GetOriginColumnIndices());