diff --git a/CMakeLists.txt b/CMakeLists.txt index b34966b7a22a..af503653dd61 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -206,7 +206,7 @@ if(VELOX_ENABLE_ABFS) if(AZURESDK_ROOT_DIR) list(APPEND CMAKE_PREFIX_PATH ${AZURESDK_ROOT_DIR}) endif() - find_package(azure-storage-blobs-cpp CONFIG REQUIRED) + find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED) add_definitions(-DVELOX_ENABLE_ABFS) endif() diff --git a/scripts/setup-adapters.sh b/scripts/setup-adapters.sh index 5bc9c81e4605..e8deecd3ce7a 100755 --- a/scripts/setup-adapters.sh +++ b/scripts/setup-adapters.sh @@ -62,7 +62,7 @@ function install_gcs-sdk-cpp { } function install_azure-storage-sdk-cpp { - github_checkout azure/azure-sdk-for-cpp azure-storage-blobs_12.8.0 + github_checkout azure/azure-sdk-for-cpp azure-storage-files-datalake_12.8.0 cd sdk/core/azure-core if ! grep -q "baseline" vcpkg.json; then @@ -88,6 +88,14 @@ function install_azure-storage-sdk-cpp { sed -i 's/"version-semver"/"builtin-baseline": "dafef74af53669ef1cc9015f55e0ce809ead62aa","version-semver"/' vcpkg.json fi cmake_install -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF + + cd - + # install azure-storage-files-datalake + cd sdk/storage/azure-storage-files-datalake + if ! grep -q "baseline" vcpkg.json; then + sed -i 's/"version-semver"/"builtin-baseline": "dafef74af53669ef1cc9015f55e0ce809ead62aa","version-semver"/' vcpkg.json + fi + cmake_install -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF } function install_libhdfs3 { diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index de86aa7f4386..41d3863dc8ca 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -18,15 +18,18 @@ #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" #include "velox/core/Config.h" #include +#include #include #include #include namespace facebook::velox::filesystems::abfs { using namespace Azure::Storage::Blobs; + class AbfsConfig { public: AbfsConfig(const Config* config) : config_(config) {} @@ -213,6 +216,155 @@ uint64_t AbfsReadFile::getNaturalReadSize() const { return impl_->getNaturalReadSize(); } +class BlobStorageFileClient final : public IBlobStorageFileClient { + public: + BlobStorageFileClient(const DataLakeFileClient client) + : client_(std::make_unique(client)) {} + + void Create() override { + client_->Create(); + } + + PathProperties GetProperties() override { + return client_->GetProperties().Value; + } + + void Append(const uint8_t* buffer, size_t size, uint64_t offset) override { + auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size); + client_->Append(bodyStream, offset); + } + + void Flush(uint64_t position) override { + client_->Flush(position); + } + + void Close() override { + // do nothing. + } + + private: + std::unique_ptr client_; +}; + +class AbfsWriteFile::Impl { + public: + explicit Impl(const std::string& path, const std::string& connectStr) + : path_(path), connectStr_(connectStr) { + // Make it a no-op if invoked twice. + if (position_ != -1) { + return; + } + position_ = 0; + } + + void initialize() { + if (!blobStorageFileClient_) { + auto abfsAccount = AbfsAccount(path_); + auto fileClient = DataLakeFileClient::CreateFromConnectionString( + connectStr_, abfsAccount.fileSystem(), abfsAccount.filePath()); + blobStorageFileClient_ = std::make_unique( + BlobStorageFileClient(fileClient)); + } + + VELOX_CHECK(!exist(), "File already exists"); + blobStorageFileClient_->Create(); + } + + /// mainly for test purpose. + void setFileClient( + std::shared_ptr blobStorageManager) { + blobStorageFileClient_ = std::move(blobStorageManager); + } + + bool exist() { + try { + blobStorageFileClient_->GetProperties(); + return true; + } catch (Azure::Storage::StorageException& e) { + if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + return false; + } else { + throwStorageExceptionWithOperationDetails("GetProperties", path_, e); + } + } + } + + void close() { + if (!closed_) { + flush(); + blobStorageFileClient_->Close(); + closed_ = true; + } + } + + void flush() { + if (!closed_) { + blobStorageFileClient_->Flush(position_); + } + } + + void append(std::string_view data) { + VELOX_CHECK(!closed_, "File is not open"); + if (data.size() == 0) { + return; + } + append(data.data(), data.size()); + } + + uint64_t size() const { + auto properties = blobStorageFileClient_->GetProperties(); + return properties.FileSize; + } + + void append(const char* buffer, size_t size) { + auto offset = position_; + position_ += size; + blobStorageFileClient_->Append( + reinterpret_cast(buffer), size, offset); + } + + private: + const std::string path_; + const std::string connectStr_; + std::string fileSystem_; + std::string fileName_; + std::shared_ptr blobStorageFileClient_; + + uint64_t position_ = -1; + std::atomic closed_{false}; +}; + +AbfsWriteFile::AbfsWriteFile( + const std::string& path, + const std::string& connectStr) { + impl_ = std::make_shared(path, connectStr); +} + +void AbfsWriteFile::initialize() { + impl_->initialize(); +} + +void AbfsWriteFile::close() { + impl_->close(); +} + +void AbfsWriteFile::flush() { + impl_->flush(); +} + +void AbfsWriteFile::append(std::string_view data) { + impl_->append(data); +} + +uint64_t AbfsWriteFile::size() const { + return impl_->size(); +} + +void AbfsWriteFile::setFileClient( + std::shared_ptr fileClient) { + impl_->setFileClient(std::move(fileClient)); +} + class AbfsFileSystem::Impl { public: explicit Impl(const Config* config) : abfsConfig_(config) { @@ -250,4 +402,13 @@ std::unique_ptr AbfsFileSystem::openFileForRead( abfsfile->initialize(); return abfsfile; } + +std::unique_ptr AbfsFileSystem::openFileForWrite( + std::string_view path, + const FileOptions& /*unused*/) { + auto abfsfile = std::make_unique( + std::string(path), impl_->connectionString(std::string(path))); + abfsfile->initialize(); + return abfsfile; +} } // namespace facebook::velox::filesystems::abfs diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h index f67789243545..4b8ec74d5954 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h @@ -44,9 +44,7 @@ class AbfsFileSystem : public FileSystem { std::unique_ptr openFileForWrite( std::string_view path, - const FileOptions& options = {}) override { - VELOX_UNSUPPORTED("write for abfs not implemented"); - } + const FileOptions& options = {}) override; void rename( std::string_view path, diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h new file mode 100644 index 000000000000..1e88114f594a --- /dev/null +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h @@ -0,0 +1,90 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include "AbfsUtil.h" +#include "folly/io/Cursor.h" +#include "velox/common/file/File.h" + +namespace Azure::Storage::Files::DataLake::Models { +class PathProperties; +} + +namespace facebook::velox::filesystems::abfs { +using namespace Azure::Storage::Files::DataLake; +using namespace Azure::Storage::Files::DataLake::Models; + +/* + * We are using the DFS (Data Lake Storage) endpoint for Azure Blob File write + * operations because the DFS endpoint is designed to be compatible with file + * operation semantics, such as `Append` to a file and file `Flush` operations. + * The legacy Blob endpoint can only be used for blob level append and flush + * operations. When using the Blob endpoint, we would need to manually manage + * the creation, appending, and committing of file-related blocks. + * + * However, the Azurite Simulator does not yet support the DFS endpoint. + * (For more information, see https://github.com/Azure/Azurite/issues/553 and + * https://github.com/Azure/Azurite/issues/409). + * You can find a comparison between DFS and Blob endpoints here: + * https://github.com/Azure/Azurite/wiki/ADLS-Gen2-Implementation-Guidance + * + * To facilitate unit testing of file write scenarios, we define the + * IBlobStorageFileClient here, which can be mocked during testing. + */ +class IBlobStorageFileClient { + public: + virtual void Create() = 0; + virtual PathProperties GetProperties() = 0; + virtual void Append(const uint8_t* buffer, size_t size, uint64_t offset) = 0; + virtual void Flush(uint64_t position) = 0; + virtual void Close() = 0; +}; + +/// Implementation of abfs write file. Nothing written to the file should be +/// read back until it is closed. +class AbfsWriteFile : public WriteFile { + public: + constexpr static uint64_t kNaturalWriteSize = 8 << 20; // 8M + /// The constructor. + /// @param path The file path to write. + /// @param connectStr the connection string used to auth the storage account. + AbfsWriteFile(const std::string& path, const std::string& connectStr); + + /// check any issue reading file. + void initialize(); + + /// Get the file size. + uint64_t size() const override; + + /// Flush the data. + void flush() override; + + /// Write the data by append mode. + void append(std::string_view data) override; + + /// Close the file. + void close() override; + + /// mainly for test purpose. + void setFileClient(std::shared_ptr fileClient); + + protected: + class Impl; + std::shared_ptr impl_; +}; +} // namespace facebook::velox::filesystems::abfs diff --git a/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt index 9b0a7e1c7c79..db4dfb9e270d 100644 --- a/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt @@ -25,6 +25,7 @@ if(VELOX_ENABLE_ABFS) velox_hive_connector velox_dwio_common_exception Azure::azure-storage-blobs + Azure::azure-storage-files-datalake Folly::folly glog::glog fmt::fmt) diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index 98354f9dd52c..88e0bed725ef 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -15,6 +15,8 @@ */ #include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" +#include +#include #include "gtest/gtest.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/File.h" @@ -22,15 +24,18 @@ #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" #include "velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h" #include "velox/exec/tests/utils/PortUtil.h" #include "velox/exec/tests/utils/TempFilePath.h" #include +#include #include using namespace facebook::velox; - +using namespace facebook::velox::filesystems::abfs; +using namespace Azure::Storage::Files::DataLake; using ::facebook::velox::common::Region; constexpr int kOneMB = 1 << 20; @@ -38,6 +43,59 @@ static const std::string filePath = "test_file.txt"; static const std::string fullFilePath = facebook::velox::filesystems::test::AzuriteABFSEndpoint + filePath; +// A mocked blob storage file client backend with local file store. +class MockBlobStorageFileClient : public IBlobStorageFileClient { + public: + MockBlobStorageFileClient() { + char tempFileName[] = "/tmp/velox_abfs_test_XXXXXX"; + int fd = mkstemp(tempFileName); + if (fd == -1) { + throw std::logic_error( + "[MockBlobStorageFileClient] Failed to create a temporary file"); + } + filePath_ = tempFileName; + std::fclose(fdopen(fd, "w")); + std::remove(tempFileName); + } + + void Create() override { + fileStream_ = std::ofstream( + filePath_, + std::ios_base::out | std::ios_base::binary | std::ios_base::app); + } + + PathProperties GetProperties() override { + if (!std::filesystem::exists(filePath_)) { + Azure::Storage::StorageException exp(filePath_ + "doesn't exists"); + exp.StatusCode = Azure::Core::Http::HttpStatusCode::NotFound; + throw exp; + } + std::ifstream file(filePath_, std::ios::binary | std::ios::ate); + uint64_t size = static_cast(file.tellg()); + PathProperties ret; + ret.FileSize = size; + return ret; + } + + void Append(const uint8_t* buffer, size_t size, uint64_t offset) override { + fileStream_.seekp(offset); + fileStream_.write(reinterpret_cast(buffer), size); + } + + void Flush(uint64_t position) override { + fileStream_.flush(); + } + + void Close() override { + fileStream_.flush(); + fileStream_.close(); + } + + private: + std::string filePath_; + std::ofstream fileStream_; +}; + class AbfsFileSystemTest : public testing::Test { public: static std::shared_ptr hiveConfig( @@ -72,13 +130,51 @@ class AbfsFileSystemTest : public testing::Test { azuriteServer->stop(); } + std::unique_ptr openFileForWrite( + std::string_view path, + std::shared_ptr client) { + auto abfsfile = + std::make_unique( + std::string(path), azuriteServer->connectionStr()); + abfsfile->setFileClient(client); + abfsfile->initialize(); + return abfsfile; + } + + static std::string generateRandomData(int size) { + static const char charset[] = + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string data(size, ' '); + + for (int i = 0; i < size; ++i) { + int index = rand() % (sizeof(charset) - 1); + data[i] = charset[index]; + } + + return data; + } + private: - static std::shared_ptr<::exec::test::TempFilePath> createFile() { + static std::shared_ptr<::exec::test::TempFilePath> createFile( + uint64_t size = -1) { auto tempFile = ::exec::test::TempFilePath::create(); - tempFile->append("aaaaa"); - tempFile->append("bbbbb"); - tempFile->append(std::string(kOneMB, 'c')); - tempFile->append("ddddd"); + if (size == -1) { + tempFile->append("aaaaa"); + tempFile->append("bbbbb"); + tempFile->append(std::string(kOneMB, 'c')); + tempFile->append("ddddd"); + } else { + const uint64_t totalSize = size * 1024 * 1024; + const uint64_t chunkSize = 5 * 1024 * 1024; + uint64_t remainingSize = totalSize; + while (remainingSize > 0) { + uint64_t dataSize = std::min(remainingSize, chunkSize); + std::string randomData = generateRandomData(dataSize); + tempFile->append(randomData); + remainingSize -= dataSize; + } + } return tempFile; } }; @@ -185,13 +281,41 @@ TEST_F(AbfsFileSystemTest, missingFile) { } } -TEST_F(AbfsFileSystemTest, openFileForWriteNotImplemented) { - auto hiveConfig = AbfsFileSystemTest::hiveConfig( - {{"fs.azure.account.key.test.dfs.core.windows.net", - azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); +TEST_F(AbfsFileSystemTest, OpenFileForWriteTest) { + const std::string abfsFile = + facebook::velox::filesystems::test::AzuriteABFSEndpoint + "writetest.txt"; + auto mockClient = + std::make_shared(MockBlobStorageFileClient()); + auto abfsWriteFile = openFileForWrite(abfsFile, mockClient); + EXPECT_EQ(abfsWriteFile->size(), 0); + uint64_t totalSize = 0; + std::string randomData = + AbfsFileSystemTest::generateRandomData(1 * 1024 * 1024); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + abfsWriteFile->append(randomData); + totalSize = randomData.size() * 8; + abfsWriteFile->flush(); + EXPECT_EQ(abfsWriteFile->size(), totalSize); + + randomData = AbfsFileSystemTest::generateRandomData(9 * 1024 * 1024); + abfsWriteFile->append(randomData); + totalSize += randomData.size(); + randomData = AbfsFileSystemTest::generateRandomData(2 * 1024 * 1024); + totalSize += randomData.size(); + abfsWriteFile->append(randomData); + abfsWriteFile->flush(); + EXPECT_EQ(abfsWriteFile->size(), totalSize); + abfsWriteFile->flush(); + abfsWriteFile->close(); + VELOX_ASSERT_THROW(abfsWriteFile->append("abc"), "File is not open"); VELOX_ASSERT_THROW( - abfs->openFileForWrite(fullFilePath), "write for abfs not implemented"); + openFileForWrite(abfsFile, mockClient), "File already exists"); } TEST_F(AbfsFileSystemTest, renameNotImplemented) { diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h index 165cb2767c11..4836183f3819 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h @@ -36,8 +36,8 @@ static const std::string AzuriteAccountKey{ "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="}; static const std::string AzuriteABFSEndpoint = fmt::format( "abfs://{}@{}.dfs.core.windows.net/", - AzuriteAccountName, - AzuriteContainerName); + AzuriteContainerName, + AzuriteAccountName); class AzuriteServer { public: diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt index 297a7db4e1bc..cdf5d8088e6b 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt @@ -26,4 +26,5 @@ target_link_libraries( velox_exec gtest gtest_main - Azure::azure-storage-blobs) + Azure::azure-storage-blobs + Azure::azure-storage-files-datalake)