Commit

init buffer for packed writer
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Sep 2, 2024
1 parent c330061 commit 9307d13
Showing 8 changed files with 99 additions and 72 deletions.
2 changes: 2 additions & 0 deletions cpp/include/milvus-storage/common/fs_util.h
@@ -28,4 +28,6 @@
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024;  // 1 MB
 // https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
 static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024;  // 10 MB
+
+static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;
 
 }  // namespace milvus_storage
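Taken together, the constants above fix the buffer floor at 11 MB per column-group file: one maximal row group (1 MB) plus one Arrow S3 multipart-upload part (10 MB). A quick compile-time sanity check of that arithmetic (an illustration, not part of the commit; values copied from fs_util.h):

#include <cstdint>

// Each column-group file must buffer at least one full row group (1 MB)
// plus one S3 multipart-upload part (10 MB), i.e. 11 MB per file.
static constexpr int64_t kMaxRowGroupSize = 1024 * 1024;
static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
static_assert(kMaxRowGroupSize + kPartUploadSize == 11 * 1024 * 1024,
              "MIN_BUFFER_SIZE_PER_FILE works out to 11 MB per column-group file");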
3 changes: 3 additions & 0 deletions cpp/include/milvus-storage/packed/splitter/size_based_splitter.h
@@ -14,6 +14,7 @@
 #pragma once
 
 #include "splitter_plugin.h"
+#include "common/result.h"
 
 namespace milvus_storage {
 
@@ -23,6 +24,8 @@ class SizeBasedSplitter : public SplitterPlugin {
 
   void Init() override;
 
+  Result<std::vector<ColumnGroup>> SplitRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
+
   std::vector<ColumnGroup> Split(const std::shared_ptr<arrow::RecordBatch>& record) override;
 
  private:
11 changes: 8 additions & 3 deletions cpp/include/milvus-storage/packed/writer.h
@@ -36,17 +36,22 @@ class PackedRecordBatchWriter {
                          arrow::fs::FileSystem& fs,
                          std::string& file_path,
                          parquet::WriterProperties& props);
-  // Init with the first batch of record.
-  // Split the first batch into column groups and initialize ColumnGroupWriters.
-  Status Init(const std::shared_ptr<arrow::RecordBatch>& record);
 
+  // Put the record batch into the corresponding column group, and flush
+  // the column group with the largest buffer to the file when the memory limit is reached.
   Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
   Status Close();
 
  private:
+  // Split the first buffer of record batches into column groups based on column size,
+  // initialize the column group writers, and push the column groups into the max heap.
+  Status splitAndWriteFirstBuffer();
+
   Status writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record, size_t batch_size);
   Status balanceMaxHeap();
 
+  std::vector<std::shared_ptr<arrow::RecordBatch>> buffered_batches_;
+  bool size_split_done_;
   size_t memory_limit_;
   std::shared_ptr<arrow::Schema> schema_;
   arrow::fs::FileSystem& fs_;
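The visible API change: callers no longer seed the writer with Init(); Write() buffers incoming batches internally until half the memory limit is used, then sizes the column groups from the whole buffer. A minimal caller sketch against the declarations above (MakeBatch is a hypothetical batch factory; the constructor argument order follows the header and the integration test; RETURN_NOT_OK comes from common/macro.h):

#include "common/macro.h"
#include "packed/writer.h"

using namespace milvus_storage;

// Sketch: feed batches straight into Write(); the first-buffer split
// happens inside the writer once enough data has accumulated.
Status WriteAll(std::shared_ptr<arrow::Schema> schema,
                arrow::fs::FileSystem& fs,
                std::string& file_path,
                parquet::WriterProperties& props) {
  PackedRecordBatchWriter writer(64 * 1024 * 1024, schema, fs, file_path, props);
  for (int i = 0; i < 10; ++i) {
    RETURN_NOT_OK(writer.Write(MakeBatch(schema)));  // MakeBatch: hypothetical helper
  }
  return writer.Close();  // flushes the remaining column groups
}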
3 changes: 3 additions & 0 deletions cpp/src/common/fs_util.cpp
@@ -52,6 +52,9 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
       uri_parser.password();
       options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY"));
       *out_path = std::getenv("FILE_PATH");
+      if (std::getenv("REGION") != nullptr) {
+        options.region = std::getenv("REGION");
+      }
       ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options));
 
       return std::shared_ptr<arrow::fs::FileSystem>(fs);
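BuildFileSystem reads its S3 configuration from the environment; the new block makes the bucket region an optional override instead of relying on Arrow's default region resolution. An illustrative setup under assumed placeholder values (only the variable names come from the code above):

#include <cstdlib>

int main() {
  // Environment consumed by the S3 branch of BuildFileSystem above.
  setenv("ACCESS_KEY", "minioadmin", 1);     // placeholder credentials
  setenv("SECRET_KEY", "minioadmin", 1);
  setenv("FILE_PATH", "my-bucket/data", 1);  // placeholder bucket path
  setenv("REGION", "us-west-2", 1);          // optional since this commit
  return 0;
}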
32 changes: 18 additions & 14 deletions cpp/src/format/parquet/file_writer.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include "common/log.h"
 #include "common/macro.h"
 #include "format/parquet/file_writer.h"
 #include <parquet/properties.h>
@@ -61,23 +62,26 @@ Status ParquetFileWriter::WriteTable(const arrow::Table& table) {
 
 Status ParquetFileWriter::WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
                                              const std::vector<size_t>& batch_memory_sizes) {
-  size_t current_group_size = 0;
-  std::vector<std::shared_ptr<arrow::RecordBatch>> current_group_batches;
+  auto WriteRowGroup = [&](const std::vector<std::shared_ptr<arrow::RecordBatch>>& batch, size_t group_size) -> Status {
+    kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(group_size));
+    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(batch));
+    RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
+    return Status::OK();
+  };
+
+  size_t current_size = 0;
+  std::vector<std::shared_ptr<arrow::RecordBatch>> current_batches;
   for (int i = 0; i < batches.size(); i++) {
-    if (current_group_size + batch_memory_sizes[i] >= DEFAULT_MAX_ROW_GROUP_SIZE) {
-      kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(current_group_size));
-      ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(current_group_batches));
-      RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
-      current_group_batches.clear();
-      current_group_size = 0;
+    if (current_size + batch_memory_sizes[i] >= DEFAULT_MAX_ROW_GROUP_SIZE && !current_batches.empty()) {
+      RETURN_ARROW_NOT_OK(WriteRowGroup(current_batches, current_size));
+      current_batches.clear();
+      current_size = 0;
     }
-    current_group_batches.push_back(batches[i]);
-    current_group_size += batch_memory_sizes[i];
+    current_batches.push_back(batches[i]);
+    current_size += batch_memory_sizes[i];
   }
-  if (!current_group_batches.empty()) {
-    kv_metadata_->Append(std::to_string(row_group_num_++), std::to_string(current_group_size));
-    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto table, arrow::Table::FromRecordBatches(current_group_batches));
-    RETURN_ARROW_NOT_OK(writer_->WriteTable(*table));
+  if (!current_batches.empty()) {
+    RETURN_ARROW_NOT_OK(WriteRowGroup(current_batches, current_size));
   }
   RETURN_ARROW_NOT_OK(writer_->AddKeyValueMetadata(kv_metadata_));
   return Status::OK();
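Besides factoring the flush into the WriteRowGroup lambda, the rewrite adds the !current_batches.empty() guard, so a single batch that alone crosses DEFAULT_MAX_ROW_GROUP_SIZE becomes its own row group instead of triggering an empty flush. A standalone sketch of the same accumulation rule (an illustration, not the library API):

#include <cstddef>
#include <vector>

// Returns the planned size of each row group under the rule above:
// flush before adding a batch that would cross the threshold, but
// never flush an empty group.
std::vector<size_t> PlanRowGroups(const std::vector<size_t>& batch_sizes, size_t max_group_size) {
  std::vector<size_t> groups;
  size_t current = 0;
  size_t count = 0;
  for (size_t s : batch_sizes) {
    if (current + s >= max_group_size && count > 0) {
      groups.push_back(current);
      current = 0;
      count = 0;
    }
    current += s;
    ++count;
  }
  if (count > 0) {
    groups.push_back(current);
  }
  return groups;
}

For example, PlanRowGroups({3, 3}, 2) yields {3, 3}, whereas the pre-commit logic would have emitted an empty leading row group before the first batch.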
23 changes: 23 additions & 0 deletions cpp/src/packed/splitter/size_based_splitter.cpp
@@ -14,15 +14,38 @@
 
 #include "packed/splitter/size_based_splitter.h"
 #include "common/arrow_util.h"
+#include "common/log.h"
+#include "common/macro.h"
 #include "packed/column_group.h"
 #include <stdexcept>
+#include <arrow/table.h>
+#include <arrow/array/concatenate.h>
 
 namespace milvus_storage {
 
 SizeBasedSplitter::SizeBasedSplitter(size_t max_group_size) : max_group_size_(max_group_size) {}
 
 void SizeBasedSplitter::Init() {}
 
+Result<std::vector<ColumnGroup>> SizeBasedSplitter::SplitRecordBatches(
+    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
+  auto schema = batches[0]->schema();
+
+  ASSIGN_OR_RETURN_ARROW_NOT_OK(auto merged_table, arrow::Table::FromRecordBatches(schema, batches));
+
+  std::vector<std::shared_ptr<Array>> arrays;
+  for (const auto& column : merged_table->columns()) {
+    // Concatenate all chunks of the current column
+    ASSIGN_OR_RETURN_ARROW_NOT_OK(auto concatenated_array,
+                                  arrow::Concatenate(column->chunks(), arrow::default_memory_pool()));
+    arrays.push_back(concatenated_array);
+  }
+  std::shared_ptr<RecordBatch> batch =
+      RecordBatch::Make(merged_table->schema(), merged_table->num_rows(), std::move(arrays));
+  LOG_STORAGE_INFO_ << "split record batch: " << merged_table->num_rows();
+  return Split(batch);
+}
+
 std::vector<ColumnGroup> SizeBasedSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& record) {
   if (!record) {
     throw std::invalid_argument("RecordBatch is null");
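SplitRecordBatches merges the buffered batches into one contiguous RecordBatch (a table merge followed by per-column chunk concatenation), so column sizes are measured over the whole first buffer rather than a single batch. A usage sketch against the signature declared in the header (assumes all batches share one schema, as the implementation requires):

#include <arrow/record_batch.h>

#include "packed/splitter/size_based_splitter.h"

using namespace milvus_storage;

// Sketch: split an entire buffer of schema-identical batches at once.
Result<std::vector<ColumnGroup>> SplitBuffer(
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& buffered) {
  SizeBasedSplitter splitter(buffered[0]->num_columns());
  splitter.Init();
  return splitter.SplitRecordBatches(buffered);  // merge + concatenate + Split
}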
94 changes: 41 additions & 53 deletions cpp/src/packed/writer.cpp
@@ -15,12 +15,14 @@
 #include "packed/writer.h"
 #include <cstddef>
 #include "common/log.h"
+#include "common/macro.h"
 #include "common/status.h"
 #include "packed/column_group.h"
 #include "packed/column_group_writer.h"
 #include "packed/splitter/indices_based_splitter.h"
 #include "packed/splitter/size_based_splitter.h"
 #include "common/fs_util.h"
+#include "common/arrow_util.h"
 
 namespace milvus_storage {
 
@@ -35,40 +37,47 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
       file_path_(file_path),
       props_(props),
       splitter_({}),
-      current_memory_usage_(0) {}
+      current_memory_usage_(0),
+      size_split_done_(false) {}
 
-Status PackedRecordBatchWriter::Init(const std::shared_ptr<arrow::RecordBatch>& record) {
-  // split first batch into column groups
-  std::vector<ColumnGroup> groups = SizeBasedSplitter(record->num_columns()).Split(record);
+Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
+  size_t next_batch_size = GetRecordBatchMemorySize(record);
+  if (next_batch_size > memory_limit_) {
+    return Status::InvalidArgument("Provided record batch size exceeds memory limit");
+  }
+  if (!size_split_done_) {
+    if (current_memory_usage_ + next_batch_size < memory_limit_ / 2 || buffered_batches_.empty()) {
+      buffered_batches_.push_back(record);
+      current_memory_usage_ += next_batch_size;
+      return Status::OK();
+    } else {
+      size_split_done_ = true;
+      RETURN_NOT_OK(splitAndWriteFirstBuffer());
+    }
+  }
+  return writeWithSplitIndex(record, next_batch_size);
+}
 
 // init column group writer and
 // put column groups into max heap
+Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
+  std::vector<ColumnGroup> groups =
+      SizeBasedSplitter(buffered_batches_[0]->num_columns()).SplitRecordBatches(buffered_batches_).value();
   std::vector<std::vector<int>> group_indices;
-  GroupId group_id = 0;
-  for (const ColumnGroup& group : groups) {
-    std::string group_path = file_path_ + "/" + std::to_string(group_id);
-    auto writer = std::make_unique<ColumnGroupWriter>(group_id, group.Schema(), fs_, group_path, props_,
-                                                      group.GetOriginColumnIndices());
-    auto status = writer->Init();
-    if (!status.ok()) {
-      LOG_STORAGE_ERROR_ << "Failed to init column group writer: " << status.ToString();
-      return status;
-    }
-    current_memory_usage_ += group.GetMemoryUsage();
-    max_heap_.emplace(group_id, group.GetMemoryUsage());
-    status = writer->Write(group.GetRecordBatch(0));
-    if (!status.ok()) {
-      LOG_STORAGE_ERROR_ << "Failed to write column group: " << group_id << ", " << status.ToString();
-      return status;
-    }
+  for (GroupId i = 0; i < groups.size(); ++i) {
+    auto& group = groups[i];
+    std::string group_path = file_path_ + "/" + std::to_string(i);
+    auto writer =
+        std::make_unique<ColumnGroupWriter>(i, group.Schema(), fs_, group_path, props_, group.GetOriginColumnIndices());
+    RETURN_NOT_OK(writer->Init());
+    RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0)));
+
+    max_heap_.emplace(i, group.GetMemoryUsage());
     group_indices.emplace_back(group.GetOriginColumnIndices());
     group_writers_.emplace_back(std::move(writer));
-    group_id++;
   }
   splitter_ = IndicesBasedSplitter(group_indices);
 
   // check memory usage limit
-  size_t min_memory_limit = group_id * (DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE);
+  size_t min_memory_limit = groups.size() * MIN_BUFFER_SIZE_PER_FILE;
   if (memory_limit_ < min_memory_limit) {
     return Status::InvalidArgument("Please provide at least " + std::to_string(min_memory_limit / 1024 / 1024) +
                                    " MB of memory for packed writer.");
@@ -77,42 +86,29 @@
   return balanceMaxHeap();
 }
 
-Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
+Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow::RecordBatch>& record,
+                                                    size_t next_batch_size) {
   std::vector<ColumnGroup> column_groups = splitter_.Split(record);
 
-  // Calculate the total memory usage of the new column groups
-  size_t new_memory_usage = 0;
-  for (const ColumnGroup& group : column_groups) {
-    new_memory_usage += group.GetMemoryUsage();
-  }
-
   // Flush column groups until there's enough room for the new column groups
   // to ensure that memory usage stays strictly below the limit
-  while (current_memory_usage_ + new_memory_usage >= memory_limit_ && !max_heap_.empty()) {
+  while (current_memory_usage_ + next_batch_size >= memory_limit_ && !max_heap_.empty()) {
     LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_
                        << ", flushing column group: " << max_heap_.top().first;
     auto max_group = max_heap_.top();
     current_memory_usage_ -= max_group.second;
 
     ColumnGroupWriter* writer = group_writers_[max_group.first].get();
     max_heap_.pop();
-    auto status = writer->Flush();
-    if (!status.ok()) {
-      LOG_STORAGE_ERROR_ << "Failed to flush column group: " << max_group.first << ", " << status.ToString();
-      return status;
-    }
+    RETURN_NOT_OK(writer->Flush());
   }
 
   // After flushing, add the new column groups if memory usage allows
   for (const ColumnGroup& group : column_groups) {
     current_memory_usage_ += group.GetMemoryUsage();
     max_heap_.emplace(group.group_id(), group.GetMemoryUsage());
     ColumnGroupWriter* writer = group_writers_[group.group_id()].get();
-    auto status = writer->Write(group.GetRecordBatch(0));
-    if (!status.ok()) {
-      LOG_STORAGE_ERROR_ << "Failed to write column group: " << group.group_id() << ", " << status.ToString();
-      return status;
-    }
+    RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0)));
   }
   return balanceMaxHeap();
 }
@@ -125,19 +121,11 @@ Status PackedRecordBatchWriter::Close() {
     ColumnGroupWriter* writer = group_writers_[max_group.first].get();
 
     LOG_STORAGE_DEBUG_ << "Flushing remaining column group: " << max_group.first;
-    auto status = writer->Flush();
-    if (!status.ok()) {
-      LOG_STORAGE_ERROR_ << "Failed to flush column group: " << max_group.first << ", " << status.ToString();
-      return status;
-    }
+    RETURN_NOT_OK(writer->Flush());
     current_memory_usage_ -= max_group.second;
   }
   for (auto& writer : group_writers_) {
-    auto status = writer->Close();
-    if (!status.ok()) {
-      LOG_STORAGE_ERROR_ << "Failed to close column group writer: " << status.ToString();
-      return status;
-    }
+    RETURN_NOT_OK(writer->Close());
   }
   return Status::OK();
 }
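The flush policy in writeWithSplitIndex is driven by a max-heap keyed on each group's buffered size: the largest group is flushed first until the incoming batch fits strictly below the limit. A standalone sketch of that eviction loop (simplified: a (bytes, group_id) pair ordered by the default comparator stands in for the writer's heap, and "flushing" just drops the bytes):

#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

using HeapEntry = std::pair<size_t, size_t>;  // (buffered_bytes, group_id)

// Flush largest-first until the next batch fits under the memory limit.
void MakeRoom(std::priority_queue<HeapEntry>& heap, size_t& usage,
              size_t limit, size_t next_batch_size,
              std::vector<size_t>& flushed_groups) {
  while (usage + next_batch_size >= limit && !heap.empty()) {
    auto [bytes, gid] = heap.top();
    heap.pop();
    usage -= bytes;                 // writer->Flush() in the real code
    flushed_groups.push_back(gid);  // record which group was flushed
  }
}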
3 changes: 1 addition & 2 deletions cpp/test/packed/packed_integration_test.cpp
@@ -65,8 +65,7 @@ class PackedIntegrationTest : public PackedTestBase {
 
 TEST_F(PackedIntegrationTest, WriteAndRead) {
   PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_);
-  EXPECT_TRUE(writer.Init(record_batch_).ok());
-  for (int i = 1; i < bath_size; ++i) {
+  for (int i = 0; i < bath_size; ++i) {
     EXPECT_TRUE(writer.Write(record_batch_).ok());
   }
   EXPECT_TRUE(writer.Close().ok());
