Skip to content

Commit

Permalink
byte array kv meta for packed
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Sep 10, 2024
1 parent 3b9c278 commit b6526dc
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
20 changes: 20 additions & 0 deletions cpp/include/milvus-storage/common/fs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,24 @@ static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB

static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;

static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";

class PackedMetaSerde {
public:
// Serialize a vector of size_t to a byte array and convert it to a string
static std::string serialize(const std::vector<size_t>& sizes) {
std::vector<uint8_t> byteArray(sizes.size() * sizeof(size_t));
std::memcpy(byteArray.data(), sizes.data(), byteArray.size());
return std::string(byteArray.begin(), byteArray.end());
}

// Deserialize a string back to a vector of size_t
static std::vector<size_t> deserialize(const std::string& input) {
std::vector<uint8_t> byteArray(input.begin(), input.end());
std::vector<size_t> sizes(byteArray.size() / sizeof(size_t));
std::memcpy(sizes.data(), byteArray.data(), byteArray.size());
return sizes;
}
};

} // namespace milvus_storage
5 changes: 2 additions & 3 deletions cpp/src/format/parquet/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<a
int64_t ParquetFileWriter::count() { return count_; }

Status ParquetFileWriter::Close() {
std::vector<uint8_t> byteArray(row_group_sizes_.size() * sizeof(size_t));
std::memcpy(byteArray.data(), row_group_sizes_.data(), byteArray.size());
kv_metadata_->Append("row_group_sizes", std::string(byteArray.begin(), byteArray.end()));
std::string meta = PackedMetaSerde::serialize(row_group_sizes_);
kv_metadata_->Append(ROW_GROUP_SIZE_META_KEY, meta);
RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
RETURN_ARROW_NOT_OK(writer_->Close());
return Status::OK();
Expand Down
11 changes: 3 additions & 8 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <parquet/properties.h>
#include <memory>
#include "common/arrow_util.h"
#include "common/fs_util.h"
#include "common/log.h"
#include "packed/chunk_manager.h"

Expand Down Expand Up @@ -49,15 +50,9 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
file_readers_.emplace_back(std::move(result.value()));
}

auto parse_size = [&](const std::string& input) -> std::vector<size_t> {
std::vector<uint8_t> byteArray = std::vector<uint8_t>(input.begin(), input.end());
std::vector<size_t> vec(byteArray.size() / sizeof(size_t));
std::memcpy(vec.data(), byteArray.data(), byteArray.size());
return vec;
};
for (int i = 0; i < file_readers_.size(); ++i) {
row_group_sizes_.push_back(
parse_size(file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->value(0)));
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
}
// Initialize table states and chunk manager
Expand Down

0 comments on commit b6526dc

Please sign in to comment.