Skip to content

Commit

Permalink
feat: add custom multi part upload fs (#152)
Browse files Browse the repository at this point in the history
related: #148

---------

Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang authored Oct 10, 2024
1 parent 8b6ca71 commit 55a9909
Show file tree
Hide file tree
Showing 29 changed files with 3,367 additions and 59 deletions.
19 changes: 15 additions & 4 deletions cpp/benchmark/benchmark_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <arrow/array/builder_primitive.h>
#include <arrow/util/key_value_metadata.h>
#include "filesystem/fs.h"
#include "common/config.h"

#define SKIP_IF_NOT_OK(status, st) \
if (!status.ok()) { \
Expand All @@ -50,20 +51,27 @@ class S3Fixture : public benchmark::Fixture {
const char* secret_key = std::getenv(kEnvSecretKey);
const char* endpoint_url = std::getenv(kEnvS3EndpointUrl);
const char* file_path = std::getenv(kEnvFilePath);
std::string uri = "file:///tmp";
auto conf = StorageConfig();
conf.uri = "file:///tmp/";
if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
uri = endpoint_url;
conf.uri = std::string(endpoint_url);
conf.access_key_id = std::string(access_key);
conf.access_key_value = std::string(secret_key);
conf.file_path = std::string(file_path);
}
storage_config_ = std::move(conf);

auto base = std::string();
auto factory = std::make_shared<FileSystemFactory>();
auto result = factory->BuildFileSystem(uri, &base);
auto result = factory->BuildFileSystem(conf, &base);
if (!result.ok()) {
state.SkipWithError("Failed to build file system!");
}
fs_ = std::move(result).value();
}

std::shared_ptr<arrow::fs::FileSystem> fs_;
StorageConfig storage_config_;
};

static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
Expand Down Expand Up @@ -123,7 +131,10 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
auto record_batch = arrow::RecordBatch::Make(schema, 3, arrays);

for (auto _ : st) {
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, *parquet::default_writer_properties());
auto conf = StorageConfig();
conf.use_custom_part_upload_size = true;
conf.part_size = 30 * 1024 * 1024;
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
for (int i = 0; i < 8 * 1024; ++i) {
auto r = writer.Write(record_batch);
if (!r.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,35 @@
// limitations under the License.

#pragma once
#include <arrow/filesystem/filesystem.h>
#include <memory>
#include <string>

#include <sstream>

using namespace std;

namespace milvus_storage {

struct StorageConfig {
std::string uri = "";
std::string bucket_name = "";
std::string access_key_id = "";
std::string access_key_value = "";
std::string file_path = "";
std::string root_path = "";
std::string cloud_provider = "";
std::string region = "";
bool use_custom_part_upload_size = false;
int64_t part_size = 0;

std::string ToString() const {
std::stringstream ss;
ss << "[uri=" << uri << ", bucket_name=" << bucket_name << ", root_path=" << root_path
<< ", cloud_provider=" << cloud_provider << ", region=" << region
<< ", use_custom_part_upload_size=" << use_custom_part_upload_size << "]";

return ss.str();
}
};

static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB

// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
Expand All @@ -32,4 +55,4 @@ static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

} // namespace milvus_storage
} // namespace milvus_storage
51 changes: 51 additions & 0 deletions cpp/include/milvus-storage/common/path_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include "arrow/status.h"

namespace milvus_storage {

constexpr char kSep = '/';

arrow::Status NotAFile(std::string_view path) {
return arrow::Status::IOError("Not a regular file: " + std::string(path));
}

bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }

std::string EnsureTrailingSlash(std::string_view v) {
if (!v.empty() && !HasTrailingSlash(v)) {
// XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
// Unless the local filesystem always uses absolute paths
return std::string(v) + kSep;
} else {
return std::string(v);
}
}

std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
// XXX should strip trailing slash?

auto pos = s.find_last_of(kSep);
if (pos == std::string::npos) {
// Empty parent
return {{}, s};
}
return {s.substr(0, pos), s.substr(pos + 1)};
}

} // namespace milvus_storage
5 changes: 5 additions & 0 deletions cpp/include/milvus-storage/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Status {

static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); }

static Status IOError(const std::string& msg) { return Status(kIOError, msg); }

bool ok() const { return code_ == kOk; }

bool IsArrowError() const { return code_ == kArrowError; }
Expand All @@ -49,6 +51,8 @@ class Status {

bool IsWriterError() const { return code_ == kWriterError; }

bool IsIOError() const { return code_ == kIOError; }

std::string ToString() const;

private:
Expand All @@ -59,6 +63,7 @@ class Status {
kInternalStateError = 3,
kFileNotFound = 4,
kWriterError = 5,
kIOError = 6,
};

explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}
Expand Down
15 changes: 8 additions & 7 deletions cpp/include/milvus-storage/filesystem/azure/azure_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,24 @@ class AzureFileSystemProducer : public FileSystemProducer {
public:
AzureFileSystemProducer(){};

Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const std::string& uri, std::string* out_path) override {
Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) override {
arrow::util::Uri uri_parser;
RETURN_ARROW_NOT_OK(uri_parser.Parse(uri));
RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri));

