diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp
index b391ef9..634d3ab 100644
--- a/cpp/benchmark/benchmark_packed.cpp
+++ b/cpp/benchmark/benchmark_packed.cpp
@@ -30,6 +30,7 @@
 #include
 #include
 #include "filesystem/fs.h"
+#include "common/config.h"

 #define SKIP_IF_NOT_OK(status, st) \
   if (!status.ok()) {              \
@@ -50,13 +51,19 @@ class S3Fixture : public benchmark::Fixture {
     const char* secret_key = std::getenv(kEnvSecretKey);
     const char* endpoint_url = std::getenv(kEnvS3EndpointUrl);
     const char* file_path = std::getenv(kEnvFilePath);
-    std::string uri = "file:///tmp";
+    auto conf = StorageConfig();
+    conf.uri = "file:///tmp/";
     if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && file_path != nullptr) {
-      uri = endpoint_url;
+      conf.uri = std::string(endpoint_url);
+      conf.access_key_id = std::string(access_key);
+      conf.access_key_value = std::string(secret_key);
+      conf.file_path = std::string(file_path);
     }
+    storage_config_ = std::move(conf);
+
     auto base = std::string();
     auto factory = std::make_shared();
-    auto result = factory->BuildFileSystem(uri, &base);
+    auto result = factory->BuildFileSystem(conf, &base);
     if (!result.ok()) {
       state.SkipWithError("Failed to build file system!");
     }
@@ -64,6 +71,7 @@ class S3Fixture : public benchmark::Fixture {
   }

   std::shared_ptr fs_;
+  StorageConfig storage_config_;
 };

 static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
@@ -123,7 +131,10 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
   auto record_batch = arrow::RecordBatch::Make(schema, 3, arrays);

   for (auto _ : st) {
-    PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, *parquet::default_writer_properties());
+    auto conf = StorageConfig();
+    conf.use_custom_part_upload_size = true;
+    conf.part_size = 30 * 1024 * 1024;
+    PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
     for (int i = 0; i < 8 * 1024; ++i) {
       auto r = writer.Write(record_batch);
       if (!r.ok()) {
diff --git a/cpp/include/milvus-storage/packed/utils/config.h b/cpp/include/milvus-storage/common/config.h
similarity index 63%
rename from cpp/include/milvus-storage/packed/utils/config.h
rename to cpp/include/milvus-storage/common/config.h
index c6cab2d..27b5ebc 100644
--- a/cpp/include/milvus-storage/packed/utils/config.h
+++ b/cpp/include/milvus-storage/common/config.h
@@ -13,12 +13,35 @@
 // limitations under the License.
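Note on the fixture changes above: the benchmark no longer threads a bare URI string through `BuildFileSystem`; it fills a `StorageConfig` and passes that instead, and the write benchmark opts into a 30 MiB multipart part size. A minimal sketch of the same setup flow outside the benchmark follows. The environment-variable names are illustrative (the fixture reads them through `kEnv*` constants whose values are not part of this diff), and error handling is reduced to a boolean:

```cpp
// Sketch of the new StorageConfig-based setup, modeled on the benchmark fixture.
#include <cstdlib>
#include <memory>
#include <string>

#include "common/config.h"
#include "filesystem/fs.h"

using namespace milvus_storage;

bool SetUpStorage(StorageConfig* out_conf, std::string* out_path) {
  StorageConfig conf;
  conf.uri = "file:///tmp/";  // local fallback, as in the fixture

  const char* access_key = std::getenv("ACCESS_KEY");     // illustrative name
  const char* secret_key = std::getenv("SECRET_KEY");     // illustrative name
  const char* endpoint_url = std::getenv("S3_ENDPOINT");  // illustrative name
  if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr) {
    conf.uri = endpoint_url;
    conf.access_key_id = access_key;
    conf.access_key_value = secret_key;
  }

  auto factory = std::make_shared<FileSystemFactory>();
  auto result = factory->BuildFileSystem(conf, out_path);
  if (!result.ok()) {
    return false;  // the fixture calls state.SkipWithError() here
  }
  *out_conf = std::move(conf);
  return true;
}
```

Extracting the `arrow::fs::FileSystem` from the returned `Result` follows the library's `Result` API, which this diff does not show, so the sketch stops at the `ok()` check.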
 #pragma once
-#include
-#include
-#include
+
+#include
+
+using namespace std;

 namespace milvus_storage {

+struct StorageConfig {
+  std::string uri = "";
+  std::string bucket_name = "";
+  std::string access_key_id = "";
+  std::string access_key_value = "";
+  std::string file_path = "";
+  std::string root_path = "";
+  std::string cloud_provider = "";
+  std::string region = "";
+  bool use_custom_part_upload_size = false;
+  int64_t part_size = 0;
+
+  std::string ToString() const {
+    std::stringstream ss;
+    ss << "[uri=" << uri << ", bucket_name=" << bucket_name << ", root_path=" << root_path
+       << ", cloud_provider=" << cloud_provider << ", region=" << region
+       << ", use_custom_part_upload_size=" << use_custom_part_upload_size << "]";
+
+    return ss.str();
+  }
+};
+
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024;  // 1 MB

 // https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
@@ -32,4 +55,4 @@ static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";

 static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
 static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;

-} // namespace milvus_storage
+} // namespace milvus_storage
\ No newline at end of file
diff --git a/cpp/include/milvus-storage/common/path_util.h b/cpp/include/milvus-storage/common/path_util.h
new file mode 100644
index 0000000..1bbec94
--- /dev/null
+++ b/cpp/include/milvus-storage/common/path_util.h
@@ -0,0 +1,51 @@
+// Copyright 2024 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include "arrow/status.h"
+
+namespace milvus_storage {
+
+constexpr char kSep = '/';
+
+arrow::Status NotAFile(std::string_view path) {
+  return arrow::Status::IOError("Not a regular file: " + std::string(path));
+}
+
+bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }
+
+std::string EnsureTrailingSlash(std::string_view v) {
+  if (!v.empty() && !HasTrailingSlash(v)) {
+    // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
+    // Unless the local filesystem always uses absolute paths
+    return std::string(v) + kSep;
+  } else {
+    return std::string(v);
+  }
+}
+
+std::pair GetAbstractPathParent(const std::string& s) {
+  // XXX should strip trailing slash?
+ + auto pos = s.find_last_of(kSep); + if (pos == std::string::npos) { + // Empty parent + return {{}, s}; + } + return {s.substr(0, pos), s.substr(pos + 1)}; +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/common/status.h b/cpp/include/milvus-storage/common/status.h index db99911..fa6d42c 100644 --- a/cpp/include/milvus-storage/common/status.h +++ b/cpp/include/milvus-storage/common/status.h @@ -37,6 +37,8 @@ class Status { static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); } + static Status IOError(const std::string& msg) { return Status(kIOError, msg); } + bool ok() const { return code_ == kOk; } bool IsArrowError() const { return code_ == kArrowError; } @@ -49,6 +51,8 @@ class Status { bool IsWriterError() const { return code_ == kWriterError; } + bool IsIOError() const { return code_ == kIOError; } + std::string ToString() const; private: @@ -59,6 +63,7 @@ class Status { kInternalStateError = 3, kFileNotFound = 4, kWriterError = 5, + kIOError = 6, }; explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {} diff --git a/cpp/include/milvus-storage/filesystem/azure/azure_fs.h b/cpp/include/milvus-storage/filesystem/azure/azure_fs.h index 9f11331..b0c18bb 100644 --- a/cpp/include/milvus-storage/filesystem/azure/azure_fs.h +++ b/cpp/include/milvus-storage/filesystem/azure/azure_fs.h @@ -26,18 +26,19 @@ class AzureFileSystemProducer : public FileSystemProducer { public: AzureFileSystemProducer(){}; - Result> Make(const std::string& uri, std::string* out_path) override { + Result> Make(const StorageConfig& storage_config, + std::string* out_path) override { arrow::util::Uri uri_parser; - RETURN_ARROW_NOT_OK(uri_parser.Parse(uri)); + RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri)); arrow::fs::AzureOptions options; - auto account = std::getenv("AZURE_STORAGE_ACCOUNT"); - auto key = std::getenv("AZURE_SECRET_KEY"); - if (account == nullptr || key == nullptr) { + auto account = storage_config.access_key_id; + auto key = storage_config.access_key_value; + if (account.empty() || key.empty()) { return Status::InvalidArgument("Please provide azure storage account and azure secret key"); } options.account_name = account; - RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(std::getenv("AZURE_SECRET_KEY"))); + RETURN_ARROW_NOT_OK(options.ConfigureAccountKeyCredential(key)); ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::AzureFileSystem::Make(options)); fs->CreateDir(*out_path); @@ -45,4 +46,4 @@ class AzureFileSystemProducer : public FileSystemProducer { } }; -} // namespace milvus_storage \ No newline at end of file +} // namespace milvus_storage diff --git a/cpp/include/milvus-storage/filesystem/fs.h b/cpp/include/milvus-storage/filesystem/fs.h index 17b5f1b..482f7ec 100644 --- a/cpp/include/milvus-storage/filesystem/fs.h +++ b/cpp/include/milvus-storage/filesystem/fs.h @@ -19,6 +19,7 @@ #include #include #include "common/result.h" +#include "common/config.h" namespace milvus_storage { @@ -26,7 +27,8 @@ class FileSystemProducer { public: virtual ~FileSystemProducer() = default; - virtual Result> Make(const std::string& uri, std::string* out_path) = 0; + virtual Result> Make(const StorageConfig& storage_config, + std::string* out_path) = 0; std::string UriToPath(const std::string& uri) { arrow::util::Uri uri_parser; @@ -41,7 +43,8 @@ class FileSystemProducer { class FileSystemFactory { public: - Result> BuildFileSystem(const std::string& uri, std::string* 
out_path); + Result> BuildFileSystem(const StorageConfig& storage_config, + std::string* out_path); }; } // namespace milvus_storage diff --git a/cpp/include/milvus-storage/filesystem/io/io_util.h b/cpp/include/milvus-storage/filesystem/io/io_util.h new file mode 100644 index 0000000..880c539 --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/io/io_util.h @@ -0,0 +1,26 @@ +#include +#include +#include +#include "common/log.h" + +namespace milvus_storage { + +template +auto SubmitIO(arrow::io::IOContext io_context, SubmitArgs&&... submit_args) + -> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) { + arrow::internal::TaskHints hints; + hints.external_id = io_context.external_id(); + return io_context.executor()->Submit(hints, io_context.stop_token(), std::forward(submit_args)...); +}; + +void CloseFromDestructor(arrow::io::FileInterface* file) { + arrow::Status st = file->Close(); + if (!st.ok()) { + auto file_type = typeid(*file).name(); + std::stringstream ss; + ss << "When destroying file of type " << file_type << ": " << st.message(); + LOG_STORAGE_FATAL_ << st.WithMessage(ss.str()); + } +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h b/cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h new file mode 100644 index 0000000..99fbc4a --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h @@ -0,0 +1,99 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
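The `FileSystemProducer::Make` interface above now receives the whole `StorageConfig` rather than a URI string, so producers can read credentials, region, and part-size settings from one place. A sketch of a producer written against the new signature; `LocalFileSystemProducer` is a hypothetical name used only for illustration, and the `Result` payload type (`std::shared_ptr<arrow::fs::FileSystem>`) is inferred from the call sites in this diff rather than stated by it:

```cpp
#include <memory>
#include <string>

#include "arrow/filesystem/localfs.h"
#include "common/config.h"
#include "filesystem/fs.h"

namespace milvus_storage {

// Hypothetical producer, for illustration only; not part of this patch.
class LocalFileSystemProducer : public FileSystemProducer {
  public:
  Result<std::shared_ptr<arrow::fs::FileSystem>> Make(const StorageConfig& storage_config,
                                                      std::string* out_path) override {
    // Derive the on-disk path from the configured URI (e.g. "file:///tmp/"),
    // using the UriToPath() helper declared on the base class above.
    *out_path = UriToPath(storage_config.uri);
    std::shared_ptr<arrow::fs::FileSystem> fs = std::make_shared<arrow::fs::LocalFileSystem>();
    return fs;
  }
};

}  // namespace milvus_storage
```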
+ +#pragma once + +#include +#include +#include +#include +#include "common/log.h" +#include "common/macro.h" + +#include +#include +#include "arrow/filesystem/filesystem.h" +#include "arrow/util/macros.h" +#include "arrow/util/uri.h" +#include "arrow/io/interfaces.h" + +using namespace arrow; +using ::arrow::fs::FileInfo; +using ::arrow::fs::FileInfoGenerator; + +namespace milvus_storage { + +class MultiPartUploadS3FS : public arrow::fs::S3FileSystem { + public: + ~MultiPartUploadS3FS() override; + + std::string type_name() const override { return "multiPartUploadS3"; } + + bool Equals(const FileSystem& other) const override; + + arrow::Result PathFromUri(const std::string& uri_string) const override; + + arrow::Result GetFileInfo(const std::string& path) override; + + arrow::Result> GetFileInfo(const arrow::fs::FileSelector& select) override; + + FileInfoGenerator GetFileInfoGenerator(const arrow::fs::FileSelector& select) override; + + arrow::Status CreateDir(const std::string& path, bool recursive) override; + + arrow::Status DeleteDir(const std::string& path) override; + + arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; + + Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override; + + arrow::Status DeleteRootDirContents() override; + + arrow::Status DeleteFile(const std::string& path) override; + + arrow::Status Move(const std::string& src, const std::string& dest) override; + + arrow::Status CopyFile(const std::string& src, const std::string& dest) override; + + arrow::Result> OpenOutputStreamWithUploadSize(const std::string& s, + int64_t part_size); + + arrow::Result> OpenOutputStreamWithUploadSize( + const std::string& s, const std::shared_ptr& metadata, int64_t part_size); + + static arrow::Result> Make( + const arrow::fs::S3Options& options, const arrow::io::IOContext& = arrow::io::default_io_context()); + + arrow::Result> OpenInputStream(const std::string& path) override; + + arrow::Result> OpenInputStream(const FileInfo& info) override; + + arrow::Result> OpenInputFile(const std::string& s) override; + + arrow::Result> OpenInputFile(const FileInfo& info) override; + + arrow::Result> OpenOutputStream( + const std::string& path, const std::shared_ptr& metadata) override; + + arrow::Result> OpenAppendStream( + const std::string& path, const std::shared_ptr& metadata) override; + + protected: + explicit MultiPartUploadS3FS(const arrow::fs::S3Options& options, const arrow::io::IOContext& io_context); + + class Impl; + std::shared_ptr impl_; +}; + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/filesystem/s3/s3_fs.h b/cpp/include/milvus-storage/filesystem/s3/s3_fs.h index baa8505..68d7984 100644 --- a/cpp/include/milvus-storage/filesystem/s3/s3_fs.h +++ b/cpp/include/milvus-storage/filesystem/s3/s3_fs.h @@ -20,6 +20,7 @@ #include "common/log.h" #include "common/macro.h" #include "filesystem/fs.h" +#include "filesystem/s3/multi_part_upload_s3_fs.h" namespace milvus_storage { @@ -27,9 +28,10 @@ class S3FileSystemProducer : public FileSystemProducer { public: S3FileSystemProducer(){}; - Result> Make(const std::string& uri, std::string* out_path) override { + Result> Make(const StorageConfig& storage_config, + std::string* out_path) override { arrow::util::Uri uri_parser; - RETURN_ARROW_NOT_OK(uri_parser.Parse(uri)); + RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri)); if (!arrow::fs::IsS3Initialized()) { arrow::fs::S3GlobalOptions global_options; @@ -44,10 
+46,15 @@ class S3FileSystemProducer : public FileSystemProducer { arrow::fs::S3Options options; options.endpoint_override = uri_parser.ToString(); - options.ConfigureAccessKey(std::getenv("ACCESS_KEY"), std::getenv("SECRET_KEY")); + options.ConfigureAccessKey(storage_config.access_key_id, storage_config.access_key_value); - if (std::getenv("REGION") != nullptr) { - options.region = std::getenv("REGION"); + if (!storage_config.region.empty()) { + options.region = storage_config.region; + } + + if (storage_config.use_custom_part_upload_size) { + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, MultiPartUploadS3FS::Make(options)); + return std::shared_ptr(fs); } ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(options)); @@ -55,4 +62,4 @@ class S3FileSystemProducer : public FileSystemProducer { } }; -} // namespace milvus_storage \ No newline at end of file +} // namespace milvus_storage diff --git a/cpp/include/milvus-storage/filesystem/s3/s3_internal.h b/cpp/include/milvus-storage/filesystem/s3/s3_internal.h new file mode 100644 index 0000000..a58bcda --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/s3/s3_internal.h @@ -0,0 +1,273 @@ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/s3fs.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" +#include "arrow/util/print.h" +#include "arrow/util/string.h" + +namespace arrow { +namespace fs { +namespace internal { + +// XXX Should we expose this at some point? +enum class S3Backend { Amazon, Minio, Other }; + +// Detect the S3 backend type from the S3 server's response headers +inline S3Backend DetectS3Backend(const Aws::Http::HeaderValueCollection& headers) { + const auto it = headers.find("server"); + if (it != headers.end()) { + const auto& value = std::string_view(it->second); + if (value.find("AmazonS3") != std::string::npos) { + return S3Backend::Amazon; + } + if (value.find("MinIO") != std::string::npos) { + return S3Backend::Minio; + } + } + return S3Backend::Other; +} + +template +inline S3Backend DetectS3Backend(const Aws::Client::AWSError& error) { + return DetectS3Backend(error.GetResponseHeaders()); +} + +template +inline bool IsConnectError(const Aws::Client::AWSError& error) { + if (error.ShouldRetry()) { + return true; + } + // Sometimes Minio may fail with a 503 error + // (exception name: XMinioServerNotInitialized, + // message: "Server not initialized, please try again") + if (error.GetExceptionName() == "XMinioServerNotInitialized") { + return true; + } + return false; +} + +template +inline std::optional BucketRegionFromError(const Aws::Client::AWSError& error) { + if constexpr (std::is_same_v) { + const auto& headers = error.GetResponseHeaders(); + const auto it = headers.find("x-amz-bucket-region"); + if (it != headers.end()) { + const std::string region(it->second.begin(), it->second.end()); + return region; + } + } + return std::nullopt; +} + +inline bool IsNotFound(const Aws::Client::AWSError& error) { + const auto error_type = error.GetErrorType(); + return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET || error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND); +} + +inline bool IsAlreadyExists(const Aws::Client::AWSError& error) { + const auto error_type = error.GetErrorType(); + return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS || + error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU); +} + +inline 
std::string S3ErrorToString(Aws::S3::S3Errors error_type) { + switch (error_type) { +#define S3_ERROR_CASE(NAME) \ + case Aws::S3::S3Errors::NAME: \ + return #NAME; + + S3_ERROR_CASE(INCOMPLETE_SIGNATURE) + S3_ERROR_CASE(INTERNAL_FAILURE) + S3_ERROR_CASE(INVALID_ACTION) + S3_ERROR_CASE(INVALID_CLIENT_TOKEN_ID) + S3_ERROR_CASE(INVALID_PARAMETER_COMBINATION) + S3_ERROR_CASE(INVALID_QUERY_PARAMETER) + S3_ERROR_CASE(INVALID_PARAMETER_VALUE) + S3_ERROR_CASE(MISSING_ACTION) + S3_ERROR_CASE(MISSING_AUTHENTICATION_TOKEN) + S3_ERROR_CASE(MISSING_PARAMETER) + S3_ERROR_CASE(OPT_IN_REQUIRED) + S3_ERROR_CASE(REQUEST_EXPIRED) + S3_ERROR_CASE(SERVICE_UNAVAILABLE) + S3_ERROR_CASE(THROTTLING) + S3_ERROR_CASE(VALIDATION) + S3_ERROR_CASE(ACCESS_DENIED) + S3_ERROR_CASE(RESOURCE_NOT_FOUND) + S3_ERROR_CASE(UNRECOGNIZED_CLIENT) + S3_ERROR_CASE(MALFORMED_QUERY_STRING) + S3_ERROR_CASE(SLOW_DOWN) + S3_ERROR_CASE(REQUEST_TIME_TOO_SKEWED) + S3_ERROR_CASE(INVALID_SIGNATURE) + S3_ERROR_CASE(SIGNATURE_DOES_NOT_MATCH) + S3_ERROR_CASE(INVALID_ACCESS_KEY_ID) + S3_ERROR_CASE(REQUEST_TIMEOUT) + S3_ERROR_CASE(NETWORK_CONNECTION) + S3_ERROR_CASE(UNKNOWN) + S3_ERROR_CASE(BUCKET_ALREADY_EXISTS) + S3_ERROR_CASE(BUCKET_ALREADY_OWNED_BY_YOU) + // The following is the most recent addition to S3Errors + // and is not supported yet for some versions of the SDK + // that Apache Arrow is using. This is not a big deal + // since this error will happen only in very specialized + // settings and we will print the correct numerical error + // code as per the "default" case down below. We should + // put it back once the SDK has been upgraded in all + // Apache Arrow build configurations. + // S3_ERROR_CASE(INVALID_OBJECT_STATE) + S3_ERROR_CASE(NO_SUCH_BUCKET) + S3_ERROR_CASE(NO_SUCH_KEY) + S3_ERROR_CASE(NO_SUCH_UPLOAD) + S3_ERROR_CASE(OBJECT_ALREADY_IN_ACTIVE_TIER) + S3_ERROR_CASE(OBJECT_NOT_IN_ACTIVE_TIER) + +#undef S3_ERROR_CASE + default: + return "[code " + ::arrow::internal::ToChars(static_cast(error_type)) + "]"; + } +} + +// TODO qualify error messages with a prefix indicating context +// (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...") +template +Status ErrorToStatus(const std::string& prefix, + const std::string& operation, + const Aws::Client::AWSError& error, + const std::optional& region = std::nullopt) { + // XXX Handle fine-grained error types + // See + // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371 + auto error_type = static_cast(error.GetErrorType()); + std::stringstream ss; + ss << S3ErrorToString(error_type); + if (error_type == Aws::S3::S3Errors::UNKNOWN) { + ss << " (HTTP status " << static_cast(error.GetResponseCode()) << ")"; + } + + // Possibly an error due to wrong region configuration from client and bucket. 
+ std::optional wrong_region_msg = std::nullopt; + if (region.has_value()) { + const auto maybe_region = BucketRegionFromError(error); + if (maybe_region.has_value() && maybe_region.value() != region.value()) { + wrong_region_msg = " Looks like the configured region is '" + region.value() + + "' while the bucket is located in '" + maybe_region.value() + "'."; + } + } + return Status::IOError(prefix, "AWS Error ", ss.str(), " during ", operation, " operation: ", error.GetMessage(), + wrong_region_msg.value_or("")); +} + +template +Status ErrorToStatus(const std::tuple& prefix, + const std::string& operation, + const Aws::Client::AWSError& error) { + std::stringstream ss; + ::arrow::internal::PrintTuple(&ss, prefix); + return ErrorToStatus(ss.str(), operation, error); +} + +template +Status ErrorToStatus(const std::string& operation, const Aws::Client::AWSError& error) { + return ErrorToStatus(std::string(), operation, error); +} + +template +Status OutcomeToStatus(const std::string& prefix, + const std::string& operation, + const Aws::Utils::Outcome& outcome) { + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + return ErrorToStatus(prefix, operation, outcome.GetError()); + } +} + +template +Status OutcomeToStatus(const std::tuple& prefix, + const std::string& operation, + const Aws::Utils::Outcome& outcome) { + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + return ErrorToStatus(prefix, operation, outcome.GetError()); + } +} + +template +Status OutcomeToStatus(const std::string& operation, const Aws::Utils::Outcome& outcome) { + return OutcomeToStatus(std::string(), operation, outcome); +} + +template +Result OutcomeToResult(const std::string& operation, Aws::Utils::Outcome outcome) { + if (outcome.IsSuccess()) { + return std::move(outcome).GetResultWithOwnership(); + } else { + return ErrorToStatus(operation, outcome.GetError()); + } +} + +inline Aws::String ToAwsString(const std::string& s) { + // Direct construction of Aws::String from std::string doesn't work because + // it uses a specific Allocator class. + return Aws::String(s.begin(), s.end()); +} + +inline std::string_view FromAwsString(const Aws::String& s) { return {s.data(), s.length()}; } + +inline Aws::String ToURLEncodedAwsString(const std::string& s) { return Aws::Utils::StringUtils::URLEncode(s.data()); } + +inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) { + return std::chrono::time_point_cast(dt.UnderlyingTimestamp()); +} + +// A connect retry strategy with a controlled max duration. 
+ +class ConnectRetryStrategy : public Aws::Client::RetryStrategy { + public: + static const int32_t kDefaultRetryInterval = 200; /* milliseconds */ + static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */ + + explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval, + int32_t max_retry_duration = kDefaultMaxRetryDuration) + : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {} + + bool ShouldRetry(const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + if (!IsConnectError(error)) { + // Not a connect error, don't retry + return false; + } + return attempted_retries * retry_interval_ < max_retry_duration_; + } + + long CalculateDelayBeforeNextRetry( // NOLINT runtime/int + const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + return retry_interval_; + } + + protected: + int32_t retry_interval_; + int32_t max_retry_duration_; +}; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/cpp/include/milvus-storage/filesystem/s3/util_internal.h b/cpp/include/milvus-storage/filesystem/s3/util_internal.h new file mode 100644 index 0000000..cf8b95f --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/s3/util_internal.h @@ -0,0 +1,108 @@ + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/io/interfaces.h" +#include "arrow/status.h" +#include "arrow/util/uri.h" +#include "arrow/util/visibility.h" + +namespace arrow { +using util::Uri; +namespace fs { +namespace internal { + +template +inline OutputType checked_cast(InputType&& value) { + static_assert( + std::is_class::type>::type>::value, + "checked_cast input type must be a class"); + static_assert( + std::is_class::type>::type>::value, + "checked_cast output type must be a class"); +#ifdef NDEBUG + return static_cast(value); +#else + return dynamic_cast(value); +#endif +} + +ARROW_EXPORT +TimePoint CurrentTimePoint(); + +ARROW_EXPORT +Status CopyStream(const std::shared_ptr& src, + const std::shared_ptr& dest, + int64_t chunk_size, + const io::IOContext& io_context); + +ARROW_EXPORT +Status PathNotFound(std::string_view path); + +ARROW_EXPORT +Status IsADir(std::string_view path); + +ARROW_EXPORT +Status NotADir(std::string_view path); + +ARROW_EXPORT +Status NotEmpty(std::string_view path); + +ARROW_EXPORT +Status NotAFile(std::string_view path); + +ARROW_EXPORT +Status InvalidDeleteDirContents(std::string_view path); + +/// \brief Parse the string as a URI +/// \param uri_string the string to parse +/// +/// This is the same as Uri::Parse except it tolerates Windows +/// file URIs that contain backslash instead of / +Result ParseFileSystemUri(const std::string& uri_string); + +/// \brief check if the string is a local absolute path +ARROW_EXPORT +bool DetectAbsolutePath(const std::string& s); + +/// \brief describes how to handle the authority (host) component of the URI +enum class AuthorityHandlingBehavior { + // Return an invalid status if the authority is non-empty + kDisallow = 0, + // Prepend the authority to the path (e.g. authority/some/path) + kPrepend = 1, + // Convert to a Windows style network path (e.g. 
//authority/some/path) + kWindows = 2, + // Ignore the authority and just use the path + kIgnore = 3 +}; + +/// \brief check to see if uri_string matches one of the supported schemes and return the +/// path component +/// \param uri_string a uri or local path to test and convert +/// \param supported_schemes the set of URI schemes that should be accepted +/// \param accept_local_paths if true, allow an absolute path +/// \return the path portion of the URI +Result PathFromUriHelper(const std::string& uri_string, + std::vector supported_schemes, + bool accept_local_paths, + AuthorityHandlingBehavior authority_handling); + +/// \brief Return files matching the glob pattern on the filesystem +/// +/// Globbing starts from the root of the filesystem. +ARROW_EXPORT +Result GlobFiles(const std::shared_ptr& filesystem, const std::string& glob); + +extern FileSystemGlobalOptions global_options; + +ARROW_EXPORT +Status PathNotFound(std::string_view path); + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/cpp/include/milvus-storage/format/parquet/file_writer.h b/cpp/include/milvus-storage/format/parquet/file_writer.h index f010f4d..aa86b8f 100644 --- a/cpp/include/milvus-storage/format/parquet/file_writer.h +++ b/cpp/include/milvus-storage/format/parquet/file_writer.h @@ -21,18 +21,23 @@ #include "parquet/arrow/writer.h" #include "arrow/table.h" #include +#include "common/config.h" namespace milvus_storage { class ParquetFileWriter : public FileWriter { public: // with default WriterProperties - ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path); + ParquetFileWriter(std::shared_ptr schema, + arrow::fs::FileSystem& fs, + const std::string& file_path, + const StorageConfig& storage_config); // with custom WriterProperties ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + const StorageConfig& storage_config, const parquet::WriterProperties& props); Status Init() override; @@ -52,6 +57,7 @@ class ParquetFileWriter : public FileWriter { arrow::fs::FileSystem& fs_; std::shared_ptr schema_; const std::string file_path_; + const StorageConfig& storage_config_; std::unique_ptr writer_; std::shared_ptr kv_metadata_; diff --git a/cpp/include/milvus-storage/packed/column_group_writer.h b/cpp/include/milvus-storage/packed/column_group_writer.h index 2794d7b..df8a62c 100644 --- a/cpp/include/milvus-storage/packed/column_group_writer.h +++ b/cpp/include/milvus-storage/packed/column_group_writer.h @@ -21,6 +21,7 @@ #include "arrow/filesystem/filesystem.h" #include "common/status.h" #include "packed/column_group.h" +#include "common/config.h" namespace milvus_storage { @@ -30,12 +31,14 @@ class ColumnGroupWriter { std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + const StorageConfig& storage_config, const std::vector& origin_column_indices); ColumnGroupWriter(GroupId group_id, std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + const StorageConfig& storage_config, const parquet::WriterProperties& props, const std::vector& origin_column_indices); diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h index d67137f..06ed199 100644 --- a/cpp/include/milvus-storage/packed/reader.h +++ b/cpp/include/milvus-storage/packed/reader.h @@ -16,7 +16,7 @@ #include #include -#include "packed/utils/config.h" +#include "common/config.h" #include #include #include diff --git 
a/cpp/include/milvus-storage/packed/writer.h b/cpp/include/milvus-storage/packed/writer.h index eec754c..5faa6b1 100644 --- a/cpp/include/milvus-storage/packed/writer.h +++ b/cpp/include/milvus-storage/packed/writer.h @@ -17,6 +17,7 @@ #include "packed/column_group_writer.h" #include "packed/column_group.h" #include "packed/splitter/indices_based_splitter.h" +#include "common/config.h" namespace milvus_storage { @@ -35,6 +36,7 @@ class PackedRecordBatchWriter { std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + StorageConfig& storage_config, parquet::WriterProperties& props); // Put the record batch into the corresponding column group, @@ -56,6 +58,7 @@ class PackedRecordBatchWriter { std::shared_ptr schema_; arrow::fs::FileSystem& fs_; const std::string& file_path_; + const StorageConfig& storage_config_; parquet::WriterProperties& props_; size_t current_memory_usage_; std::vector> group_writers_; diff --git a/cpp/src/filesystem/fs.cpp b/cpp/src/filesystem/fs.cpp index 8b0586f..775235e 100644 --- a/cpp/src/filesystem/fs.cpp +++ b/cpp/src/filesystem/fs.cpp @@ -22,10 +22,10 @@ namespace milvus_storage { -Result> FileSystemFactory::BuildFileSystem(const std::string& uri, +Result> FileSystemFactory::BuildFileSystem(const StorageConfig& storage_config, std::string* out_path) { arrow::util::Uri uri_parser; - RETURN_ARROW_NOT_OK(uri_parser.Parse(uri)); + RETURN_ARROW_NOT_OK(uri_parser.Parse(storage_config.uri)); auto scheme = uri_parser.scheme(); auto host = uri_parser.host(); if (scheme == "file") { @@ -38,10 +38,10 @@ Result> FileSystemFactory::BuildFileSyste if (host.find("s3") != std::string::npos || host.find("googleapis") != std::string::npos || host.find("oss") != std::string::npos || host.find("cos") != std::string::npos) { auto producer = std::make_shared(); - return producer->Make(uri, out_path); + return producer->Make(storage_config, out_path); } else if (host.find("blob.core.windows.net") != std::string::npos) { auto producer = std::make_shared(); - return producer->Make(uri, out_path); + return producer->Make(storage_config, out_path); } } diff --git a/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp b/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp new file mode 100644 index 0000000..1c91496 --- /dev/null +++ b/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp @@ -0,0 +1,2419 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
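With the plumbing above in place, a caller enables the multipart-upload path by setting `use_custom_part_upload_size` and `part_size` on the config: `S3FileSystemProducer` then builds a `MultiPartUploadS3FS` instead of the stock `arrow::fs::S3FileSystem`, and `PackedRecordBatchWriter` takes the same config alongside the writer properties. A hedged usage sketch mirroring the benchmark; the `WritePacked` helper is illustrative, while the writer signature follows `packed/writer.h` in this diff:

```cpp
#include <cstddef>
#include <memory>
#include <string>

#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
#include <parquet/properties.h>

#include "common/config.h"
#include "common/status.h"
#include "packed/writer.h"

using namespace milvus_storage;

Status WritePacked(std::shared_ptr<arrow::Schema> schema,
                   std::shared_ptr<arrow::RecordBatch> batch,
                   arrow::fs::FileSystem& fs,
                   const std::string& path,
                   size_t buffer_size) {
  StorageConfig conf;
  conf.use_custom_part_upload_size = true;
  conf.part_size = 30 * 1024 * 1024;  // 30 MiB parts, as in the benchmark

  // Keep the properties shared_ptr alive while the writer exists,
  // since the writer stores a reference to the properties.
  auto props = parquet::default_writer_properties();

  PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf, *props);
  return writer.Write(batch);
}
```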
+ +#include "filesystem/s3/multi_part_upload_s3_fs.h" +#include "filesystem/s3/s3_internal.h" +#include "filesystem/s3/util_internal.h" + +#include "common/path_util.h" +#include "filesystem/io/io_util.h" + +#include "arrow/util/async_generator.h" +#include "arrow/util/logging.h" +#include "arrow/buffer.h" +#include "arrow/result.h" +#include "arrow/io/memory.h" +#include "arrow/util/future.h" +#include "arrow/util/thread_pool.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/filesystem/type_fwd.h" +#include "arrow/util/string.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +using namespace std; + +static constexpr const char kSep = '/'; + +using ::arrow::Buffer; +using ::arrow::Future; +using ::arrow::Result; +using ::arrow::Status; +using ::arrow::fs::FileInfo; +using ::arrow::fs::FileInfoGenerator; +using ::arrow::fs::FileInfoVector; +using ::arrow::fs::FileSelector; +using ::arrow::fs::FileType; +using ::arrow::fs::kNoSize; +using ::arrow::fs::S3FileSystem; +using ::arrow::fs::S3Options; +using ::arrow::fs::S3RetryStrategy; +using ::arrow::fs::internal::ConnectRetryStrategy; +using ::arrow::fs::internal::DetectS3Backend; +using ::arrow::fs::internal::ErrorToStatus; +using ::arrow::fs::internal::FromAwsDatetime; +using ::arrow::fs::internal::FromAwsString; +using ::arrow::fs::internal::IsAlreadyExists; +using ::arrow::fs::internal::IsNotFound; +using ::arrow::fs::internal::OutcomeToResult; +using ::arrow::fs::internal::OutcomeToStatus; +using ::arrow::fs::internal::RemoveTrailingSlash; +using ::arrow::fs::internal::S3Backend; +using ::arrow::fs::internal::ToAwsString; +using ::arrow::fs::internal::ToURLEncodedAwsString; +using ::Aws::Client::AWSError; +using ::Aws::S3::S3Errors; + +using namespace ::arrow; +namespace S3Model = Aws::S3::Model; + +namespace milvus_storage { + +static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL"; +static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3"; +static constexpr const char kAwsDirectoryContentType[] = "application/x-directory"; + +bool IsDirectory(std::string_view key, const S3Model::HeadObjectResult& result) { + // If it has a non-zero length, it's a regular file. We do this even if + // the key has a trailing slash, as directory markers should never have + // any data associated to them. + if (result.GetContentLength() > 0) { + return false; + } + // Otherwise, if it has a trailing slash, it's a directory + if (arrow::fs::internal::HasTrailingSlash(key)) { + return true; + } + // Otherwise, if its content type starts with "application/x-directory", + // it's a directory + if (::arrow::internal::StartsWith(result.GetContentType(), kAwsDirectoryContentType)) { + return true; + } + // Otherwise, it's a regular file. + return false; +} + +inline Aws::String ToAwsString(const std::string& s) { + // Direct construction of Aws::String from std::string doesn't work because + // it uses a specific Allocator class. 
+ return Aws::String(s.begin(), s.end()); +} + +inline std::string_view FromAwsString(const Aws::String& s) { return {s.data(), s.length()}; } + +template +struct ObjectMetadataSetter { + using Setter = std::function; + + static std::unordered_map GetSetters() { + return {{"ACL", CannedACLSetter()}, + {"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)}, + {"Content-Type", StringSetter(&ObjectRequest::SetContentType)}, + {"Content-Language", StringSetter(&ObjectRequest::SetContentLanguage)}, + {"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}}; + } + + private: + static Setter StringSetter(void (ObjectRequest::*req_method)(Aws::String&&)) { + return [req_method](const std::string& v, ObjectRequest* req) { + (req->*req_method)(ToAwsString(v)); + return Status::OK(); + }; + } + + static Setter DateTimeSetter(void (ObjectRequest::*req_method)(Aws::Utils::DateTime&&)) { + return [req_method](const std::string& v, ObjectRequest* req) { + (req->*req_method)(Aws::Utils::DateTime(v.data(), Aws::Utils::DateFormat::ISO_8601)); + return Status::OK(); + }; + } + + static Setter CannedACLSetter() { + return [](const std::string& v, ObjectRequest* req) { + ARROW_ASSIGN_OR_RAISE(auto acl, ParseACL(v)); + req->SetACL(acl); + return Status::OK(); + }; + } + + static Result ParseACL(const std::string& v) { + if (v.empty()) { + return S3Model::ObjectCannedACL::NOT_SET; + } + auto acl = S3Model::ObjectCannedACLMapper::GetObjectCannedACLForName(ToAwsString(v)); + if (acl == S3Model::ObjectCannedACL::NOT_SET) { + // XXX This actually never happens, as the AWS SDK dynamically + // expands the enum range using Aws::GetEnumOverflowContainer() + return Status::Invalid("Invalid S3 canned ACL: '", v, "'"); + } + return acl; + } +}; + +struct S3Path { + std::string full_path; + std::string bucket; + std::string key; + std::vector key_parts; + + static Result FromString(const std::string& s) { + if (arrow::fs::internal::IsLikelyUri(s)) { + return arrow::Status::Invalid("Expected an S3 object path of the form 'bucket/key...', got a URI: '", s, "'"); + } + const auto src = RemoveTrailingSlash(s); + auto first_sep = src.find_first_of(kSep); + if (first_sep == 0) { + return arrow::Status::Invalid("Path cannot start with a separator ('", s, "')"); + } + if (first_sep == std::string::npos) { + return S3Path{std::string(src), std::string(src), "", {}}; + } + S3Path path; + path.full_path = std::string(src); + path.bucket = std::string(src.substr(0, first_sep)); + path.key = std::string(src.substr(first_sep + 1)); + path.key_parts = arrow::fs::internal::SplitAbstractPath(path.key); + ARROW_RETURN_NOT_OK(Validate(path)); + return path; + } + + static arrow::Status Validate(const S3Path& path) { + auto st = arrow::fs::internal::ValidateAbstractPath(path.full_path); + if (!st.ok()) { + return arrow::Status::Invalid(st.message(), " in path ", path.full_path); + } + return arrow::Status::OK(); + } + + Aws::String ToAwsString() const { + Aws::String res(bucket.begin(), bucket.end()); + res.reserve(bucket.size() + key.size() + 1); + res += kSep; + res.append(key.begin(), key.end()); + return res; + } + + S3Path parent() const { + DCHECK(!key_parts.empty()); + auto parent = S3Path{"", bucket, "", key_parts}; + parent.key_parts.pop_back(); + parent.key = arrow::fs::internal::JoinAbstractPath(parent.key_parts); + parent.full_path = parent.bucket + kSep + parent.key; + return parent; + } + + bool has_parent() const { return !key.empty(); } + + bool empty() const { return bucket.empty() && key.empty(); } + + bool 
operator==(const S3Path& other) const { return bucket == other.bucket && key == other.key; } +}; + +Status PathNotFound(const S3Path& path) { return ::arrow::fs::internal::PathNotFound(path.full_path); } + +Status PathNotFound(const std::string& bucket, const std::string& key) { + return ::arrow::fs::internal::PathNotFound(bucket + kSep + key); +} + +arrow::Status NotAFile(const S3Path& path) { return NotAFile(path.full_path); } + +arrow::Status ValidateFilePath(const S3Path& path) { + if (path.bucket.empty() || path.key.empty()) { + return NotAFile(path); + } + return arrow::Status::OK(); +}; + +arrow::Status ErrorS3Finalized() { return arrow::Status::Invalid("S3 subsystem is finalized"); } + +arrow::Status CheckS3Initialized() { + if (!arrow::fs::IsS3Initialized()) { + if (arrow::fs::IsS3Finalized()) { + return ErrorS3Finalized(); + } + return arrow::Status::Invalid( + "S3 subsystem is not initialized; please call InitializeS3() " + "before carrying out any S3-related operation"); + } + return arrow::Status::OK(); +}; + +class WrappedRetryStrategy : public Aws::Client::RetryStrategy { + public: + explicit WrappedRetryStrategy(const std::shared_ptr& s3_retry_strategy) + : s3_retry_strategy_(s3_retry_strategy) {} + + bool ShouldRetry(const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + arrow::fs::S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error); + return s3_retry_strategy_->ShouldRetry(detail, static_cast(attempted_retries)); + } + + long CalculateDelayBeforeNextRetry( // NOLINT runtime/int + const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + arrow::fs::S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error); + return static_cast( // NOLINT runtime/int + s3_retry_strategy_->CalculateDelayBeforeNextRetry(detail, static_cast(attempted_retries))); + } + + private: + template + static arrow::fs::S3RetryStrategy::AWSErrorDetail ErrorToDetail(const Aws::Client::AWSError& error) { + arrow::fs::S3RetryStrategy::AWSErrorDetail detail; + detail.error_type = static_cast(error.GetErrorType()); + detail.message = std::string(FromAwsString(error.GetMessage())); + detail.exception_name = std::string(FromAwsString(error.GetExceptionName())); + detail.should_retry = error.ShouldRetry(); + return detail; + } + + std::shared_ptr s3_retry_strategy_; +}; + +class S3Client : public Aws::S3::S3Client { + public: + using Aws::S3::S3Client::S3Client; + + static inline constexpr auto kBucketRegionHeaderName = "x-amz-bucket-region"; + + std::string GetBucketRegionFromHeaders(const Aws::Http::HeaderValueCollection& headers) { + const auto it = headers.find(ToAwsString(kBucketRegionHeaderName)); + if (it != headers.end()) { + return std::string(FromAwsString(it->second)); + } + return std::string(); + } + + template + arrow::Result GetBucketRegionFromError(const std::string& bucket, + const Aws::Client::AWSError& error) { + std::string region = GetBucketRegionFromHeaders(error.GetResponseHeaders()); + if (!region.empty()) { + return region; + } else if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return arrow::Status::IOError("Bucket '", bucket, "' not found"); + } else { + return arrow::Status::IOError("When resolving region for bucket: ", bucket); + } + } + + Result GetBucketRegion(const std::string& bucket, const S3Model::HeadBucketRequest& request) { + auto uri = GeneratePresignedUrl(request.GetBucket(), + /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD); + // NOTE: The 
signer region argument isn't passed here, as there's no easy + // way of computing it (the relevant method is private). + auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD, Aws::Auth::SIGV4_SIGNER); + if (!outcome.IsSuccess()) { + return GetBucketRegionFromError(bucket, outcome.GetError()); + } + std::string region = GetBucketRegionFromHeaders(outcome.GetResult().GetHeaderValueCollection()); + if (!region.empty()) { + return region; + } else if (outcome.GetResult().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return arrow::Status::IOError("Bucket '", request.GetBucket(), "' not found"); + } else { + return arrow::Status::IOError("When resolving region for bucket '", request.GetBucket(), + "': missing 'x-amz-bucket-region' header in response"); + } + } + + Result GetBucketRegion(const std::string& bucket) { + S3Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + return GetBucketRegion(bucket, req); + } + + S3Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup( + S3Model::CompleteMultipartUploadRequest&& request) const { + // CompletedMultipartUpload can return a 200 OK response with an error + // encoded in the response body, in which case we should either retry + // or propagate the error to the user (see + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html). + // + // Unfortunately the AWS SDK doesn't detect such situations but lets them + // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658). + // + // We work around the issue by registering a DataReceivedEventHandler + // which parses the XML response for embedded errors. + + std::optional> aws_error; + + auto handler = [&](const Aws::Http::HttpRequest* http_req, Aws::Http::HttpResponse* http_resp, + long long) { // NOLINT runtime/int + auto& stream = http_resp->GetResponseBody(); + const auto pos = stream.tellg(); + const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream); + // Rewind stream for later + stream.clear(); + stream.seekg(pos); + + if (doc.WasParseSuccessful()) { + auto root = doc.GetRootElement(); + if (!root.IsNull()) { + // Detect something that looks like an abnormal CompletedMultipartUpload + // response. + if (root.GetName() != "CompleteMultipartUploadResult" || !root.FirstChild("Error").IsNull() || + !root.FirstChild("Errors").IsNull()) { + // Make sure the error marshaller doesn't see a 200 OK + http_resp->SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR); + aws_error = GetErrorMarshaller()->Marshall(*http_resp); + // Rewind stream for later + stream.clear(); + stream.seekg(pos); + } + } + } + }; + + request.SetDataReceivedEventHandler(std::move(handler)); + + // We don't have access to the configured AWS retry strategy + // (m_retryStrategy is a private member of AwsClient), so don't use that. + std::unique_ptr retry_strategy; + if (s3_retry_strategy_) { + retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_)); + } else { + // Note that DefaultRetryStrategy, unlike StandardRetryStrategy, + // has empty definitions for RequestBookkeeping() and GetSendToken(), + // which simplifies the code below. 
+ retry_strategy.reset(new Aws::Client::DefaultRetryStrategy()); + } + + for (int32_t retries = 0;; retries++) { + aws_error.reset(); + auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request); + if (!outcome.IsSuccess()) { + // Error returned in HTTP headers (or client failure) + return outcome; + } + if (!aws_error.has_value()) { + // Genuinely successful outcome + return outcome; + } + + const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries); + + ARROW_LOG(WARNING) << "CompletedMultipartUpload got error embedded in a 200 OK response: " + << aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage() + << "\"), retry = " << should_retry; + + if (!should_retry) { + break; + } + const auto delay = std::chrono::milliseconds(retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries)); + std::this_thread::sleep_for(delay); + } + + DCHECK(aws_error.has_value()); + auto s3_error = AWSError(std::move(aws_error).value()); + return S3Model::CompleteMultipartUploadOutcome(std::move(s3_error)); + } + + std::shared_ptr s3_retry_strategy_; +}; + +class S3ClientFinalizer; + +class S3ClientLock { + public: + S3Client* get() { return client_.get(); } + S3Client* operator->() { return client_.get(); } + + // Move this S3ClientLock into a temporary instance + // + // It is counter-intuitive, but lock ordering issues can happen even + // with a shared mutex locked in shared mode. + // The reason is that locking again in shared mode can block while + // there are threads waiting to take the lock in exclusive mode. + // Therefore, we should avoid obtaining the S3ClientLock when + // we already have it locked. + // + // This methods helps by moving the S3ClientLock into a temporary + // that is immediately destroyed so the lock will be released as + // soon as we are done making the call to the underlying client. + // + // (see GH-36523) + S3ClientLock Move() { return std::move(*this); } + + protected: + friend class S3ClientHolder; + + // Locks the finalizer until the S3ClientLock gets out of scope. + std::shared_lock lock_; + std::shared_ptr client_; +}; + +class S3ClientHolder { + public: + /// \brief Return a RAII guard guaranteeing a S3Client is safe for use + /// + /// S3 finalization will be deferred until the returned S3ClientLock + /// goes out of scope. + /// An error is returned if S3 is already finalized. 
+ arrow::Result Lock(); + + S3ClientHolder(std::weak_ptr finalizer, std::shared_ptr client) + : finalizer_(std::move(finalizer)), client_(std::move(client)) {} + + void Finalize(); + + protected: + std::mutex mutex_; + std::weak_ptr finalizer_; + std::shared_ptr client_; +}; + +class S3ClientFinalizer : public std::enable_shared_from_this { + using ClientHolderList = std::vector>; + + public: + arrow::Result> AddClient(std::shared_ptr client) { + std::unique_lock lock(mutex_); + if (finalized_) { + return ErrorS3Finalized(); + } + + auto holder = std::make_shared(shared_from_this(), std::move(client)); + + // Remove expired entries before adding new one + auto end = std::remove_if(holders_.begin(), holders_.end(), + [](std::weak_ptr holder) { return holder.expired(); }); + holders_.erase(end, holders_.end()); + holders_.emplace_back(holder); + return holder; + } + + void Finalize() { + std::unique_lock lock(mutex_); + finalized_ = true; + + ClientHolderList finalizing = std::move(holders_); + lock.unlock(); // avoid lock ordering issue with S3ClientHolder::Finalize + + // Finalize all client holders, such that no S3Client remains alive + // after this. + for (auto&& weak_holder : finalizing) { + auto holder = weak_holder.lock(); + if (holder) { + holder->Finalize(); + } + } + } + + auto LockShared() { return std::shared_lock(mutex_); } + + protected: + friend class S3ClientHolder; + + std::shared_mutex mutex_; + ClientHolderList holders_; + bool finalized_ = false; +}; + +arrow::Result S3ClientHolder::Lock() { + std::shared_ptr finalizer; + std::shared_ptr client; + { + std::unique_lock lock(mutex_); + finalizer = finalizer_.lock(); + client = client_; + } + // Do not hold mutex while taking finalizer lock below. + // + // Acquiring a shared_mutex in shared mode may block even if not already + // acquired in exclusive mode, because of pending writers: + // https://github.com/google/sanitizers/issues/1668#issuecomment-1624985664 + // """It is implementation-defined whether the calling thread acquires + // the lock when a writer does not hold the lock and there are writers + // blocked on the lock""". + // + // Therefore, we want to avoid potential lock ordering issues + // even when a shared lock is involved (GH-36523). 
+ if (!finalizer) { + return ErrorS3Finalized(); + } + + S3ClientLock client_lock; + // Lock the finalizer before examining it + client_lock.lock_ = finalizer->LockShared(); + if (finalizer->finalized_) { + return ErrorS3Finalized(); + } + // (the client can be cleared only if finalizer->finalized_ is true) + DCHECK(client) << "inconsistent S3ClientHolder"; + client_lock.client_ = std::move(client); + return client_lock; +} + +void S3ClientHolder::Finalize() { + std::shared_ptr client; + { + std::unique_lock lock(mutex_); + client = std::move(client_); + } + // Do not hold mutex while ~S3Client potentially runs +} + +std::shared_ptr GetClientFinalizer() { + static auto finalizer = std::make_shared(); + return finalizer; +} + +arrow::Result> GetClientHolder(std::shared_ptr client) { + return GetClientFinalizer()->AddClient(std::move(client)); +} + +template +arrow::Status SetObjectMetadata(const std::shared_ptr& metadata, ObjectRequest* req) { + static auto setters = ObjectMetadataSetter::GetSetters(); + + DCHECK_NE(metadata, nullptr); + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (size_t i = 0; i < keys.size(); ++i) { + auto it = setters.find(keys[i]); + if (it != setters.end()) { + RETURN_NOT_OK(it->second(values[i], req)); + } + } + return arrow::Status::OK(); +} + +class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream { + public: + StringViewStream(const void* data, int64_t nbytes) + : Aws::Utils::Stream::PreallocatedStreamBuf(reinterpret_cast(const_cast(data)), + static_cast(nbytes)), + std::iostream(this) {} +}; + +class ClientBuilder { + public: + explicit ClientBuilder(S3Options options) : options_(std::move(options)) {} + + const Aws::Client::ClientConfiguration& config() const { return client_config_; } + + Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; } + + Result> BuildClient(std::optional io_context = std::nullopt) { + credentials_provider_ = options_.credentials_provider; + if (!options_.region.empty()) { + client_config_.region = ToAwsString(options_.region); + } + if (options_.request_timeout > 0) { + // Use ceil() to avoid setting it to 0 as that probably means no timeout. 
+ client_config_.requestTimeoutMs = static_cast(ceil(options_.request_timeout * 1000)); // NOLINT runtime/int + } + if (options_.connect_timeout > 0) { + client_config_.connectTimeoutMs = static_cast(ceil(options_.connect_timeout * 1000)); // NOLINT runtime/int + } + + client_config_.endpointOverride = ToAwsString(options_.endpoint_override); + if (options_.scheme == "http") { + client_config_.scheme = Aws::Http::Scheme::HTTP; + } else if (options_.scheme == "https") { + client_config_.scheme = Aws::Http::Scheme::HTTPS; + } else { + return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'"); + } + if (options_.retry_strategy) { + client_config_.retryStrategy = std::make_shared(options_.retry_strategy); + } else { + client_config_.retryStrategy = std::make_shared(); + } + if (!arrow::fs::internal::global_options.tls_ca_file_path.empty()) { + client_config_.caFile = ToAwsString(arrow::fs::internal::global_options.tls_ca_file_path); + } + if (!arrow::fs::internal::global_options.tls_ca_dir_path.empty()) { + client_config_.caPath = ToAwsString(arrow::fs::internal::global_options.tls_ca_dir_path); + } + + // Set proxy options if provided + if (!options_.proxy_options.scheme.empty()) { + if (options_.proxy_options.scheme == "http") { + client_config_.proxyScheme = Aws::Http::Scheme::HTTP; + } else if (options_.proxy_options.scheme == "https") { + client_config_.proxyScheme = Aws::Http::Scheme::HTTPS; + } else { + return Status::Invalid("Invalid proxy connection scheme '", options_.proxy_options.scheme, "'"); + } + } + if (!options_.proxy_options.host.empty()) { + client_config_.proxyHost = ToAwsString(options_.proxy_options.host); + } + if (options_.proxy_options.port != -1) { + client_config_.proxyPort = options_.proxy_options.port; + } + if (!options_.proxy_options.username.empty()) { + client_config_.proxyUserName = ToAwsString(options_.proxy_options.username); + } + if (!options_.proxy_options.password.empty()) { + client_config_.proxyPassword = ToAwsString(options_.proxy_options.password); + } + + if (io_context) { + // TODO: Once ARROW-15035 is done we can get rid of the "at least 25" fallback + client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25); + } + + const bool use_virtual_addressing = options_.endpoint_override.empty() || options_.force_virtual_addressing; + +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + client_config_.useVirtualAddressing = use_virtual_addressing; + auto endpoint_provider = EndpointProviderCache::Instance()->Lookup(client_config_); + auto client = std::make_shared(credentials_provider_, endpoint_provider, client_config_); +#else + auto client = + std::make_shared(credentials_provider_, client_config_, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing); +#endif + client->s3_retry_strategy_ = options_.retry_strategy; + return GetClientHolder(std::move(client)); + } + + const S3Options& options() const { return options_; } + + protected: + S3Options options_; +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + Aws::S3::S3ClientConfiguration client_config_; +#else + Aws::Client::ClientConfiguration client_config_; +#endif + std::shared_ptr credentials_provider_; +}; + +std::string FormatRange(int64_t start, int64_t length) { + // Format a HTTP range header value + std::stringstream ss; + ss << "bytes=" << start << "-" << start + length - 1; + return ss.str(); +} + +Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) { + return [=]() { return Aws::New("", data, nbytes); 
}; +} + +Result GetObjectRange( + Aws::S3::S3Client* client, const S3Path& path, int64_t start, int64_t length, void* out) { + S3Model::GetObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + req.SetRange(ToAwsString(FormatRange(start, length))); + req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length)); + return OutcomeToResult("GetObject", client->GetObject(req)); +} + +template +std::shared_ptr GetObjectMetadata(const ObjectResult& result) { + auto md = std::make_shared(); + + auto push = [&](std::string k, const Aws::String& v) { + if (!v.empty()) { + md->Append(std::move(k), std::string(FromAwsString(v))); + } + }; + auto push_datetime = [&](std::string k, const Aws::Utils::DateTime& v) { + if (v != Aws::Utils::DateTime(0.0)) { + push(std::move(k), v.ToGmtString(Aws::Utils::DateFormat::ISO_8601)); + } + }; + + md->Append("Content-Length", ToChars(result.GetContentLength())); + push("Cache-Control", result.GetCacheControl()); + push("Content-Type", result.GetContentType()); + push("Content-Language", result.GetContentLanguage()); + push("ETag", result.GetETag()); + push("VersionId", result.GetVersionId()); + push_datetime("Last-Modified", result.GetLastModified()); + push_datetime("Expires", result.GetExpires()); + // NOTE the "canned ACL" isn't available for reading (one can get an expanded + // ACL using a separate GetObjectAcl request) + return md; +} + +class ObjectInputFile final : public io::RandomAccessFile { + public: + ObjectInputFile(std::shared_ptr holder, + const io::IOContext& io_context, + const S3Path& path, + int64_t size = kNoSize) + : holder_(std::move(holder)), io_context_(io_context), path_(path), content_length_(size) {} + + Status Init() { + // Issue a HEAD Object to get the content-length and ensure any + // errors (e.g. file not found) don't wait until the first Read() call. 
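+    // If the caller already supplied the object size (e.g. via a FileInfo), the
+    // HEAD request below is skipped entirely.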
+ if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->HeadObject(req); + if (!outcome.IsSuccess()) { + if (IsNotFound(outcome.GetError())) { + return PathNotFound(path_); + } else { + return ErrorToStatus(std::forward_as_tuple("When reading information for key '", path_.key, "' in bucket '", + path_.bucket, "': "), + "HeadObject", outcome.GetError()); + } + } + content_length_ = outcome.GetResult().GetContentLength(); + DCHECK_GE(content_length_, 0); + metadata_ = GetObjectMetadata(outcome.GetResult()); + return Status::OK(); + } + + Status CheckClosed() const { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return Status::OK(); + } + + Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return Status::IOError("Cannot ", action, " past end of file"); + } + return Status::OK(); + } + + // RandomAccessFile APIs + + Result> ReadMetadata() override { return metadata_; } + + Future> ReadMetadataAsync(const io::IOContext& io_context) override { + return metadata_; + } + + Status Close() override { + holder_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + Result GetSize() override { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + Status Seek(int64_t position) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return Status::OK(); + } + + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + nbytes = std::min(nbytes, content_length_ - position); + if (nbytes == 0) { + return 0; + } + + // Read the desired range of bytes + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result, + GetObjectRange(client_lock.get(), path_, position, nbytes, out)); + + auto& stream = result.GetBody(); + stream.ignore(nbytes); + // NOTE: the stream is a stringstream by default, there is no actual error + // to check for. However, stream.fail() may return true if EOF is reached. 
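+    // The response stream factory passed to GetObjectRange() already wrote the
+    // body directly into `out`; ignore() + gcount() merely counts how many bytes
+    // were actually received.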
+ return stream.gcount(); + } + + Result> ReadAt(int64_t position, int64_t nbytes) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool())); + if (nbytes > 0) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buf->mutable_data())); + DCHECK_LE(bytes_read, nbytes); + RETURN_NOT_OK(buf->Resize(bytes_read)); + } + // R build with openSUSE155 requires an explicit shared_ptr construction + return std::shared_ptr(std::move(buf)); + } + + Result Read(int64_t nbytes, void* out) override { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return buffer; + } + + protected: + std::shared_ptr holder_; + const io::IOContext io_context_; + S3Path path_; + + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; + std::shared_ptr metadata_; +}; + +void FileObjectToInfo(std::string_view key, const S3Model::HeadObjectResult& obj, FileInfo* info) { + if (IsDirectory(key, obj)) { + info->set_type(FileType::Directory); + } else { + info->set_type(FileType::File); + } + info->set_size(static_cast(obj.GetContentLength())); + info->set_mtime(FromAwsDatetime(obj.GetLastModified())); +} + +void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) { + info->set_type(arrow::fs::FileType::File); + info->set_size(static_cast(obj.GetSize())); + info->set_mtime(FromAwsDatetime(obj.GetLastModified())); +} + +class CustomOutputStream final : public arrow::io::OutputStream { + protected: + struct UploadState; + + public: + CustomOutputStream(std::shared_ptr holder, + const arrow::io::IOContext& io_context, + const S3Path& path, + const S3Options& options, + const std::shared_ptr& metadata, + const int64_t part_size) + : holder_(std::move(holder)), + io_context_(io_context), + path_(path), + metadata_(metadata), + default_metadata_(options.default_metadata), + background_writes_(options.background_writes), + part_upload_size_(part_size) {} + + ~CustomOutputStream() override { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. 
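+    // Unlike Abort(), Close() still uploads any buffered data and completes the
+    // multipart upload, so nothing written so far is lost.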
+ CloseFromDestructor(this); + } + + std::shared_ptr Self() { + return std::dynamic_pointer_cast(shared_from_this()); + } + + arrow::Status Init() { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + // Initiate the multi-part upload + S3Model::CreateMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + if (metadata_ && metadata_->size() != 0) { + RETURN_NOT_OK(SetObjectMetadata(metadata_, &req)); + } else if (default_metadata_ && default_metadata_->size() != 0) { + RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req)); + } + + // If we do not set anything then the SDK will default to application/xml + // which confuses some tools (https://github.com/apache/arrow/issues/11934) + // So we instead default to application/octet-stream which is less misleading + if (!req.ContentTypeHasBeenSet()) { + req.SetContentType("application/octet-stream"); + } + + auto outcome = client_lock.Move()->CreateMultipartUpload(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus(std::forward_as_tuple("When initiating multiple part upload for key '", path_.key, + "' in bucket '", path_.bucket, "': "), + "CreateMultipartUpload", outcome.GetError()); + } + upload_id_ = outcome.GetResult().GetUploadId(); + upload_state_ = std::make_shared(); + closed_ = false; + return Status::OK(); + } + + arrow::Status Abort() override { + if (closed_) { + return arrow::Status::OK(); + } + + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + S3Model::AbortMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(upload_id_); + + auto outcome = client_lock.Move()->AbortMultipartUpload(req); + if (!outcome.IsSuccess()) { + return arrow::Status::Invalid("When aborting multiple part upload for key '", path_.key, "' in bucket '", + path_.bucket, "': ", "AbortMultipartUpload", outcome.GetError()); + } + + current_part_.reset(); + holder_ = nullptr; + closed_ = true; + + return arrow::Status::OK(); + } + + // OutputStream interface + + arrow::Status EnsureReadyToFlushFromClose() { + if (current_part_) { + // Upload last part + RETURN_NOT_OK(CommitCurrentPart()); + } + + // S3 mandates at least one part, upload an empty one if necessary + if (part_number_ == 1) { + RETURN_NOT_OK(UploadPart("", 0)); + } + + return Status::OK(); + } + + arrow::Status FinishPartUploadAfterFlush() { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + // At this point, all part uploads have finished successfully + DCHECK_GT(part_number_, 1); + DCHECK_EQ(upload_state_->completed_parts.size(), static_cast(part_number_ - 1)); + + S3Model::CompletedMultipartUpload completed_upload; + completed_upload.SetParts(upload_state_->completed_parts); + S3Model::CompleteMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(upload_id_); + req.SetMultipartUpload(std::move(completed_upload)); + + auto outcome = client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req)); + if (!outcome.IsSuccess()) { + return ErrorToStatus(std::forward_as_tuple("When completing multiple part upload for key '", path_.key, + "' in bucket '", path_.bucket, "': "), + "CompleteMultipartUpload", outcome.GetError()); + } + + holder_ = nullptr; + closed_ = true; + return arrow::Status::OK(); + } + + arrow::Status Close() override { + if (closed_) + return arrow::Status::OK(); + + RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + + 
RETURN_NOT_OK(Flush()); + + return FinishPartUploadAfterFlush(); + } + + Future<> CloseAsync() override { + if (closed_) + return Status::OK(); + + RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + + // Wait for in-progress uploads to finish (if async writes are enabled) + return FlushAsync().Then([self = Self()]() { return self->FinishPartUploadAfterFlush(); }); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return pos_; + } + + arrow::Status Write(const std::shared_ptr& buffer) override { + return DoWrite(buffer->data(), buffer->size(), buffer); + } + + arrow::Status Write(const void* data, int64_t nbytes) override { return DoWrite(data, nbytes); } + + arrow::Status DoWrite(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + + const int8_t* data_ptr = reinterpret_cast(data); + auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) { + data_ptr += offset; + nbytes -= offset; + }; + + // Handle case where we have some bytes buffered from prior calls. + if (current_part_size_ > 0) { + // Try to fill current buffer + const int64_t to_copy = std::min(nbytes, part_upload_size_ - current_part_size_); + RETURN_NOT_OK(current_part_->Write(data_ptr, to_copy)); + current_part_size_ += to_copy; + advance_ptr(to_copy); + pos_ += to_copy; + + // If buffer isn't full, break + if (current_part_size_ < part_upload_size_) { + return arrow::Status::OK(); + } + + RETURN_NOT_OK(CommitCurrentPart()); + } + + // We can upload chunks without copying them into a buffer + while (nbytes >= part_upload_size_) { + RETURN_NOT_OK(UploadPart(data_ptr, part_upload_size_)); + advance_ptr(part_upload_size_); + pos_ += part_upload_size_; + } + + // Buffer remaining bytes + if (nbytes > 0) { + current_part_size_ = nbytes; + ARROW_ASSIGN_OR_RAISE(current_part_, + arrow::io::BufferOutputStream::Create(part_upload_size_, io_context_.pool())); + RETURN_NOT_OK(current_part_->Write(data_ptr, current_part_size_)); + pos_ += current_part_size_; + } + + return arrow::Status::OK(); + } + + arrow::Status Flush() override { + auto fut = FlushAsync(); + return fut.status(); + } + + Future<> FlushAsync() { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + // Wait for background writes to finish + std::unique_lock lock(upload_state_->mutex); + return upload_state_->pending_parts_completed; + } + + // Upload-related helpers + + arrow::Status CommitCurrentPart() { + ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish()); + current_part_.reset(); + current_part_size_ = 0; + return UploadPart(buf); + } + + Status UploadPart(std::shared_ptr buffer) { return UploadPart(buffer->data(), buffer->size(), buffer); } + + Status UploadPart(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { + S3Model::UploadPartRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(upload_id_); + req.SetPartNumber(part_number_); + req.SetContentLength(nbytes); + + if (!background_writes_) { + req.SetBody(std::make_shared(data, nbytes)); + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->UploadPart(req); + if (!outcome.IsSuccess()) { + return UploadPartError(req, outcome); + } else { + AddCompletedPart(upload_state_, part_number_, outcome.GetResult()); + } + } else 
{ + // If the data isn't owned, make an immutable copy for the lifetime of the closure + if (owned_buffer == nullptr) { + ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool())); + memcpy(owned_buffer->mutable_data(), data, nbytes); + } else { + DCHECK_EQ(data, owned_buffer->data()); + DCHECK_EQ(nbytes, owned_buffer->size()); + } + req.SetBody(std::make_shared(owned_buffer->data(), owned_buffer->size())); + + { + std::unique_lock lock(upload_state_->mutex); + if (upload_state_->parts_in_progress++ == 0) { + upload_state_->pending_parts_completed = Future<>::Make(); + } + } + + // The closure keeps the buffer and the upload state alive + auto deferred = [owned_buffer, holder = holder_, req = std::move(req), state = upload_state_, + part_number = part_number_]() mutable -> Status { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + auto outcome = client_lock.Move()->UploadPart(req); + HandleUploadOutcome(state, part_number, req, outcome); + return Status::OK(); + }; + ARROW_RETURN_NOT_OK(SubmitIO(io_context_, std::move(deferred))); + } + + ++part_number_; + + return Status::OK(); + } + + static void HandleUploadOutcome(const std::shared_ptr& state, + int part_number, + const S3Model::UploadPartRequest& req, + const Result& result) { + std::unique_lock lock(state->mutex); + if (!result.ok()) { + state->status &= result.status(); + } else { + const auto& outcome = *result; + if (!outcome.IsSuccess()) { + state->status &= UploadPartError(req, outcome); + } else { + AddCompletedPart(state, part_number, outcome.GetResult()); + } + } + // Notify completion + if (--state->parts_in_progress == 0) { + // GH-41862: avoid potential deadlock if the Future's callback is called + // with the mutex taken. + auto fut = state->pending_parts_completed; + lock.unlock(); + // State could be mutated concurrently if another thread writes to the + // stream, but in this case the Flush() call is only advisory anyway. + // Besides, it's not generally sound to write to an OutputStream from + // several threads at once. + fut.MarkFinished(state->status); + } + } + + static void AddCompletedPart(const std::shared_ptr& state, + int part_number, + const S3Model::UploadPartResult& result) { + S3Model::CompletedPart part; + // Append ETag and part number for this uploaded part + // (will be needed for upload completion in Close()) + part.SetPartNumber(part_number); + part.SetETag(result.GetETag()); + int slot = part_number - 1; + if (state->completed_parts.size() <= static_cast(slot)) { + state->completed_parts.resize(slot + 1); + } + DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet()); + state->completed_parts[slot] = std::move(part); + } + + static Status UploadPartError(const S3Model::UploadPartRequest& req, const S3Model::UploadPartOutcome& outcome) { + return ErrorToStatus( + std::forward_as_tuple("When uploading part for key '", req.GetKey(), "' in bucket '", req.GetBucket(), "': "), + "UploadPart", outcome.GetError()); + } + + protected: + std::shared_ptr holder_; + const arrow::io::IOContext io_context_; + const S3Path path_; + const std::shared_ptr metadata_; + const std::shared_ptr default_metadata_; + const bool background_writes_; + + int64_t part_upload_size_; + + Aws::String upload_id_; + bool closed_ = true; + int64_t pos_ = 0; + int32_t part_number_ = 1; + std::shared_ptr current_part_; + int64_t current_part_size_ = 0; + + // This struct is kept alive through background writes to avoid problems + // in the completion handler. 
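+  // Parts may finish out of order, so completed_parts is indexed by part number
+  // (see AddCompletedPart) and pending_parts_completed is only marked finished
+  // once parts_in_progress drops back to zero.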
+ struct UploadState { + std::mutex mutex; + // Only populated for multi-part uploads. + Aws::Vector completed_parts; + int64_t parts_in_progress = 0; + arrow::Status status; + arrow::Future<> pending_parts_completed = arrow::Future<>::MakeFinished(arrow::Status::OK()); + }; + std::shared_ptr upload_state_; +}; + +class MultiPartUploadS3FS::Impl : public std::enable_shared_from_this { + public: + ClientBuilder builder_; + arrow::io::IOContext io_context_; + std::shared_ptr holder_; + std::optional backend_; + + static constexpr int32_t kListObjectsMaxKeys = 1000; + // At most 1000 keys per multiple-delete request + static constexpr int32_t kMultipleDeleteMaxKeys = 1000; + + explicit Impl(S3Options options, arrow::io::IOContext io_context) + : builder_(std::move(options)), io_context_(io_context) {} + + arrow::Status Init() { return builder_.BuildClient(io_context_).Value(&holder_); } + + const S3Options& options() const { return builder_.options(); } + + std::string region() const { return std::string(FromAwsString(builder_.config().region)); } + + template + void SaveBackend(const Aws::Client::AWSError& error) { + if (!backend_ || *backend_ == S3Backend::Other) { + backend_ = DetectS3Backend(error); + } + } + + // Tests to see if a bucket exists + Result BucketExists(const std::string& bucket) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + S3Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + + auto outcome = client_lock.Move()->HeadBucket(req); + if (!outcome.IsSuccess()) { + if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple("When testing for existence of bucket '", bucket, "': "), + "HeadBucket", outcome.GetError()); + } + return false; + } + return true; + } + + // Create a bucket. Successful if bucket already exists. + arrow::Status CreateBucket(const std::string& bucket) { + // Check bucket exists first. + { + S3Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->HeadBucket(req); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "), "HeadBucket", + outcome.GetError()); + } + + if (!options().allow_bucket_creation) { + return Status::IOError("Bucket '", bucket, "' not found. ", + "To create buckets, enable the allow_bucket_creation option."); + } + } + + S3Model::CreateBucketConfiguration config; + S3Model::CreateBucketRequest req; + auto _region = region(); + // AWS S3 treats the us-east-1 differently than other regions + // https://docs.aws.amazon.com/cli/latest/reference/s3api/create-bucket.html + if (_region != "us-east-1") { + config.SetLocationConstraint( + S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(ToAwsString(_region))); + } + req.SetBucket(ToAwsString(bucket)); + req.SetCreateBucketConfiguration(config); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->CreateBucket(req); + if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "), "CreateBucket", + outcome.GetError()); + } + return Status::OK(); + } + + // Create a directory-like object with empty contents. Successful if already exists. 
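+  // The marker is a zero-byte object whose key ends with '/' and whose
+  // Content-Type is kAwsDirectoryContentType.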
+  arrow::Status CreateEmptyDir(const std::string& bucket, std::string_view key_view) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+    auto key = EnsureTrailingSlash(key_view);
+    S3Model::PutObjectRequest req;
+    req.SetBucket(ToAwsString(bucket));
+    req.SetKey(ToAwsString(key));
+    req.SetContentType(kAwsDirectoryContentType);
+    req.SetBody(std::make_shared<std::stringstream>(""));
+    return OutcomeToStatus(std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
+                           "PutObject", client_lock.Move()->PutObject(req));
+  }
+
+  arrow::Status DeleteObject(const std::string& bucket, const std::string& key) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+    S3Model::DeleteObjectRequest req;
+    req.SetBucket(ToAwsString(bucket));
+    req.SetKey(ToAwsString(key));
+    return OutcomeToStatus(std::forward_as_tuple("When deleting key '", key, "' in bucket '", bucket, "': "),
+                           "DeleteObject", client_lock.Move()->DeleteObject(req));
+  }
+
+  arrow::Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+
+    S3Model::CopyObjectRequest req;
+    req.SetBucket(ToAwsString(dest_path.bucket));
+    req.SetKey(ToAwsString(dest_path.key));
+    // ARROW-13048: Copy source "Must be URL-encoded" according to AWS SDK docs.
+    // However, at least in 1.8 and 1.9 the SDK URL-encodes the path for you.
+    req.SetCopySource(src_path.ToAwsString());
+    return OutcomeToStatus(std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '", src_path.bucket,
+                                                 "' to key '", dest_path.key, "' in bucket '", dest_path.bucket,
+                                                 "': "),
+                           "CopyObject", client_lock.Move()->CopyObject(req));
+  }
+
+  // On Minio, an empty "directory" doesn't satisfy the same API requests as
+  // a non-empty "directory". This is a Minio-specific quirk, but we need
+  // to handle it for unit testing.
+
+  // If this method is called after HEAD on "bucket/key" has already returned a 404,
+  // the caller can pass that outcome here to spare a spurious HEAD call.
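+  // (On Minio such an empty "directory" is a marker object whose key ends with a
+  // slash, which is why the HEAD request below appends kSep for that backend.)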
+ Result IsEmptyDirectory(const std::string& bucket, + const std::string& key, + const S3Model::HeadObjectOutcome* previous_outcome = nullptr) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + if (previous_outcome) { + // Fetch the backend from the previous error + DCHECK(!previous_outcome->IsSuccess()); + if (!backend_) { + SaveBackend(previous_outcome->GetError()); + DCHECK(backend_); + } + if (backend_ != S3Backend::Minio) { + // HEAD already returned a 404, nothing more to do + return false; + } + } + + // We come here in one of two situations: + // - we don't know the backend and there is no previous outcome + // - the backend is Minio + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + if (backend_ && *backend_ == S3Backend::Minio) { + // Minio wants a slash at the end, Amazon doesn't + req.SetKey(ToAwsString(key) + kSep); + } else { + req.SetKey(ToAwsString(key)); + } + + auto outcome = client_lock.Move()->HeadObject(req); + if (outcome.IsSuccess()) { + return true; + } + if (!backend_) { + SaveBackend(outcome.GetError()); + DCHECK(backend_); + if (*backend_ == S3Backend::Minio) { + // Try again with separator-terminated key (see above) + return IsEmptyDirectory(bucket, key); + } + } + if (IsNotFound(outcome.GetError())) { + return false; + } + return ErrorToStatus( + std::forward_as_tuple("When reading information for key '", key, "' in bucket '", bucket, "': "), "HeadObject", + outcome.GetError()); + } + + Result IsEmptyDirectory(const S3Path& path, const S3Model::HeadObjectOutcome* previous_outcome = nullptr) { + return IsEmptyDirectory(path.bucket, path.key, previous_outcome); + } + + Result IsNonEmptyDirectory(const S3Path& path) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + S3Model::ListObjectsV2Request req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetPrefix(ToAwsString(path.key) + kSep); + req.SetDelimiter(Aws::String() + kSep); + req.SetMaxKeys(1); + auto outcome = client_lock.Move()->ListObjectsV2(req); + if (outcome.IsSuccess()) { + const S3Model::ListObjectsV2Result& r = outcome.GetResult(); + // In some cases, there may be 0 keys but some prefixes + return r.GetKeyCount() > 0 || !r.GetCommonPrefixes().empty(); + } + if (IsNotFound(outcome.GetError())) { + return false; + } + return ErrorToStatus( + std::forward_as_tuple("When listing objects under key '", path.key, "' in bucket '", path.bucket, "': "), + "ListObjectsV2", outcome.GetError()); + } + + static FileInfo MakeDirectoryInfo(std::string dirname) { + FileInfo dir; + dir.set_type(FileType::Directory); + dir.set_path(std::move(dirname)); + return dir; + } + + static std::vector MakeDirectoryInfos(std::vector dirnames) { + std::vector dir_infos; + for (auto& dirname : dirnames) { + dir_infos.push_back(MakeDirectoryInfo(std::move(dirname))); + } + return dir_infos; + } + + using FileInfoSink = PushGenerator>::Producer; + + struct FileListerState { + FileInfoSink files_queue; + const bool allow_not_found; + const int max_recursion; + const bool include_implicit_dirs; + const io::IOContext io_context; + S3ClientHolder* const holder; + + S3Model::ListObjectsV2Request req; + std::unordered_set directories; + bool empty = true; + + FileListerState(PushGenerator>::Producer files_queue, + FileSelector select, + const std::string& bucket, + const std::string& key, + bool include_implicit_dirs, + io::IOContext io_context, + S3ClientHolder* holder) + : files_queue(std::move(files_queue)), + allow_not_found(select.allow_not_found), + 
max_recursion(select.max_recursion), + include_implicit_dirs(include_implicit_dirs), + io_context(std::move(io_context)), + holder(holder) { + req.SetBucket(bucket); + req.SetMaxKeys(kListObjectsMaxKeys); + if (!key.empty()) { + req.SetPrefix(key + kSep); + } + if (!select.recursive) { + req.SetDelimiter(Aws::String() + kSep); + } + } + + void Finish() { + // `empty` means that we didn't get a single file info back from S3. This may be + // a situation that we should consider as PathNotFound. + // + // * If the prefix is empty then we were querying the contents of an entire bucket + // and this is not a PathNotFound case because if the bucket didn't exist then + // we would have received an error and not an empty set of results. + // + // * If the prefix is not empty then we asked for all files under a particular + // directory. S3 will also return the directory itself, if it exists. So if + // we get zero results then we know that there are no files under the directory + // and the directory itself doesn't exist. This should be considered PathNotFound + if (empty && !allow_not_found && !req.GetPrefix().empty()) { + files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix())); + } + } + + // Given a path, iterate through all possible sub-paths and, if we haven't + // seen that sub-path before, return it. + // + // For example, given A/B/C we might return A/B and A if we have not seen + // those paths before. This allows us to consider "implicit" directories which + // don't exist as objects in S3 but can be inferred. + std::vector GetNewDirectories(const std::string_view& path) { + std::string current(path); + std::string base = req.GetBucket(); + if (!req.GetPrefix().empty()) { + base = base + kSep + std::string(RemoveTrailingSlash(req.GetPrefix())); + } + std::vector new_directories; + while (true) { + const std::string parent_dir = GetAbstractPathParent(current).first; + if (parent_dir.empty()) { + break; + } + current = parent_dir; + if (current == base) { + break; + } + if (directories.insert(parent_dir).second) { + new_directories.push_back(std::move(parent_dir)); + } + } + return new_directories; + } + }; + + struct FileListerTask : public util::AsyncTaskScheduler::Task { + std::shared_ptr state; + util::AsyncTaskScheduler* scheduler; + + FileListerTask(std::shared_ptr state, util::AsyncTaskScheduler* scheduler) + : state(std::move(state)), scheduler(scheduler) {} + + std::vector ToFileInfos(const std::string& bucket, + const std::string& prefix, + const S3Model::ListObjectsV2Result& result) { + std::vector file_infos; + // If this is a non-recursive listing we may see "common prefixes" which represent + // directories we did not recurse into. We will add those as directories. + for (const auto& child_prefix : result.GetCommonPrefixes()) { + const auto child_key = RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix())); + std::stringstream child_path_ss; + child_path_ss << bucket << kSep << child_key; + FileInfo info; + info.set_path(child_path_ss.str()); + info.set_type(FileType::Directory); + file_infos.push_back(std::move(info)); + } + // S3 doesn't have any concept of "max depth" and so we emulate it by counting the + // number of '/' characters. E.g. if the user is searching bucket/subdirA/subdirB + // then the starting depth is 2. + // A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0. 
+ // A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a + // "depth" of 1 + int base_depth = arrow::fs::internal::GetAbstractPathDepth(prefix); + for (const auto& obj : result.GetContents()) { + if (obj.GetKey() == prefix) { + // S3 will return the basedir itself (if it is a file / empty file). We don't + // want that. But this is still considered "finding the basedir" and so we mark + // it "not empty". + state->empty = false; + continue; + } + std::string child_key = std::string(RemoveTrailingSlash(FromAwsString(obj.GetKey()))); + bool had_trailing_slash = child_key.size() != obj.GetKey().size(); + int child_depth = arrow::fs::internal::GetAbstractPathDepth(child_key); + // Recursion depth is 1 smaller because a path with depth 1 (e.g. foo) is + // considered to have a "recursion" of 0 + int recursion_depth = child_depth - base_depth - 1; + if (recursion_depth > state->max_recursion) { + // If we have A/B/C/D and max_recursion is 2 then we ignore this (don't add it + // to file_infos) but we still want to potentially add A and A/B as directories. + // So we "pretend" like we have a file A/B/C for the call to GetNewDirectories + // below + int to_trim = recursion_depth - state->max_recursion - 1; + if (to_trim > 0) { + child_key = bucket + kSep + arrow::fs::internal::SliceAbstractPath(child_key, 0, child_depth - to_trim); + } else { + child_key = bucket + kSep + child_key; + } + } else { + // If the file isn't beyond our max recursion then count it as a file + // unless it's empty and then it depends on whether or not the file ends + // with a trailing slash + std::stringstream child_path_ss; + child_path_ss << bucket << kSep << child_key; + child_key = child_path_ss.str(); + if (obj.GetSize() > 0 || !had_trailing_slash) { + // We found a real file. + // XXX Ideally, for 0-sized files we would also check the Content-Type + // against kAwsDirectoryContentType, but ListObjectsV2 does not give + // that information. + FileInfo info; + info.set_path(child_key); + FileObjectToInfo(obj, &info); + file_infos.push_back(std::move(info)); + } else { + // We found an empty file and we want to treat it like a directory. Only + // add it if we haven't seen this directory before. + if (state->directories.insert(child_key).second) { + file_infos.push_back(MakeDirectoryInfo(child_key)); + } + } + } + + if (state->include_implicit_dirs) { + // Now that we've dealt with the file itself we need to look at each of the + // parent paths and potentially add them as directories. For example, after + // finding a file A/B/C/D we want to consider adding directories A, A/B, and + // A/B/C. + for (const auto& newdir : state->GetNewDirectories(child_key)) { + file_infos.push_back(MakeDirectoryInfo(newdir)); + } + } + } + if (file_infos.size() > 0) { + state->empty = false; + } + return file_infos; + } + + void Run() { + // We are on an I/O thread now so just synchronously make the call and interpret the + // results. 
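+      // If the listing is truncated, Run() schedules a follow-up task with the
+      // continuation token (see the end of this method).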
+ Result client_lock = state->holder->Lock(); + if (!client_lock.ok()) { + state->files_queue.Push(client_lock.status()); + return; + } + S3Model::ListObjectsV2Outcome outcome = client_lock->Move()->ListObjectsV2(state->req); + if (!outcome.IsSuccess()) { + const auto& err = outcome.GetError(); + if (state->allow_not_found && IsNotFound(err)) { + return; + } + state->files_queue.Push( + ErrorToStatus(std::forward_as_tuple("When listing objects under key '", state->req.GetPrefix(), + "' in bucket '", state->req.GetBucket(), "': "), + "ListObjectsV2", err)); + return; + } + const S3Model::ListObjectsV2Result& result = outcome.GetResult(); + // We could immediately schedule the continuation (if there are enough results to + // trigger paging) but that would introduce race condition complexity for arguably + // little benefit. + std::vector file_infos = ToFileInfos(state->req.GetBucket(), state->req.GetPrefix(), result); + if (file_infos.size() > 0) { + state->files_queue.Push(std::move(file_infos)); + } + + // If there are enough files to warrant a continuation then go ahead and schedule + // that now. + if (result.GetIsTruncated()) { + DCHECK(!result.GetNextContinuationToken().empty()); + state->req.SetContinuationToken(result.GetNextContinuationToken()); + scheduler->AddTask(std::make_unique(state, scheduler)); + } else { + // Otherwise, we have finished listing all the files + state->Finish(); + } + } + + Result> operator()() override { + return state->io_context.executor()->Submit([this] { + Run(); + return Status::OK(); + }); + } + std::string_view name() const override { return "S3ListFiles"; } + }; + + // Lists all file, potentially recursively, in a bucket + // + // include_implicit_dirs controls whether or not implicit directories should be + // included. These are directories that are not actually file objects but instead are + // inferred from other objects. + // + // For example, if a file exists with path A/B/C then implicit directories A/ and A/B/ + // will exist even if there are no file objects with these paths. + void ListAsync(const FileSelector& select, + const std::string& bucket, + const std::string& key, + bool include_implicit_dirs, + util::AsyncTaskScheduler* scheduler, + FileInfoSink sink) { + // We can only fetch kListObjectsMaxKeys files at a time and so we create a + // scheduler and schedule a task to grab the first batch. Once that's done we + // schedule a new task for the next batch. 
All of these tasks share the same + // FileListerState object but none of these tasks run in parallel so there is + // no need to worry about mutexes + auto state = std::make_shared(sink, select, bucket, key, include_implicit_dirs, io_context_, + this->holder_.get()); + + // Create the first file lister task (it may spawn more) + auto file_lister_task = std::make_unique(state, scheduler); + scheduler->AddTask(std::move(file_lister_task)); + } + + // Fully list all files from all buckets + void FullListAsync(bool include_implicit_dirs, + util::AsyncTaskScheduler* scheduler, + FileInfoSink sink, + bool recursive) { + scheduler->AddSimpleTask( + [this, scheduler, sink, include_implicit_dirs, recursive]() mutable { + return ListBucketsAsync().Then([this, scheduler, sink, include_implicit_dirs, + recursive](const std::vector& buckets) mutable { + // Return the buckets themselves as directories + std::vector buckets_as_directories = MakeDirectoryInfos(buckets); + sink.Push(std::move(buckets_as_directories)); + + if (recursive) { + // Recursively list each bucket (these will run in parallel but sink + // should be thread safe and so this is ok) + for (const auto& bucket : buckets) { + FileSelector select; + select.allow_not_found = true; + select.recursive = true; + select.base_dir = bucket; + ListAsync(select, bucket, "", include_implicit_dirs, scheduler, sink); + } + } + }); + }, + std::string_view("FullListBucketScan")); + } + + // Delete multiple objects at once + Future<> DeleteObjectsAsync(const std::string& bucket, const std::vector& keys) { + struct DeleteCallback { + std::string bucket; + + arrow::Status operator()(const S3Model::DeleteObjectsOutcome& outcome) const { + if (!outcome.IsSuccess()) { + return ErrorToStatus("DeleteObjects", outcome.GetError()); + } + // Also need to check per-key errors, even on successful outcome + // See + // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html + const auto& errors = outcome.GetResult().GetErrors(); + if (!errors.empty()) { + std::stringstream ss; + ss << "Got the following " << errors.size() << " errors when deleting objects in S3 bucket '" << bucket + << "':\n"; + for (const auto& error : errors) { + ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n"; + } + return Status::IOError(ss.str()); + } + return Status::OK(); + } + }; + + const auto chunk_size = static_cast(kMultipleDeleteMaxKeys); + const DeleteCallback delete_cb{bucket}; + + std::vector> futures; + futures.reserve(bit_util::CeilDiv(keys.size(), chunk_size)); + + for (size_t start = 0; start < keys.size(); start += chunk_size) { + S3Model::DeleteObjectsRequest req; + S3Model::Delete del; + size_t remaining = keys.size() - start; + size_t next_chunk_size = std::min(remaining, chunk_size); + for (size_t i = start; i < start + next_chunk_size; ++i) { + del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i]))); + } + req.SetBucket(ToAwsString(bucket)); + req.SetDelete(std::move(del)); + ARROW_ASSIGN_OR_RAISE( + auto fut, SubmitIO(io_context_, [holder = holder_, req = std::move(req), delete_cb]() -> arrow::Status { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + return delete_cb(client_lock.Move()->DeleteObjects(req)); + })); + futures.push_back(std::move(fut)); + } + + return AllFinished(futures); + } + + arrow::Status DeleteObjects(const std::string& bucket, const std::vector& keys) { + return DeleteObjectsAsync(bucket, keys).status(); + } + + // Check to make sure the given path is not a file + // + // 
Returns true if the path seems to be a directory, false if it is a file + Future EnsureIsDirAsync(const std::string& bucket, const std::string& key) { + if (key.empty()) { + // There is no way for a bucket to be a file + return Future::MakeFinished(true); + } + auto self = shared_from_this(); + return DeferNotOk(SubmitIO(io_context_, [self, bucket, key]() mutable -> Result { + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock()); + auto outcome = client_lock.Move()->HeadObject(req); + if (outcome.IsSuccess()) { + return IsDirectory(key, outcome.GetResult()); + } + if (IsNotFound(outcome.GetError())) { + // If we can't find it then it isn't a file. + return true; + } else { + return ErrorToStatus( + std::forward_as_tuple("When getting information for key '", key, "' in bucket '", bucket, "': "), + "HeadObject", outcome.GetError()); + } + })); + } + + // Some operations require running multiple S3 calls, either in parallel or serially. We + // need to ensure that the S3 filesystem instance stays valid and that S3 isn't + // finalized. We do this by wrapping all the tasks in a scheduler which keeps the + // resources alive + Future<> RunInScheduler(std::function callable) { + auto self = shared_from_this(); + FnOnce initial_task = [callable = std::move(callable), + this](util::AsyncTaskScheduler* scheduler) mutable { + return callable(scheduler, this); + }; + Future<> scheduler_fut = util::AsyncTaskScheduler::Make( + std::move(initial_task), + /*abort_callback=*/ + [](const Status& st) { + // No need for special abort logic. + }, + io_context_.stop_token()); + // Keep self alive until all tasks finish + return scheduler_fut.Then([self]() { return Status::OK(); }); + } + + Future<> DoDeleteDirContentsAsync(const std::string& bucket, const std::string& key) { + return RunInScheduler([bucket, key](util::AsyncTaskScheduler* scheduler, MultiPartUploadS3FS::Impl* self) { + scheduler->AddSimpleTask( + [=] { + FileSelector select; + select.base_dir = bucket + kSep + key; + select.recursive = true; + select.allow_not_found = false; + + FileInfoGenerator file_infos = self->GetFileInfoGenerator(select); + + auto handle_file_infos = [=](const std::vector& file_infos) { + std::vector file_paths; + for (const auto& file_info : file_infos) { + DCHECK_GT(file_info.path().size(), bucket.size()); + auto file_path = file_info.path().substr(bucket.size() + 1); + if (file_info.IsDirectory()) { + // The selector returns FileInfo objects for directories with a + // a path that never ends in a trailing slash, but for AWS the file + // needs to have a trailing slash to recognize it as directory + // (https://github.com/apache/arrow/issues/38618) + DCHECK_OK(arrow::fs::internal::AssertNoTrailingSlash(file_path)); + file_path = file_path + kSep; + } + file_paths.push_back(std::move(file_path)); + } + scheduler->AddSimpleTask( + [=, file_paths = std::move(file_paths)] { return self->DeleteObjectsAsync(bucket, file_paths); }, + std::string_view("DeleteDirContentsDeleteTask")); + return Status::OK(); + }; + + return VisitAsyncGenerator(AsyncGenerator>(std::move(file_infos)), + std::move(handle_file_infos)); + }, + std::string_view("ListFilesForDelete")); + return Status::OK(); + }); + } + + Future<> DeleteDirContentsAsync(const std::string& bucket, const std::string& key) { + auto self = shared_from_this(); + return EnsureIsDirAsync(bucket, key).Then([self, bucket, key](bool is_dir) -> Future<> { + if 
(!is_dir) { + return Status::IOError("Cannot delete directory contents at ", bucket, kSep, key, " because it is a file"); + } + return self->DoDeleteDirContentsAsync(bucket, key); + }); + } + + FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) { + auto maybe_base_path = S3Path::FromString(select.base_dir); + if (!maybe_base_path.ok()) { + return MakeFailingGenerator(maybe_base_path.status()); + } + auto base_path = *std::move(maybe_base_path); + + PushGenerator> generator; + Future<> scheduler_fut = RunInScheduler([select, base_path, sink = generator.producer()]( + util::AsyncTaskScheduler* scheduler, MultiPartUploadS3FS::Impl* self) { + if (base_path.empty()) { + bool should_recurse = select.recursive && select.max_recursion > 0; + self->FullListAsync(/*include_implicit_dirs=*/true, scheduler, sink, should_recurse); + } else { + self->ListAsync(select, base_path.bucket, base_path.key, + /*include_implicit_dirs=*/true, scheduler, sink); + } + return Status::OK(); + }); + + // Mark the generator done once all tasks are finished + scheduler_fut.AddCallback([sink = generator.producer()](const Status& st) mutable { + if (!st.ok()) { + sink.Push(st); + } + sink.Close(); + }); + + return generator; + } + + arrow::Status EnsureDirectoryExists(const S3Path& path) { + if (!path.key.empty()) { + return CreateEmptyDir(path.bucket, path.key); + } + return Status::OK(); + } + + arrow::Status EnsureParentExists(const S3Path& path) { + if (path.has_parent()) { + return EnsureDirectoryExists(path.parent()); + } + return Status::OK(); + } + + static Result> ProcessListBuckets(const S3Model::ListBucketsOutcome& outcome) { + if (!outcome.IsSuccess()) { + return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets", outcome.GetError()); + } + std::vector buckets; + buckets.reserve(outcome.GetResult().GetBuckets().size()); + for (const auto& bucket : outcome.GetResult().GetBuckets()) { + buckets.emplace_back(FromAwsString(bucket.GetName())); + } + return buckets; + } + + Result> ListBuckets() { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + return ProcessListBuckets(client_lock.Move()->ListBuckets()); + } + + Future> ListBucketsAsync() { + auto deferred = [self = shared_from_this()]() mutable -> Result> { + ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock()); + return self->ProcessListBuckets(client_lock.Move()->ListBuckets()); + }; + return DeferNotOk(SubmitIO(io_context_, std::move(deferred))); + } + + Result> OpenInputFile(const std::string& s, S3FileSystem* fs) { + ARROW_RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(s)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + RETURN_NOT_OK(ValidateFilePath(path)); + + RETURN_NOT_OK(CheckS3Initialized()); + + auto ptr = std::make_shared(holder_, fs->io_context(), path); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenInputFile(const FileInfo& info, S3FileSystem* fs) { + ARROW_RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(info.path())); + if (info.type() == FileType::NotFound) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return ::arrow::fs::internal::NotAFile(info.path()); + } + + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path())); + RETURN_NOT_OK(ValidateFilePath(path)); + + RETURN_NOT_OK(CheckS3Initialized()); + + auto ptr = std::make_shared(holder_, fs->io_context(), path, info.size()); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } +}; 
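Editor's note: the sketch below illustrates how the filesystem assembled above might be consumed; it is not part of the patch. Make(), OpenOutputStreamWithUploadSize() and the 10 MiB default used by OpenOutputStream() come from this file, while the include path, endpoint, bucket and object names are placeholders.

#include <string>

#include "arrow/filesystem/s3fs.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"
// Hypothetical include path for the class defined in this file.
#include "filesystem/s3/multi_part_upload_s3_fs.h"

arrow::Status WriteWithLargeParts() {
  // Assumes arrow::fs::InitializeS3() has already been called by the application.
  auto options = arrow::fs::S3Options::FromAccessKey("access-key", "secret-key");
  options.endpoint_override = "http://127.0.0.1:9000";  // placeholder endpoint

  ARROW_ASSIGN_OR_RAISE(auto fs, milvus_storage::MultiPartUploadS3FS::Make(
                                     options, arrow::io::default_io_context()));

  // Request 30 MiB parts instead of the 10 MiB default used by OpenOutputStream().
  ARROW_ASSIGN_OR_RAISE(auto sink, fs->OpenOutputStreamWithUploadSize(
                                       "my-bucket/dir/data.bin", 30 * 1024 * 1024));

  std::string payload(4096, 'x');
  ARROW_RETURN_NOT_OK(sink->Write(payload.data(), payload.size()));
  // Close() uploads the buffered final part (or an empty one) and then
  // issues CompleteMultipartUpload.
  return sink->Close();
}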
+ +MultiPartUploadS3FS::~MultiPartUploadS3FS() {} + +Result> MultiPartUploadS3FS::Make(const S3Options& options, + const io::IOContext& io_context) { + RETURN_NOT_OK(CheckS3Initialized()); + + std::shared_ptr ptr(new MultiPartUploadS3FS(options, io_context)); + RETURN_NOT_OK(ptr->impl_->Init()); + return ptr; +} + +bool MultiPartUploadS3FS::Equals(const FileSystem& other) const { + if (this == &other) { + return true; + } + if (other.type_name() != type_name()) { + return false; + } + const auto& s3fs = ::arrow::fs::internal::checked_cast(other); + return options().Equals(s3fs.options()); +} + +Result MultiPartUploadS3FS::PathFromUri(const std::string& uri_string) const { + return arrow::fs::internal::PathFromUriHelper(uri_string, {"multiPartUploadS3"}, /*accept_local_paths=*/false, + arrow::fs::internal::AuthorityHandlingBehavior::kPrepend); +} + +Result MultiPartUploadS3FS::GetFileInfo(const std::string& s) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, impl_->holder_->Lock()); + + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + FileInfo info; + info.set_path(s); + + if (path.empty()) { + // It's the root path "" + info.set_type(FileType::Directory); + return info; + } else if (path.key.empty()) { + // It's a bucket + S3Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(path.bucket)); + + auto outcome = client_lock.Move()->HeadBucket(req); + if (!outcome.IsSuccess()) { + if (!IsNotFound(outcome.GetError())) { + const auto msg = "When getting information for bucket '" + path.bucket + "': "; + return ErrorToStatus(msg, "HeadBucket", outcome.GetError(), impl_->options().region); + } + info.set_type(FileType::NotFound); + return info; + } + // NOTE: S3 doesn't have a bucket modification time. Only a creation + // time is available, and you have to list all buckets to get it. 
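+    // Consequently the FileInfo returned here leaves mtime unset for buckets.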
+ info.set_type(FileType::Directory); + return info; + } else { + // It's an object + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + + auto outcome = client_lock.Move()->HeadObject(req); + if (outcome.IsSuccess()) { + // "File" object found + FileObjectToInfo(path.key, outcome.GetResult(), &info); + return info; + } + if (!IsNotFound(outcome.GetError())) { + const auto msg = "When getting information for key '" + path.key + "' in bucket '" + path.bucket + "': "; + return ErrorToStatus(msg, "HeadObject", outcome.GetError(), impl_->options().region); + } + // Not found => perhaps it's an empty "directory" + ARROW_ASSIGN_OR_RAISE(bool is_dir, impl_->IsEmptyDirectory(path, &outcome)); + if (is_dir) { + info.set_type(FileType::Directory); + return info; + } + // Not found => perhaps it's a non-empty "directory" + ARROW_ASSIGN_OR_RAISE(is_dir, impl_->IsNonEmptyDirectory(path)); + if (is_dir) { + info.set_type(FileType::Directory); + } else { + info.set_type(FileType::NotFound); + } + return info; + } +} + +Result MultiPartUploadS3FS::GetFileInfo(const FileSelector& select) { + Future> file_infos_fut = CollectAsyncGenerator(GetFileInfoGenerator(select)); + ARROW_ASSIGN_OR_RAISE(std::vector file_infos, file_infos_fut.result()); + FileInfoVector combined_file_infos; + for (const auto& file_info_vec : file_infos) { + combined_file_infos.insert(combined_file_infos.end(), file_info_vec.begin(), file_info_vec.end()); + } + return combined_file_infos; +} + +FileInfoGenerator MultiPartUploadS3FS::GetFileInfoGenerator(const FileSelector& select) { + return impl_->GetFileInfoGenerator(select); +} + +arrow::Status MultiPartUploadS3FS::CreateDir(const std::string& s, bool recursive) { + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + + if (path.key.empty()) { + // Create bucket + return impl_->CreateBucket(path.bucket); + } + + FileInfo file_info; + // Create object + if (recursive) { + // Ensure bucket exists + ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket)); + if (!bucket_exists) { + RETURN_NOT_OK(impl_->CreateBucket(path.bucket)); + } + + auto key_i = path.key_parts.begin(); + std::string parent_key{}; + if (options().check_directory_existence_before_creation) { + // Walk up the directory first to find the first existing parent + for (const auto& part : path.key_parts) { + parent_key += part; + parent_key += kSep; + } + for (key_i = path.key_parts.end(); key_i-- != path.key_parts.begin();) { + ARROW_ASSIGN_OR_RAISE(file_info, this->GetFileInfo(path.bucket + kSep + parent_key)); + if (file_info.type() != FileType::NotFound) { + // Found! 
+ break; + } else { + // remove the kSep and the part + parent_key.pop_back(); + parent_key.erase(parent_key.end() - key_i->size(), parent_key.end()); + } + } + key_i++; // Above for loop moves one extra iterator at the end + } + // Ensure that all parents exist, then the directory itself + // Create all missing directories + for (; key_i < path.key_parts.end(); ++key_i) { + parent_key += *key_i; + parent_key += kSep; + RETURN_NOT_OK(impl_->CreateEmptyDir(path.bucket, parent_key)); + } + return Status::OK(); + } else { + // Check parent dir exists + if (path.has_parent()) { + S3Path parent_path = path.parent(); + ARROW_ASSIGN_OR_RAISE(bool exists, impl_->IsNonEmptyDirectory(parent_path)); + if (!exists) { + ARROW_ASSIGN_OR_RAISE(exists, impl_->IsEmptyDirectory(parent_path)); + } + if (!exists) { + return Status::IOError("Cannot create directory '", path.full_path, "': parent directory does not exist"); + } + } + } + + // Check if the directory exists already + if (options().check_directory_existence_before_creation) { + ARROW_ASSIGN_OR_RAISE(file_info, this->GetFileInfo(path.full_path)); + if (file_info.type() != FileType::NotFound) { + return Status::OK(); + } + } + // XXX Should we check that no non-directory entry exists? + // Minio does it for us, not sure about other S3 implementations. + return impl_->CreateEmptyDir(path.bucket, path.key); +} + +arrow::Status MultiPartUploadS3FS::DeleteDir(const std::string& s) { + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + if (path.empty()) { + return Status::NotImplemented("Cannot delete all S3 buckets"); + } + RETURN_NOT_OK(impl_->DeleteDirContentsAsync(path.bucket, path.key).status()); + if (path.key.empty() && options().allow_bucket_deletion) { + // Delete bucket + ARROW_ASSIGN_OR_RAISE(auto client_lock, impl_->holder_->Lock()); + S3Model::DeleteBucketRequest req; + req.SetBucket(ToAwsString(path.bucket)); + return OutcomeToStatus(std::forward_as_tuple("When deleting bucket '", path.bucket, "': "), "DeleteBucket", + client_lock.Move()->DeleteBucket(req)); + } else if (path.key.empty()) { + return Status::IOError("Would delete bucket '", path.bucket, "'. 
", + "To delete buckets, enable the allow_bucket_deletion option."); + } else { + // Delete "directory" + RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep)); + // Parent may be implicitly deleted if it became empty, recreate it + return impl_->EnsureParentExists(path); + } +} + +arrow::Status MultiPartUploadS3FS::DeleteDirContents(const std::string& s, bool missing_dir_ok) { + return DeleteDirContentsAsync(s, missing_dir_ok).status(); +} + +Future<> MultiPartUploadS3FS::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) { + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + + if (path.empty()) { + return Status::NotImplemented("Cannot delete all S3 buckets"); + } + auto self = impl_; + return impl_->DeleteDirContentsAsync(path.bucket, path.key) + .Then( + [path, self]() { + // Directory may be implicitly deleted, recreate it + return self->EnsureDirectoryExists(path); + }, + [missing_dir_ok](const Status& err) { + if (missing_dir_ok && ::arrow::internal::ErrnoFromStatus(err) == ENOENT) { + return Status::OK(); + } + return err; + }); +} + +arrow::Status MultiPartUploadS3FS::DeleteRootDirContents() { + return Status::NotImplemented("Cannot delete all S3 buckets"); +} + +arrow::Status MultiPartUploadS3FS::DeleteFile(const std::string& s) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, impl_->holder_->Lock()); + + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + RETURN_NOT_OK(ValidateFilePath(path)); + + // Check the object exists + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + + auto outcome = client_lock.Move()->HeadObject(req); + if (!outcome.IsSuccess()) { + if (IsNotFound(outcome.GetError())) { + return PathNotFound(path); + } else { + return ErrorToStatus( + std::forward_as_tuple("When getting information for key '", path.key, "' in bucket '", path.bucket, "': "), + "HeadObject", outcome.GetError()); + } + } + // Object found, delete it + RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key)); + // Parent may be implicitly deleted if it became empty, recreate it + return impl_->EnsureParentExists(path); +} + +arrow::Status MultiPartUploadS3FS::Move(const std::string& src, const std::string& dest) { + // XXX We don't implement moving directories as it would be too expensive: + // one must copy all directory contents one by one (including object data), + // then delete the original contents. 
+ + ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src)); + RETURN_NOT_OK(ValidateFilePath(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest)); + RETURN_NOT_OK(ValidateFilePath(dest_path)); + + if (src_path == dest_path) { + return Status::OK(); + } + RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path)); + RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key)); + // Source parent may be implicitly deleted if it became empty, recreate it + return impl_->EnsureParentExists(src_path); +} + +arrow::Status MultiPartUploadS3FS::CopyFile(const std::string& src, const std::string& dest) { + ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src)); + RETURN_NOT_OK(ValidateFilePath(src_path)); + ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest)); + RETURN_NOT_OK(ValidateFilePath(dest_path)); + + if (src_path == dest_path) { + return Status::OK(); + } + return impl_->CopyObject(src_path, dest_path); +} + +arrow::Result> MultiPartUploadS3FS::OpenOutputStreamWithUploadSize( + const std::string& s, int64_t upload_size) { + return OpenOutputStreamWithUploadSize(s, std::shared_ptr{}, upload_size); +}; + +arrow::Result> MultiPartUploadS3FS::OpenOutputStreamWithUploadSize( + const std::string& s, const std::shared_ptr& metadata, int64_t upload_size) { + ARROW_RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(s)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + RETURN_NOT_OK(ValidateFilePath(path)); + + RETURN_NOT_OK(CheckS3Initialized()); + + auto ptr = + std::make_shared(impl_->holder_, io_context(), path, impl_->options(), metadata, upload_size); + RETURN_NOT_OK(ptr->Init()); + return ptr; +}; + +MultiPartUploadS3FS::MultiPartUploadS3FS(const arrow::fs::S3Options& options, const arrow::io::IOContext& io_context) + : arrow::fs::S3FileSystem(options, io_context), impl_(std::make_shared(options, io_context)) { + default_async_is_sync_ = false; +} + +arrow::Result> MultiPartUploadS3FS::OpenInputStream(const std::string& s) { + return impl_->OpenInputFile(s, this); +} + +arrow::Result> MultiPartUploadS3FS::OpenInputStream(const FileInfo& info) { + return impl_->OpenInputFile(info, this); +} + +arrow::Result> MultiPartUploadS3FS::OpenInputFile(const std::string& s) { + return impl_->OpenInputFile(s, this); +} + +arrow::Result> MultiPartUploadS3FS::OpenInputFile(const FileInfo& info) { + return impl_->OpenInputFile(info, this); +} + +arrow::Result> MultiPartUploadS3FS::OpenOutputStream( + const std::string& s, const std::shared_ptr& metadata) { + return OpenOutputStreamWithUploadSize(s, std::shared_ptr{}, 10 * 1024 * 1024); +}; + +arrow::Result> MultiPartUploadS3FS::OpenAppendStream( + const std::string& path, const std::shared_ptr& metadata) { + return Status::NotImplemented("It is not possible to append efficiently to S3 objects"); +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/src/filesystem/s3/util_internal.cpp b/cpp/src/filesystem/s3/util_internal.cpp new file mode 100644 index 0000000..a86461b --- /dev/null +++ b/cpp/src/filesystem/s3/util_internal.cpp @@ -0,0 +1,235 @@ + +#include "filesystem/s3/util_internal.h" + +#include +#include + +#include "arrow/buffer.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/io_util.h" +#include "arrow/util/string.h" + +namespace arrow { + +using internal::StatusDetailFromErrno; +using util::Uri; + +namespace fs { +namespace internal { + +TimePoint CurrentTimePoint() { + auto now = 
std::chrono::system_clock::now(); + return TimePoint(std::chrono::duration_cast(now.time_since_epoch())); +} + +Status CopyStream(const std::shared_ptr& src, + const std::shared_ptr& dest, + int64_t chunk_size, + const io::IOContext& io_context) { + ARROW_ASSIGN_OR_RAISE(auto chunk, AllocateBuffer(chunk_size, io_context.pool())); + + while (true) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, src->Read(chunk_size, chunk->mutable_data())); + if (bytes_read == 0) { + // EOF + break; + } + RETURN_NOT_OK(dest->Write(chunk->data(), bytes_read)); + } + + return Status::OK(); +} + +Status PathNotFound(std::string_view path) { + return Status::IOError("Path does not exist '", path, "'").WithDetail(arrow::internal::StatusDetailFromErrno(ENOENT)); +} + +Status IsADir(std::string_view path) { + return Status::IOError("Is a directory: '", path, "'").WithDetail(StatusDetailFromErrno(EISDIR)); +} + +Status NotADir(std::string_view path) { + return Status::IOError("Not a directory: '", path, "'").WithDetail(StatusDetailFromErrno(ENOTDIR)); +} + +Status NotEmpty(std::string_view path) { + return Status::IOError("Directory not empty: '", path, "'").WithDetail(StatusDetailFromErrno(ENOTEMPTY)); +} + +Status NotAFile(std::string_view path) { return Status::IOError("Not a regular file: '", path, "'"); } + +Status InvalidDeleteDirContents(std::string_view path) { + return Status::Invalid("DeleteDirContents called on invalid path '", path, "'. ", + "If you wish to delete the root directory's contents, call DeleteRootDirContents."); +} + +Result ParseFileSystemUri(const std::string& uri_string) { + Uri uri; + auto status = uri.Parse(uri_string); + if (!status.ok()) { +#ifdef _WIN32 + // Could be a "file:..." URI with backslashes instead of regular slashes. + RETURN_NOT_OK(uri.Parse(ToSlashes(uri_string))); + if (uri.scheme() != "file") { + return status; + } +#else + return status; +#endif + } + return uri; +} + +#ifdef _WIN32 +static bool IsDriveLetter(char c) { + // Can't use locale-dependent functions from the C/C++ stdlib + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z'); +} +#endif + +bool DetectAbsolutePath(const std::string& s) { + // Is it a /-prefixed local path? + if (s.length() >= 1 && s[0] == '/') { + return true; + } +#ifdef _WIN32 + // Is it a \-prefixed local path? + if (s.length() >= 1 && s[0] == '\\') { + return true; + } + // Does it start with a drive letter in addition to being /- or \-prefixed, + // e.g. "C:\..."? + if (s.length() >= 3 && s[1] == ':' && (s[2] == '/' || s[2] == '\\') && IsDriveLetter(s[0])) { + return true; + } +#endif + return false; +} + +Result PathFromUriHelper(const std::string& uri_string, + std::vector supported_schemes, + bool accept_local_paths, + AuthorityHandlingBehavior authority_handling) { + if (internal::DetectAbsolutePath(uri_string)) { + if (accept_local_paths) { + // Normalize the path and remove any trailing slash + return std::string(internal::RemoveTrailingSlash(ToSlashes(uri_string), /*preserve_root=*/true)); + } + return Status::Invalid( + "The filesystem is not capable of loading local paths. 
Expected a URI but " + "received ", + uri_string); + } + Uri uri; + ARROW_RETURN_NOT_OK(uri.Parse(uri_string)); + const auto scheme = uri.scheme(); + if (std::find(supported_schemes.begin(), supported_schemes.end(), scheme) == supported_schemes.end()) { + std::string expected_schemes = ::arrow::internal::JoinStrings(supported_schemes, ", "); + return Status::Invalid("The filesystem expected a URI with one of the schemes (", expected_schemes, + ") but received ", uri_string); + } + std::string host = uri.host(); + std::string path = uri.path(); + if (host.empty()) { + // Just a path, may be absolute or relative, only allow relative paths if local + if (path[0] == '/') { + return std::string(internal::RemoveTrailingSlash(path)); + } + if (accept_local_paths) { + return std::string(internal::RemoveTrailingSlash(path)); + } + return Status::Invalid("The filesystem does not support relative paths. Received ", uri_string); + } + if (authority_handling == AuthorityHandlingBehavior::kDisallow) { + return Status::Invalid( + "The filesystem does not support the authority (host) component of a URI. " + "Received ", + uri_string); + } + if (path[0] != '/') { + // This should not be possible + return Status::Invalid( + "The provided URI has a host component but a relative path which is not " + "supported. " + "Received ", + uri_string); + } + switch (authority_handling) { + case AuthorityHandlingBehavior::kPrepend: + return std::string(internal::RemoveTrailingSlash(host + path)); + case AuthorityHandlingBehavior::kWindows: + return std::string(internal::RemoveTrailingSlash("//" + host + path)); + case AuthorityHandlingBehavior::kIgnore: + return std::string(internal::RemoveTrailingSlash(path, /*preserve_root=*/true)); + default: + return Status::Invalid("Unrecognized authority_handling value"); + } +} + +Result GlobFiles(const std::shared_ptr& filesystem, const std::string& glob) { + // TODO: ARROW-17640 + // The candidate entries at the current depth level. + // We start with the filesystem root. + FileInfoVector results{FileInfo("", FileType::Directory)}; + // The exact tail that will later require matching with candidate entries + std::string current_tail; + auto is_leading_slash = HasLeadingSlash(glob); + auto split_glob = SplitAbstractPath(glob, '/'); + + // Process one depth level at once, from root to leaf + for (const auto& glob_component : split_glob) { + if (glob_component.find_first_of("*?") == std::string::npos) { + // If there are no wildcards at the current level, just append + // the exact glob path component. + current_tail = ConcatAbstractPath(current_tail, glob_component); + continue; + } else { + FileInfoVector children; + for (const auto& res : results) { + if (res.type() != FileType::Directory) { + continue; + } + FileSelector selector; + selector.base_dir = current_tail.empty() ? 
res.path() : ConcatAbstractPath(res.path(), current_tail); + if (is_leading_slash) { + selector.base_dir = EnsureLeadingSlash(selector.base_dir); + } + ARROW_ASSIGN_OR_RAISE(auto entries, filesystem->GetFileInfo(selector)); + Globber globber(ConcatAbstractPath(selector.base_dir, glob_component)); + for (auto&& entry : entries) { + if (globber.Matches(entry.path())) { + children.push_back(std::move(entry)); + } + } + } + results = std::move(children); + current_tail.clear(); + } + } + + if (!current_tail.empty()) { + std::vector paths; + paths.reserve(results.size()); + for (const auto& file : results) { + paths.push_back(ConcatAbstractPath(file.path(), current_tail)); + } + ARROW_ASSIGN_OR_RAISE(results, filesystem->GetFileInfo(paths)); + } + + std::vector out; + for (auto&& file : results) { + if (file.type() != FileType::NotFound) { + out.push_back(std::move(file)); + } + } + + return out; +} + +FileSystemGlobalOptions global_options; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp index 670d228..ae2d616 100644 --- a/cpp/src/format/parquet/file_writer.cpp +++ b/cpp/src/format/parquet/file_writer.cpp @@ -19,33 +19,45 @@ #include #include "filesystem/fs.h" #include -#include "packed/utils/config.h" +#include "common/config.h" #include "packed/utils/serde.h" +#include "filesystem/s3/multi_part_upload_s3_fs.h" namespace milvus_storage { ParquetFileWriter::ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, - const std::string& file_path) + const std::string& file_path, + const StorageConfig& storage_config) : schema_(std::move(schema)), fs_(fs), file_path_(file_path), + storage_config_(storage_config), props_(*parquet::default_writer_properties()), count_(0) {} ParquetFileWriter::ParquetFileWriter(std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + const StorageConfig& storage_config, const parquet::WriterProperties& props) - : schema_(std::move(schema)), fs_(fs), file_path_(file_path), props_(props) {} + : schema_(std::move(schema)), fs_(fs), file_path_(file_path), storage_config_(storage_config), props_(props) {} Status ParquetFileWriter::Init() { auto coln = schema_->num_fields(); - ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink, fs_.OpenOutputStream(file_path_)); - ASSIGN_OR_RETURN_ARROW_NOT_OK(auto writer, - parquet::arrow::FileWriter::Open(*schema_, arrow::default_memory_pool(), sink)); - - writer_ = std::move(writer); + if (storage_config_.use_custom_part_upload_size && storage_config_.part_size > 0) { + auto& s3fs = dynamic_cast(fs_); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink, + s3fs.OpenOutputStreamWithUploadSize(file_path_, storage_config_.part_size)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto writer, + parquet::arrow::FileWriter::Open(*schema_, arrow::default_memory_pool(), sink)); + writer_ = std::move(writer); + } else { + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto sink, fs_.OpenOutputStream(file_path_)); + ASSIGN_OR_RETURN_ARROW_NOT_OK(auto writer, + parquet::arrow::FileWriter::Open(*schema_, arrow::default_memory_pool(), sink)); + writer_ = std::move(writer); + } kv_metadata_ = std::make_shared(); return Status::OK(); } diff --git a/cpp/src/packed/chunk_manager.cpp b/cpp/src/packed/chunk_manager.cpp index d3ddc2a..bca5136 100644 --- a/cpp/src/packed/chunk_manager.cpp +++ b/cpp/src/packed/chunk_manager.cpp @@ -17,7 +17,7 @@ #include #include #include "common/log.h" -#include "packed/utils/config.h" +#include "common/config.h" #include 
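A minimal sketch of how a caller might exercise the ParquetFileWriter::Init() branch added above (assumptions: the filesystem was built against an S3 endpoint so the MultiPartUploadS3FS cast in Init() applies, the object path is hypothetical, the 10 MB part size is only an example, and subsequent Write/Close calls are omitted):

#include <memory>
#include "arrow/filesystem/filesystem.h"
#include "arrow/type.h"
#include "common/config.h"
#include "format/parquet/file_writer.h"

milvus_storage::Status OpenWriterWithCustomPartSize(std::shared_ptr<arrow::Schema> schema,
                                                    arrow::fs::FileSystem& fs) {
  milvus_storage::StorageConfig conf;
  conf.use_custom_part_upload_size = true;
  conf.part_size = 10 * 1024 * 1024;  // 10 MB per multipart-upload part

  milvus_storage::ParquetFileWriter writer(schema, fs, "example-bucket/data/example.parquet", conf);
  // With the flag set and a positive part_size, Init() opens the sink via
  // OpenOutputStreamWithUploadSize(); otherwise it falls back to the plain
  // OpenOutputStream() path.
  return writer.Init();
}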
"packed/chunk_manager.h" #include #include diff --git a/cpp/src/packed/column_group_writer.cpp b/cpp/src/packed/column_group_writer.cpp index 609842a..fec7b47 100644 --- a/cpp/src/packed/column_group_writer.cpp +++ b/cpp/src/packed/column_group_writer.cpp @@ -29,9 +29,10 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + const StorageConfig& storage_config, const std::vector& origin_column_indices) : group_id_(group_id), - writer_(std::move(schema), fs, file_path), + writer_(std::move(schema), fs, file_path, storage_config), column_group_(group_id, origin_column_indices), finished_(false) {} @@ -39,10 +40,11 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id, std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + const StorageConfig& storage_config, const parquet::WriterProperties& props, const std::vector& origin_column_indices) : group_id_(group_id), - writer_(std::move(schema), fs, file_path, props), + writer_(std::move(schema), fs, file_path, storage_config, props), column_group_(group_id, origin_column_indices), flushed_batches_(0), flushed_rows_(0), diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp index 6619a5c..3e9b94f 100644 --- a/cpp/src/packed/reader.cpp +++ b/cpp/src/packed/reader.cpp @@ -22,7 +22,7 @@ #include "filesystem/fs.h" #include "common/log.h" #include "packed/chunk_manager.h" -#include "packed/utils/config.h" +#include "common/config.h" #include "packed/utils/serde.h" namespace milvus_storage { diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp index a93de71..1d5e449 100644 --- a/cpp/src/packed/writer.cpp +++ b/cpp/src/packed/writer.cpp @@ -22,7 +22,7 @@ #include "packed/column_group_writer.h" #include "packed/splitter/indices_based_splitter.h" #include "packed/splitter/size_based_splitter.h" -#include "packed/utils/config.h" +#include "common/config.h" #include "filesystem/fs.h" #include "common/arrow_util.h" @@ -32,11 +32,13 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit, std::shared_ptr schema, arrow::fs::FileSystem& fs, const std::string& file_path, + StorageConfig& storage_config, parquet::WriterProperties& props) : memory_limit_(memory_limit), schema_(std::move(schema)), fs_(fs), file_path_(file_path), + storage_config_(storage_config), props_(props), splitter_({}), current_memory_usage_(0), @@ -67,8 +69,8 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() { for (GroupId i = 0; i < groups.size(); ++i) { auto& group = groups[i]; std::string group_path = file_path_ + "/" + std::to_string(i); - auto writer = - std::make_unique(i, group.Schema(), fs_, group_path, props_, group.GetOriginColumnIndices()); + auto writer = std::make_unique(i, group.Schema(), fs_, group_path, storage_config_, props_, + group.GetOriginColumnIndices()); RETURN_NOT_OK(writer->Init()); for (auto& batch : group.GetRecordBatches()) { RETURN_NOT_OK(writer->Write(group.GetRecordBatch(0))); @@ -122,8 +124,8 @@ Status PackedRecordBatchWriter::Close() { std::string group_path = file_path_ + "/" + std::to_string(0); std::vector indices(buffered_batches_[0]->num_columns()); std::iota(std::begin(indices), std::end(indices), 0); - auto writer = - std::make_unique(0, buffered_batches_[0]->schema(), fs_, group_path, props_, indices); + auto writer = std::make_unique(0, buffered_batches_[0]->schema(), fs_, group_path, + storage_config_, props_, indices); RETURN_NOT_OK(writer->Init()); for (auto& batch : 
buffered_batches_) { RETURN_NOT_OK(writer->Write(batch)); diff --git a/cpp/src/storage/space.cpp b/cpp/src/storage/space.cpp index d438b7f..44aa3f6 100644 --- a/cpp/src/storage/space.cpp +++ b/cpp/src/storage/space.cpp @@ -28,6 +28,7 @@ #include "filesystem/fs.h" #include "common/log.h" #include "common/macro.h" +#include "common/config.h" #include "file/delete_fragment.h" #include "format/parquet/file_writer.h" #include "storage/space.h" @@ -95,14 +96,14 @@ Status Space::Write(arrow::RecordBatchReader& reader, const WriteOption& option) if (scalar_writer == nullptr) { auto scalar_file_path = GetNewParquetFilePath(GetScalarDataDir(path_)); - scalar_writer.reset(new ParquetFileWriter(scalar_schema, *fs_, scalar_file_path)); + scalar_writer.reset(new ParquetFileWriter(scalar_schema, *fs_, scalar_file_path, StorageConfig())); RETURN_NOT_OK(scalar_writer->Init()); scalar_fragment.add_file(scalar_file_path); } if (vector_writer == nullptr) { auto vector_file_path = GetNewParquetFilePath(GetVectorDataDir(path_)); - vector_writer.reset(new ParquetFileWriter(vector_schema, *fs_, vector_file_path)); + vector_writer.reset(new ParquetFileWriter(vector_schema, *fs_, vector_file_path, StorageConfig())); RETURN_NOT_OK(vector_writer->Init()); vector_fragment.add_file(vector_file_path); } @@ -152,7 +153,7 @@ Status Space::Delete(arrow::RecordBatchReader& reader) { if (!writer) { delete_file = GetNewParquetFilePath(GetDeleteDataDir(path_)); - writer = new ParquetFileWriter(manifest_->schema()->delete_schema(), *fs_, delete_file); + writer = new ParquetFileWriter(manifest_->schema()->delete_schema(), *fs_, delete_file, StorageConfig()); RETURN_NOT_OK(writer->Init()); } @@ -239,7 +240,9 @@ Result> Space::Open(const std::string& uri, const Options std::atomic_int64_t next_manifest_version = 1; auto factory = std::make_shared(); - ASSIGN_OR_RETURN_NOT_OK(fs, factory->BuildFileSystem(uri, &path)); + auto conf = StorageConfig(); + conf.uri = uri; + ASSIGN_OR_RETURN_NOT_OK(fs, factory->BuildFileSystem(conf, &path)); LOG_STORAGE_INFO_ << "Open space: " << path; RETURN_ARROW_NOT_OK(fs->CreateDir(GetManifestDir(path))); diff --git a/cpp/test/common/arrow_utils_test.cpp b/cpp/test/common/arrow_utils_test.cpp index aa8b467..ae13de2 100644 --- a/cpp/test/common/arrow_utils_test.cpp +++ b/cpp/test/common/arrow_utils_test.cpp @@ -35,7 +35,9 @@ class ArrowUtilsTest : public testing::Test { TEST_F(ArrowUtilsTest, TestMakeArrowRecordBatchReader) { std::string out; auto factory = std::make_shared(); - ASSERT_AND_ASSIGN(auto fs, factory->BuildFileSystem("file://" + path_.string(), &out)); + auto conf = StorageConfig(); + conf.uri = "file://" + path_.string(); + ASSERT_AND_ASSIGN(auto fs, factory->BuildFileSystem(conf, &out)); auto file_path = path_.string() + "/test.parquet"; auto schema = CreateArrowSchema({"f_int64"}, {arrow::int64()}); ASSERT_STATUS_OK(PrepareSimpleParquetFile(schema, *fs, file_path, 1)); diff --git a/cpp/test/multi_files_sequential_reader_test.cpp b/cpp/test/multi_files_sequential_reader_test.cpp index e27f5ea..328ffa7 100644 --- a/cpp/test/multi_files_sequential_reader_test.cpp +++ b/cpp/test/multi_files_sequential_reader_test.cpp @@ -28,6 +28,7 @@ #include "test_util.h" #include "arrow/table.h" #include "filesystem/fs.h" +#include "common/config.h" namespace milvus_storage { TEST(MultiFilesSeqReaderTest, ReadTest) { auto arrow_schema = CreateArrowSchema({"pk_field"}, {arrow::int64()}); @@ -41,7 +42,9 @@ TEST(MultiFilesSeqReaderTest, ReadTest) { std::string path; auto factory = std::make_shared(); - 
ASSERT_AND_ASSIGN(auto fs, factory->BuildFileSystem("file:///tmp/", &path)); + auto conf = StorageConfig(); + conf.uri = "file:///tmp/"; + ASSERT_AND_ASSIGN(auto fs, factory->BuildFileSystem(conf, &path)); ASSERT_AND_ARROW_ASSIGN(auto f1, fs->OpenOutputStream("/tmp/file1")); ASSERT_AND_ARROW_ASSIGN(auto w1, parquet::arrow::FileWriter::Open(*arrow_schema, arrow::default_memory_pool(), f1)); ASSERT_STATUS_OK(w1->WriteRecordBatch(*rec_batch)); diff --git a/cpp/test/packed/packed_test_base.h b/cpp/test/packed/packed_test_base.h index ef5b9de..cc92371 100644 --- a/cpp/test/packed/packed_test_base.h +++ b/cpp/test/packed/packed_test_base.h @@ -37,6 +37,7 @@ #include #include #include +#include using namespace std; @@ -52,15 +53,20 @@ class PackedTestBase : public ::testing::Test { const char* endpoint_url = std::getenv("S3_ENDPOINT_URL"); const char* env_file_path = std::getenv("FILE_PATH"); - std::string uri = "file:///tmp/"; + auto conf = StorageConfig(); + conf.uri = "file:///tmp/"; if (access_key != nullptr && secret_key != nullptr && endpoint_url != nullptr && env_file_path != nullptr) { - uri = endpoint_url; + conf.uri = std::string(endpoint_url); + conf.access_key_id = std::string(access_key); + conf.access_key_value = std::string(secret_key); + conf.use_custom_part_upload_size = true; + conf.part_size = 1 * 1024 * 1024; // 1 MB part size for S3FS multipart upload } - file_path_ = GenerateUniqueFilePath(env_file_path); + storage_config_ = std::move(conf); auto factory = std::make_shared(); - ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(uri, &file_path_)); + ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(storage_config_, &file_path_)); SetUpCommonData(); props_ = *parquet::default_writer_properties(); @@ -84,7 +90,7 @@ class PackedTestBase : public ::testing::Test { const std::vector& paths, const std::vector>& fields, const std::vector& column_offsets) { - PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, props_); + PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_); for (int i = 0; i < batch_size; ++i) { EXPECT_TRUE(writer.Write(record_batch_).ok()); } @@ -187,6 +193,7 @@ class PackedTestBase : public ::testing::Test { std::shared_ptr fs_; std::string file_path_; parquet::WriterProperties props_; + StorageConfig storage_config_; std::shared_ptr schema_; std::shared_ptr record_batch_; diff --git a/cpp/test/packed/reader_test.cpp b/cpp/test/packed/reader_test.cpp index 4840bb5..18b17b5 100644 --- a/cpp/test/packed/reader_test.cpp +++ b/cpp/test/packed/reader_test.cpp @@ -24,6 +24,7 @@ #include "test_util.h" #include "arrow/table.h" #include "filesystem/fs.h" +#include "common/config.h" namespace milvus_storage { @@ -38,7 +39,9 @@ class PackedRecordBatchReaderTest : public ::testing::Test { auto rec_batch = arrow::RecordBatch::Make(arrow_schema, 3, {pk_array}); std::string path; auto factory = std::make_shared(); - ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem("file:///tmp/", &path)); + auto conf = StorageConfig(); + conf.uri = "file:///tmp/"; + ASSERT_AND_ASSIGN(fs_, factory->BuildFileSystem(conf, &path)); ASSERT_AND_ARROW_ASSIGN(auto f1, fs_->OpenOutputStream("/tmp/f1")); ASSERT_AND_ARROW_ASSIGN(auto w1, parquet::arrow::FileWriter::Open(*arrow_schema, arrow::default_memory_pool(), f1)); ASSERT_STATUS_OK(w1->WriteRecordBatch(*rec_batch)); diff --git a/cpp/test/test_util.cpp b/cpp/test/test_util.cpp index e04b781..0b5b737 100644 --- a/cpp/test/test_util.cpp +++ b/cpp/test/test_util.cpp @@ -16,6 +16,7 @@ #include #include
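Tying the writer and config changes above together, an end-to-end packed write might look like the sketch below (assumptions only: the packed writer header path, the base path, and the 16 MB memory limit are illustrative, and `fs` is expected to come from FileSystemFactory::BuildFileSystem with `conf`, as in the tests above):

#include <memory>
#include <parquet/properties.h>
#include "arrow/filesystem/filesystem.h"
#include "arrow/record_batch.h"
#include "common/config.h"
#include "packed/writer.h"  // assumed header for PackedRecordBatchWriter

milvus_storage::Status WritePackedSketch(std::shared_ptr<arrow::Schema> schema,
                                         std::shared_ptr<arrow::RecordBatch> batch,
                                         arrow::fs::FileSystem& fs,
                                         milvus_storage::StorageConfig& conf) {
  auto props = parquet::default_writer_properties();
  // Buffer up to 16 MB before column groups are split and flushed; each group
  // is written as a numbered file under the base path.
  milvus_storage::PackedRecordBatchWriter writer(16 * 1024 * 1024, schema, fs,
                                                 "/tmp/packed_example", conf, *props);
  auto status = writer.Write(batch);
  if (!status.ok()) {
    return status;
  }
  return writer.Close();
}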
"format/parquet/file_writer.h" #include "arrow/array/builder_primitive.h" +#include "common/config.h" namespace milvus_storage { std::shared_ptr CreateArrowSchema(std::vector field_names, std::vector> field_types) { @@ -31,7 +32,7 @@ Status PrepareSimpleParquetFile(std::shared_ptr schema, const std::string& file_path, int num_rows) { // TODO: parse schema and generate data - ParquetFileWriter w(schema, fs, file_path); + ParquetFileWriter w(schema, fs, file_path, StorageConfig()); w.Init(); arrow::Int64Builder builder; for (int i = 0; i < num_rows; i++) {