
[enhance]: use byte array for packed kv meta to reduce meta size #146

Merged · 1 commit · Sep 10, 2024
20 changes: 20 additions & 0 deletions cpp/include/milvus-storage/common/fs_util.h
@@ -30,4 +30,24 @@ static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB
 
 static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;
 
+static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
+
+class PackedMetaSerde {
+ public:
+  // Serialize a vector of size_t to a byte array and convert it to a string
+  static std::string serialize(const std::vector<size_t>& sizes) {
+    std::vector<uint8_t> byteArray(sizes.size() * sizeof(size_t));
+    std::memcpy(byteArray.data(), sizes.data(), byteArray.size());
+    return std::string(byteArray.begin(), byteArray.end());
+  }
+
+  // Deserialize a string back to a vector of size_t
+  static std::vector<size_t> deserialize(const std::string& input) {
+    std::vector<uint8_t> byteArray(input.begin(), input.end());
+    std::vector<size_t> sizes(byteArray.size() / sizeof(size_t));
+    std::memcpy(sizes.data(), byteArray.data(), byteArray.size());
+    return sizes;
+  }
+};
+
 }  // namespace milvus_storage
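
A minimal round-trip sketch of the new serde (hypothetical caller code, not part of this diff). Note that serialize copies the raw in-memory bytes, so the encoding assumes the writer and reader agree on sizeof(size_t) and endianness:

#include <cassert>
#include <string>
#include <vector>
#include "common/fs_util.h"

int main() {
  std::vector<size_t> row_group_sizes = {1024, 2048, 4096};

  // Pack three size_t values into a 24-byte string (assuming an 8-byte size_t).
  std::string meta = milvus_storage::PackedMetaSerde::serialize(row_group_sizes);
  assert(meta.size() == row_group_sizes.size() * sizeof(size_t));

  // Recover the original vector from the raw bytes.
  std::vector<size_t> restored = milvus_storage::PackedMetaSerde::deserialize(meta);
  assert(restored == row_group_sizes);
  return 0;
}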
2 changes: 1 addition & 1 deletion cpp/include/milvus-storage/packed/reader.h
@@ -74,7 +74,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
   std::unique_ptr<ChunkManager> chunk_manager_;
   int64_t absolute_row_position_;
   std::vector<ColumnOffset> needed_column_offsets_;
-  std::vector<std::vector<int64_t>> row_group_sizes_;
+  std::vector<std::vector<size_t>> row_group_sizes_;
   int read_count_;
 };
 
6 changes: 2 additions & 4 deletions cpp/src/format/parquet/file_writer.cpp
@@ -89,10 +89,8 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<a
 int64_t ParquetFileWriter::count() { return count_; }
 
 Status ParquetFileWriter::Close() {
-  std::stringstream ss;
-  copy(row_group_sizes_.begin(), row_group_sizes_.end(), std::ostream_iterator<int>(ss, ","));
-  std::string value = ss.str();
-  kv_metadata_->Append("row_group_sizes", value.substr(0, value.length() - 1));
+  std::string meta = PackedMetaSerde::serialize(row_group_sizes_);
+  kv_metadata_->Append(ROW_GROUP_SIZE_META_KEY, meta);
   RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
   RETURN_ARROW_NOT_OK(writer_->Close());
   return Status::OK();
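
The size reduction named in the PR title comes from this swap: the old comma-separated encoding costs one byte per decimal digit plus a separator per value, while the packed encoding is a flat sizeof(size_t) bytes per value. A standalone comparison (illustrative arithmetic only, not code from the PR):

#include <cstdio>
#include <string>
#include <vector>

int main() {
  // 100 row groups of 64 MB each; 67108864 has eight decimal digits.
  std::vector<size_t> sizes(100, 64 * 1024 * 1024);

  size_t csv_bytes = 0;
  for (size_t v : sizes) {
    csv_bytes += std::to_string(v).size() + 1;  // digits plus a separator
  }
  size_t packed_bytes = sizes.size() * sizeof(size_t);  // flat 8 bytes per value

  std::printf("csv: %zu bytes, packed: %zu bytes\n", csv_bytes, packed_bytes);
  // Prints "csv: 900 bytes, packed: 800 bytes". The packed form wins once
  // values exceed seven digits, and its size no longer depends on the values.
  return 0;
}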
15 changes: 3 additions & 12 deletions cpp/src/packed/reader.cpp
@@ -19,6 +19,7 @@
 #include <parquet/properties.h>
 #include <memory>
 #include "common/arrow_util.h"
+#include "common/fs_util.h"
 #include "common/log.h"
 #include "packed/chunk_manager.h"
 
@@ -49,19 +50,9 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
     file_readers_.emplace_back(std::move(result.value()));
   }
 
-  auto parse_size = [&](const std::string& input) -> std::vector<int64_t> {
-    std::vector<int64_t> sizes;
-    std::string token;
-    std::stringstream ss(input);
-
-    while (std::getline(ss, token, ',')) {
-      sizes.push_back(std::stoll(token));
-    }
-    return sizes;
-  };
   for (int i = 0; i < file_readers_.size(); ++i) {
-    row_group_sizes_.push_back(
-        parse_size(file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->value(0)));
+    auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
+    row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
     LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
   }
   // Initialize table states and chunk manager
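
One caveat with the new lookup: KeyValueMetadata::Get returns an arrow::Result<std::string>, so ValueOrDie() aborts the process when the key is missing (for example, on files written before this change, which used the old "row_group_sizes" key). A defensive variant is sketched below; ReadRowGroupSizes is a hypothetical helper, not part of the PR:

#include <memory>
#include <vector>
#include <parquet/arrow/reader.h>
#include "common/fs_util.h"

// Returns the decoded row-group sizes, or an empty vector when the metadata
// key is absent, instead of crashing via ValueOrDie().
std::vector<size_t> ReadRowGroupSizes(const parquet::arrow::FileReader& reader) {
  auto kv = reader.parquet_reader()->metadata()->key_value_metadata();
  if (kv == nullptr) {
    return {};
  }
  auto value = kv->Get(milvus_storage::ROW_GROUP_SIZE_META_KEY);
  if (!value.ok()) {
    return {};
  }
  return milvus_storage::PackedMetaSerde::deserialize(*value);
}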
3 changes: 0 additions & 3 deletions cpp/src/packed/splitter/size_based_splitter.cpp
@@ -38,9 +38,6 @@ std::vector<ColumnGroup> SizeBasedSplitter::SplitRecordBatches(
   for (const auto& record : batches) {
     for (int i = 0; i < record->num_columns(); ++i) {
       std::shared_ptr<arrow::Array> column = record->column(i);
-      if (!column) {
-        throw std::runtime_error("Column is null");
-      }
       column_sizes[i] += GetArrowArrayMemorySize(column);
       column_rows[i] += record->num_rows();
     }