enhance: optimize kv metadata and fix first large row group (#142)
issue: #127

Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang authored Sep 3, 2024
1 parent 4575147 commit 8883c9c
Showing 7 changed files with 69 additions and 56 deletions.
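Taken together, the hunks below make two changes. First, the writer now serializes all row-group sizes into a single comma-separated row_group_sizes metadata entry when the file is closed, instead of appending one key-value pair per row group; the reader parses that entry once per file. Second, SizeBasedSplitter::SplitRecordBatches now averages column sizes across all buffered batches rather than a single merged batch, and the packed writer replays every buffered batch into each column group, so an oversized first row group no longer skews the split. An illustrative before/after of the file-level metadata, built with Arrow's key_value_metadata factory (the sizes are invented for the example):

#include <arrow/util/key_value_metadata.h>

// Before this commit: one metadata entry per row group, keyed by its index.
auto before = arrow::key_value_metadata({"0", "1"}, {"1048576", "524288"});

// After: a single entry holding the whole list, parsed once per file.
auto after = arrow::key_value_metadata({"row_group_sizes"}, {"1048576,524288"});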
1 change: 1 addition & 0 deletions cpp/include/milvus-storage/format/parquet/file_writer.h
@@ -58,5 +58,6 @@ class ParquetFileWriter : public FileWriter {
parquet::WriterProperties props_;
int64_t count_ = 0;
int row_group_num_ = 0;
std::vector<size_t> row_group_sizes_;
};
} // namespace milvus_storage
1 change: 1 addition & 0 deletions cpp/include/milvus-storage/packed/reader.h
@@ -74,6 +74,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
std::unique_ptr<ChunkManager> chunk_manager_;
int64_t absolute_row_position_;
std::vector<ColumnOffset> needed_column_offsets_;
std::vector<std::vector<int64_t>> row_group_sizes_;
int read_count_;
};

7 changes: 1 addition & 6 deletions cpp/include/milvus-storage/packed/splitter/size_based_splitter.h
@@ -24,15 +24,10 @@ class SizeBasedSplitter : public SplitterPlugin {

void Init() override;

Result<std::vector<ColumnGroup>> SplitRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
std::vector<ColumnGroup> SplitRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);

std::vector<ColumnGroup> Split(const std::shared_ptr<arrow::RecordBatch>& record) override;

private:
void AddColumnGroup(const std::shared_ptr<arrow::RecordBatch>& record,
std::vector<ColumnGroup>& column_groups,
std::vector<int>& indices,
GroupId& group_id);
size_t max_group_size_;
static constexpr size_t SPLIT_THRESHOLD = 1024; // 1K
};
10 changes: 7 additions & 3 deletions cpp/src/format/parquet/file_writer.cpp
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/log.h"
#include "common/macro.h"
#include "format/parquet/file_writer.h"
#include <parquet/properties.h>
#include <memory>
#include <string>
#include "common/fs_util.h"
#include <boost/variant.hpp>

namespace milvus_storage {

@@ -63,7 +63,7 @@ Status ParquetFileWriter::WriteTable(const arrow::Table& table) {
Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
const std::vector<size_t>& batch_memory_sizes) {
auto WriteRowGroup = [&](const std::vector<std::shared_ptr<arrow::RecordBatch>>& batch, size_t group_size) -> Status {
kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(group_size));
row_group_sizes_.push_back(group_size);
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(batch));
RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
return Status::OK();
@@ -83,13 +83,17 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
if (!current_batches.empty()) {
RETURN_ARROW_NOT_OK(WriteRowGroup(current_batches, current_size));
}
RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
return Status::OK();
}

int64_t ParquetFileWriter::count() { return count_; }

Status ParquetFileWriter::Close() {
std::stringstream ss;
copy(row_group_sizes_.begin(), row_group_sizes_.end(), std::ostream_iterator<int>(ss, ","));
std::string value = ss.str();
kv_metadata_->Append("row_group_sizes", value.substr(0, value.length() - 1));
RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
RETURN_ARROW_NOT_OK(writer_->Close());
return Status::OK();
}
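
For reference, a minimal standalone sketch of the encode/decode pair this metadata scheme implies: the writer joins sizes with commas (as in Close() above) and the reader splits them back (as in the parse_size lambda below). EncodeSizes and DecodeSizes are illustrative names, not part of the library:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

// Join row-group sizes into the single "row_group_sizes" metadata value.
std::string EncodeSizes(const std::vector<size_t>& sizes) {
  std::stringstream ss;
  std::copy(sizes.begin(), sizes.end(), std::ostream_iterator<size_t>(ss, ","));
  std::string value = ss.str();
  return value.empty() ? value : value.substr(0, value.length() - 1);  // drop trailing comma
}

// Recover the sizes on the read path; round-trips EncodeSizes exactly.
std::vector<int64_t> DecodeSizes(const std::string& input) {
  std::vector<int64_t> sizes;
  std::string token;
  std::stringstream ss(input);
  while (std::getline(ss, token, ',')) {
    sizes.push_back(std::stoll(token));
  }
  return sizes;
}

// EncodeSizes({100, 200, 300}) == "100,200,300".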
17 changes: 16 additions & 1 deletion cpp/src/packed/reader.cpp
@@ -48,6 +48,21 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
file_readers_.emplace_back(std::move(result.value()));
}

auto parse_size = [&](const std::string& input) -> std::vector<int64_t> {
std::vector<int64_t> sizes;
std::string token;
std::stringstream ss(input);

while (std::getline(ss, token, ',')) {
sizes.push_back(std::stoll(token));
}
return sizes;
};
for (int i = 0; i < file_readers_.size(); ++i) {
row_group_sizes_.push_back(
parse_size(file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->value(0)));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
}
// Initialize table states and chunk manager
column_group_states_.resize(file_readers_.size(), ColumnGroupState(0, -1, 0));
chunk_manager_ = std::make_unique<ChunkManager>(needed_column_offsets_, 0);
@@ -70,7 +85,7 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
LOG_STORAGE_DEBUG_ << "No more row groups in file " << i;
return -1;
}
int64_t rg_size = std::stoll(reader->parquet_reader()->metadata()->key_value_metadata()->value(rg));
int64_t rg_size = row_group_sizes_[i][rg];
if (plan_buffer_size + rg_size >= buffer_available_) {
LOG_STORAGE_DEBUG_ << "buffer is full now " << i;
return -1;
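
Caching the parsed sizes means the planning loop above can budget row groups without re-reading file metadata for every row group. A simplified sketch of that greedy policy; the real reader tracks per-file state and column offsets, so the free function below is illustrative only:

#include <cstddef>
#include <cstdint>
#include <vector>

// How many consecutive row groups, starting at next_rg, still fit in the
// buffer? Mirrors the check `plan_buffer_size + rg_size >= buffer_available_`.
int PlanRowGroups(const std::vector<int64_t>& rg_sizes, size_t next_rg, int64_t buffer_available) {
  int64_t planned = 0;
  int count = 0;
  for (size_t rg = next_rg; rg < rg_sizes.size(); ++rg) {
    if (planned + rg_sizes[rg] >= buffer_available) {
      break;  // the next row group would overflow the buffer
    }
    planned += rg_sizes[rg];
    ++count;
  }
  return count;
}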
83 changes: 39 additions & 44 deletions cpp/src/packed/splitter/size_based_splitter.cpp
@@ -27,65 +27,60 @@ SizeBasedSplitter::SizeBasedSplitter(size_t max_group_size) : max_group_size_(max_group_size) {}

void SizeBasedSplitter::Init() {}

Result<std::vector<ColumnGroup>> SizeBasedSplitter::SplitRecordBatches(
std::vector<ColumnGroup> SizeBasedSplitter::SplitRecordBatches(
const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
auto schema = batches[0]->schema();

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto merged_table, arrow::Table::FromRecordBatches(schema, batches));

std::vector<std::shared_ptr<Array>> arrays;
for (const auto& column : merged_table->columns()) {
// Concatenate all chunks of the current column
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto concatenated_array,
arrow::Concatenate(column->chunks(), arrow::default_memory_pool()));
arrays.push_back(concatenated_array);
}
std::shared_ptr<RecordBatch> batch =
RecordBatch::Make(merged_table->schema(), merged_table->num_rows(), std::move(arrays));
LOG_STORAGE_INFO_ << "split record batch: " << merged_table->num_rows();
return Split(batch);
}

std::vector<ColumnGroup> SizeBasedSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& record) {
if (!record) {
throw std::invalid_argument("RecordBatch is null");
if (batches.empty()) {
return {};
}
std::vector<ColumnGroup> column_groups;
std::vector<int> small_group_indices;
GroupId group_id = 0;
for (int i = 0; i < record->num_columns(); ++i) {
std::shared_ptr<arrow::Array> column = record->column(i);
if (!column) {
throw std::runtime_error("Column is null");
// calculate column sizes and rows
std::vector<size_t> column_sizes(batches[0]->num_columns(), 0);
std::vector<int64_t> column_rows(batches[0]->num_columns(), 0);
for (const auto& record : batches) {
for (int i = 0; i < record->num_columns(); ++i) {
std::shared_ptr<arrow::Array> column = record->column(i);
if (!column) {
throw std::runtime_error("Column is null");
}
column_sizes[i] += GetArrowArrayMemorySize(column);
column_rows[i] += record->num_rows();
}
size_t avg_size = GetArrowArrayMemorySize(column) / record->num_rows();
}

// split column indices into small and large groups
std::vector<std::vector<int>> group_indices;
std::vector<int> small_group_indices;
for (int i = 0; i < column_sizes.size(); ++i) {
size_t avg_size = column_sizes[i] / column_rows[i];
if (small_group_indices.size() >= max_group_size_) {
AddColumnGroup(record, column_groups, small_group_indices, group_id);
group_indices.push_back(small_group_indices);
small_group_indices.clear();
}

if (avg_size >= SPLIT_THRESHOLD) {
std::vector<int> indices = {i};
AddColumnGroup(record, column_groups, indices, group_id);
group_indices.push_back({i});
} else {
small_group_indices.push_back(i);
}
}
group_indices.push_back(small_group_indices);
small_group_indices.clear();

AddColumnGroup(record, column_groups, small_group_indices, group_id);
// create column groups
std::vector<ColumnGroup> column_groups;
for (auto& record : batches) {
for (GroupId group_id = 0; group_id < group_indices.size(); ++group_id) {
auto batch = record->SelectColumns(group_indices[group_id]).ValueOrDie();
if (column_groups.size() < group_indices.size()) {
column_groups.push_back(ColumnGroup(group_id, group_indices[group_id], batch));
} else {
column_groups[group_id].AddRecordBatch(batch);
}
}
}
return column_groups;
}

void SizeBasedSplitter::AddColumnGroup(const std::shared_ptr<arrow::RecordBatch>& record,
std::vector<ColumnGroup>& column_groups,
std::vector<int>& indices,
GroupId& group_id) {
if (indices.empty() || !record) {
return;
}
auto batch = record->SelectColumns(indices).ValueOrDie();
column_groups.push_back(ColumnGroup(group_id++, indices, batch));
indices.clear();
std::vector<ColumnGroup> SizeBasedSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& record) {
return SplitRecordBatches({record});
}

} // namespace milvus_storage
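
The grouping policy itself is compact: a column whose average row size reaches SPLIT_THRESHOLD (1 KiB) gets its own column group, and narrow columns are packed together, at most max_group_size_ per group. A toy version of just that index-grouping step (unlike the loop above, it guards against pushing an empty trailing group):

#include <cstddef>
#include <vector>

std::vector<std::vector<int>> GroupColumns(const std::vector<size_t>& avg_sizes,
                                           size_t max_group_size,
                                           size_t threshold) {
  std::vector<std::vector<int>> groups;
  std::vector<int> small;
  for (int i = 0; i < static_cast<int>(avg_sizes.size()); ++i) {
    if (small.size() >= max_group_size) {  // flush a full batch of narrow columns
      groups.push_back(small);
      small.clear();
    }
    if (avg_sizes[i] >= threshold) {
      groups.push_back({i});  // wide column: singleton group
    } else {
      small.push_back(i);
    }
  }
  if (!small.empty()) {
    groups.push_back(small);  // skip an empty remainder
  }
  return groups;
}

// GroupColumns({2048, 16, 8, 4096}, 8, 1024) yields {{0}, {3}, {1, 2}}.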
6 changes: 4 additions & 2 deletions cpp/src/packed/writer.cpp
@@ -60,15 +60,17 @@ Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>&

Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
std::vector<ColumnGroup> groups =
SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_).value();
SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_);
std::vector<std::vector<int>> group_indices;
for (GroupId i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
std::string group_path = file_path_ + "/" + std::to_string(i);
auto writer =
std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, props_, group.GetOriginColumnIndices());
RETURN_NOT_OK(writer->Init());
RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0)));
for (auto& batch : group.GetRecordBatches()) {
RETURN_NOT_OK(writer->Write(batch));
}

max_heap_.emplace(i, group.GetMemoryUsage());
group_indices.emplace_back(group.GetOriginColumnIndices());