arrow::fs::AzureOptions options;
auto account = std::getenv("AZURE_STORAGE_ACCOUNT");
auto key = std::getenv("AZURE_SECRET_KEY");
if (account == nullptr || key == nullptr) {
auto account = storage_config.access_key_id;
auto key = storage_config.access_key_value;
if (account.empty() || key.empty()) {
return Status::InvalidArgument("Please provide azure storage account and azure secret key");
}
options.account_name = account;
RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(std::getenv("AZURE_SECRET_KEY")));
RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(key));

ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options));
fs->CreateDir(*out_path);
return std::shared_ptr<arrow::fs::FileSystem>(fs);
}
};

} // namespace milvus_storage
} // namespace milvus_storage
7 changes: 5 additions & 2 deletions cpp/include/milvus-storage/filesystem/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
#include <memory>
#include <string>
#include "common/result.h"
#include "common/config.h"

namespace milvus_storage {

class FileSystemProducer {
public:
virtual ~FileSystemProducer() = default;

virtual Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const std::string& uri, std::string* out_path) = 0;
virtual Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
std::string* out_path) = 0;

std::string UriToPath(const std::string& uri) {
arrow::util::Uri uri_parser;
Expand All @@ -41,7 +43,8 @@ class FileSystemProducer {

class FileSystemFactory {
public:
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path);
Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const StorageConfig& storage_config,
std::string* out_path);
};

} // namespace milvus_storage
26 changes: 26 additions & 0 deletions cpp/include/milvus-storage/filesystem/io/io_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <arrow/io/interfaces.h>
#include <arrow/status.h>
#include <arrow/util/thread_pool.h>
#include "common/log.h"

namespace milvus_storage {

template <typename... SubmitArgs>
auto SubmitIO(arrow::io::IOContext io_context, SubmitArgs&&... submit_args)
-> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) {
arrow::internal::TaskHints hints;
hints.external_id = io_context.external_id();
return io_context.executor()->Submit(hints, io_context.stop_token(), std::forward<SubmitArgs>(submit_args)...);
};

void CloseFromDestructor(arrow::io::FileInterface* file) {
arrow::Status st = file->Close();
if (!st.ok()) {
auto file_type = typeid(*file).name();
std::stringstream ss;
ss << "When destroying file of type " << file_type << ": " << st.message();
LOG_STORAGE_FATAL_ << st.WithMessage(ss.str());
}
}

} // namespace milvus_storage
99 changes: 99 additions & 0 deletions cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>
#include <cstdlib>
#include "common/log.h"
#include "common/macro.h"

#include <arrow/util/key_value_metadata.h>
#include <arrow/filesystem/s3fs.h>
#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"
#include "arrow/io/interfaces.h"

using namespace arrow;
using ::arrow::fs::FileInfo;
using ::arrow::fs::FileInfoGenerator;

namespace milvus_storage {

class MultiPartUploadS3FS : public arrow::fs::S3FileSystem {
public:
~MultiPartUploadS3FS() override;

std::string type_name() const override { return "multiPartUploadS3"; }

bool Equals(const FileSystem& other) const override;

arrow::Result<std::string> PathFromUri(const std::string& uri_string) const override;

arrow::Result<FileInfo> GetFileInfo(const std::string& path) override;

arrow::Result<std::vector<arrow::fs::FileInfo>> GetFileInfo(const arrow::fs::FileSelector& select) override;

FileInfoGenerator GetFileInfoGenerator(const arrow::fs::FileSelector& select) override;

arrow::Status CreateDir(const std::string& path, bool recursive) override;

arrow::Status DeleteDir(const std::string& path) override;

arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;

Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override;

arrow::Status DeleteRootDirContents() override;

arrow::Status DeleteFile(const std::string& path) override;

arrow::Status Move(const std::string& src, const std::string& dest) override;

arrow::Status CopyFile(const std::string& src, const std::string& dest) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStreamWithUploadSize(const std::string& s,
int64_t part_size);

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStreamWithUploadSize(
const std::string& s, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata, int64_t part_size);

static arrow::Result<std::shared_ptr<MultiPartUploadS3FS>> Make(
const arrow::fs::S3Options& options, const arrow::io::IOContext& = arrow::io::default_io_context());

arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const std::string& path) override;

arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const FileInfo& info) override;

arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const std::string& s) override;

arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const FileInfo& info) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenAppendStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;

protected:
explicit MultiPartUploadS3FS(const arrow::fs::S3Options& options, const arrow::io::IOContext& io_context);

class Impl;
std::shared_ptr<Impl> impl_;
};

} // namespace milvus_storage
Loading

0 comments on commit 55a9909

Please sign in to comment.