diff --git a/cpp/include/milvus-storage/common/fs_util.h b/cpp/include/milvus-storage/common/fs_util.h
index c307eb2..51ecc00 100644
--- a/cpp/include/milvus-storage/common/fs_util.h
+++ b/cpp/include/milvus-storage/common/fs_util.h
@@ -30,4 +30,24 @@ static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024;  // 10 MB
 
 static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;
 
+static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
+
+class PackedMetaSerde {
+ public:
+  // Serialize a vector of size_t to a byte array and convert it to a string
+  static std::string serialize(const std::vector<size_t>& sizes) {
+    std::vector<uint8_t> byteArray(sizes.size() * sizeof(size_t));
+    std::memcpy(byteArray.data(), sizes.data(), byteArray.size());
+    return std::string(byteArray.begin(), byteArray.end());
+  }
+
+  // Deserialize a string back to a vector of size_t
+  static std::vector<size_t> deserialize(const std::string& input) {
+    std::vector<uint8_t> byteArray(input.begin(), input.end());
+    std::vector<size_t> sizes(byteArray.size() / sizeof(size_t));
+    std::memcpy(sizes.data(), byteArray.data(), byteArray.size());
+    return sizes;
+  }
+};
+
 }  // namespace milvus_storage
diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h
index 04cb49b..3aab931 100644
--- a/cpp/include/milvus-storage/packed/reader.h
+++ b/cpp/include/milvus-storage/packed/reader.h
@@ -74,7 +74,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
   std::unique_ptr<ChunkManager> chunk_manager_;
   int64_t absolute_row_position_;
   std::vector<ColumnOffset> needed_column_offsets_;
-  std::vector<std::vector<int64_t>> row_group_sizes_;
+  std::vector<std::vector<size_t>> row_group_sizes_;
   int read_count_;
 };
diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp
index 9de74c0..1360cae 100644
--- a/cpp/src/format/parquet/file_writer.cpp
+++ b/cpp/src/format/parquet/file_writer.cpp
@@ -89,10 +89,8 @@ Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
-  std::stringstream ss;
-  std::copy(row_group_sizes_.begin(), row_group_sizes_.end(), std::ostream_iterator<size_t>(ss, ","));
-  std::string value = ss.str();
-  kv_metadata_->Append("row_group_sizes", value.substr(0, value.length() - 1));
+  std::string meta = PackedMetaSerde::serialize(row_group_sizes_);
+  kv_metadata_->Append(ROW_GROUP_SIZE_META_KEY, meta);
   RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
   RETURN_ARROW_NOT_OK(writer_->Close());
   return Status::OK();
diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp
index aea2eb5..621ad19 100644
--- a/cpp/src/packed/reader.cpp
+++ b/cpp/src/packed/reader.cpp
@@ -19,6 +19,7 @@
 #include
 #include
 #include "common/arrow_util.h"
+#include "common/fs_util.h"
 #include "common/log.h"
 #include "packed/chunk_manager.h"
@@ -49,19 +50,9 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
     file_readers_.emplace_back(std::move(result.value()));
   }
 
-  auto parse_size = [&](const std::string& input) -> std::vector<int64_t> {
-    std::vector<int64_t> sizes;
-    std::string token;
-    std::stringstream ss(input);
-
-    while (std::getline(ss, token, ',')) {
-      sizes.push_back(std::stoll(token));
-    }
-    return sizes;
-  };
   for (int i = 0; i < file_readers_.size(); ++i) {
-    row_group_sizes_.push_back(
-        parse_size(file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->value(0)));
+    auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
+    row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
     LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
   }
   // Initialize table states and chunk manager
diff --git a/cpp/src/packed/splitter/size_based_splitter.cpp b/cpp/src/packed/splitter/size_based_splitter.cpp
index afc0622..263c75e 100644
--- a/cpp/src/packed/splitter/size_based_splitter.cpp
+++ b/cpp/src/packed/splitter/size_based_splitter.cpp
@@ -38,9 +38,6 @@ std::vector<ColumnGroup> SizeBasedSplitter::SplitRecordBatches(
   for (const auto& record : batches) {
     for (int i = 0; i < record->num_columns(); ++i) {
       std::shared_ptr<arrow::Array> column = record->column(i);
-      if (!column) {
-        throw std::runtime_error("Column is null");
-      }
       column_sizes[i] += GetArrowArrayMemorySize(column);
       column_rows[i] += record->num_rows();
     }
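
For context, here is a minimal standalone sketch (not part of the patch) of the round trip this change introduces: the writer serializes its row-group sizes into a single metadata string stored under ROW_GROUP_SIZE_META_KEY, and the reader decodes that string back into a std::vector<size_t>. The serde is copied locally so the snippet compiles on its own; in the repo it lives in fs_util.h and the string travels through Parquet key-value metadata.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Local copy of the PackedMetaSerde added to fs_util.h above,
// so this sketch is self-contained.
struct PackedMetaSerde {
  static std::string serialize(const std::vector<size_t>& sizes) {
    std::vector<uint8_t> bytes(sizes.size() * sizeof(size_t));
    std::memcpy(bytes.data(), sizes.data(), bytes.size());
    return std::string(bytes.begin(), bytes.end());
  }
  static std::vector<size_t> deserialize(const std::string& input) {
    std::vector<size_t> sizes(input.size() / sizeof(size_t));
    std::memcpy(sizes.data(), input.data(), input.size());
    return sizes;
  }
};

int main() {
  // Row-group sizes a writer would collect while flushing a packed file.
  std::vector<size_t> row_group_sizes = {1000, 2048, 512};

  // Writer side: pack the sizes into the metadata string.
  std::string meta = PackedMetaSerde::serialize(row_group_sizes);

  // Reader side: recover the sizes from the metadata value.
  std::vector<size_t> decoded = PackedMetaSerde::deserialize(meta);

  assert(decoded == row_group_sizes);
  return 0;
}

Note that the encoding is the raw in-memory bytes of size_t, so writer and reader are assumed to share the same size_t width and endianness.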