Skip to content

Commit

Permalink
Add support to read from Azure Storage (#6675)
Browse files Browse the repository at this point in the history
Summary:
This change adds support to read from Abfs (Azure Gen2 Storage).

The dependencies of this connector are already installed via setup-adapters.sh.

The connector can open abfs URIs and read from the corresponding remote Azure storage.
However, it cannot rename files, create directories, or remove directories.

The support to write to abfs is going to be provided later.

This is part of the work for issue #6415

Pull Request resolved: #6675

Reviewed By: amitkdutta

Differential Revision: D50559376

Pulled By: pedroerp

fbshipit-source-id: 0651687d753f681f5e4d2d82293e08f26f0a515d
  • Loading branch information
gaoyangxiaozhu authored and facebook-github-bot committed Oct 24, 2023
1 parent 004de09 commit a840fba
Show file tree
Hide file tree
Showing 20 changed files with 1,259 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .circleci/dist_compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ jobs:
"-DVELOX_ENABLE_HDFS=ON"
"-DVELOX_ENABLE_S3=ON"
"-DVELOX_ENABLE_GCS=ON"
"-DVELOX_ENABLE_ABFS=ON"
"-DVELOX_ENABLE_SUBSTRAIT=ON"
"-DVELOX_ENABLE_REMOTE_FUNCTIONS=ON"
)
Expand All @@ -417,6 +418,12 @@ jobs:
export LIBHDFS3_CONF=$(pwd)/.circleci/hdfs-client.xml
export HADOOP_HOME='/usr/local/hadoop'
export PATH=~/adapter-deps/install/bin:/usr/local/hadoop/bin:${PATH}
# The following is used to install Azurite in the CI for running Abfs Hive Connector unit tests.
# Azurite is an emulator for local Azure Storage development, and it is a required component for running Abfs Hive Connector unit tests.
# It can be installed using npm. The following is used to install Node.js and npm for Azurite installation.
curl -sL https://rpm.nodesource.com/setup_10.x | bash -
yum install -y nodejs
npm install -g azurite
cd _build/release && ctest -j 16 -VV --output-on-failure
no_output_timeout: 1h
- post-steps
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/storage_adapters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
add_subdirectory(s3fs)
add_subdirectory(hdfs)
add_subdirectory(gcs)
add_subdirectory(abfs)
253 changes: 253 additions & 0 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"
#include "velox/core/Config.h"

#include <azure/storage/blobs/blob_client.hpp>
#include <fmt/format.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>

namespace facebook::velox::filesystems::abfs {
using namespace Azure::Storage::Blobs;
/// Thin view over a Velox Config that resolves the Azure storage connection
/// string for an ABFS path.
class AbfsConfig {
 public:
  /// @param config Non-owning pointer to the configuration. It is stored as-is
  /// and dereferenced by connectionString(), so it must stay valid for as long
  /// as this object is used.
  ///
  /// Marked explicit so a bare `const Config*` is never silently converted
  /// into an AbfsConfig.
  explicit AbfsConfig(const Config* config) : config_(config) {}

  /// Builds the connection string for the storage account referenced by
  /// 'path'.
  ///
  /// The credential key is derived from the path via AbfsAccount::credKey()
  /// and looked up in the configuration. Raises a Velox user error when the
  /// configuration has no value for that key.
  std::string connectionString(const std::string& path) const {
    auto abfsAccount = AbfsAccount(path);
    const auto key = abfsAccount.credKey();
    VELOX_USER_CHECK(
        config_->isValueExists(key), "Failed to find storage credentials");

    return abfsAccount.connectionString(config_->get(key).value());
  }

 private:
  const Config* FOLLY_NONNULL config_;
};

/// Private implementation of AbfsReadFile. Owns an Azure BlobClient created
/// from a connection string and serves positional reads by downloading byte
/// ranges of the blob.
class AbfsReadFile::Impl {
  constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M
  constexpr static uint64_t kReadConcurrency = 8;

 public:
  /// Parses 'path' with AbfsAccount to obtain the file system and file path,
  /// then creates the blob client from 'connectStr'. No request is issued
  /// here; the blob size is fetched by initialize().
  explicit Impl(const std::string& path, const std::string& connectStr)
      : path_(path), connectStr_(connectStr) {
    auto abfsAccount = AbfsAccount(path_);
    fileSystem_ = abfsAccount.fileSystem();
    fileName_ = abfsAccount.filePath();
    fileClient_ =
        std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
            connectStr_, fileSystem_, fileName_));
  }

  /// Fetches the blob properties and caches the blob size. Subsequent calls
  /// are no-ops once the size is known. Storage errors are reported through
  /// throwStorageExceptionWithOperationDetails().
  void initialize() {
    if (length_ != -1) {
      return;
    }
    try {
      auto properties = fileClient_->GetProperties();
      length_ = properties.Value.BlobSize;
    } catch (Azure::Storage::StorageException& e) {
      throwStorageExceptionWithOperationDetails("GetProperties", fileName_, e);
    }

    VELOX_CHECK_GE(length_, 0);
  }

  /// Reads 'length' bytes starting at 'offset' into 'buffer' and returns a
  /// view over the bytes written. 'buffer' must have room for 'length' bytes.
  std::string_view pread(uint64_t offset, uint64_t length, void* buffer) const {
    preadInternal(offset, length, static_cast<char*>(buffer));
    return {static_cast<char*>(buffer), length};
  }

  /// Reads 'length' bytes starting at 'offset' and returns them as a string.
  std::string pread(uint64_t offset, uint64_t length) const {
    std::string result(length, 0);
    preadInternal(offset, length, result.data());
    return result;
  }

  /// Reads one contiguous range starting at 'offset' whose size is the sum of
  /// all buffer sizes, then scatters it into 'buffers' in order. A buffer
  /// whose data pointer is null only advances the position: its bytes are
  /// fetched but not copied anywhere.
  ///
  /// @return Total number of bytes covered by 'buffers'.
  uint64_t preadv(
      uint64_t offset,
      const std::vector<folly::Range<char*>>& buffers) const {
    size_t length = 0;
    for (const auto& range : buffers) {
      length += range.size();
    }
    std::string result(length, 0);
    preadInternal(offset, length, result.data());
    size_t resultOffset = 0;
    for (const auto& range : buffers) {
      if (range.data()) {
        memcpy(range.data(), result.data() + resultOffset, range.size());
      }
      resultOffset += range.size();
    }

    return length;
  }

  /// Reads each region into the IOBuf at the same index. Every output IOBuf
  /// is replaced by a freshly allocated buffer of the region's length. The
  /// regions are fetched one after another, one request per region.
  void preadv(
      folly::Range<const common::Region*> regions,
      folly::Range<folly::IOBuf*> iobufs) const {
    VELOX_CHECK_EQ(regions.size(), iobufs.size());
    for (size_t i = 0; i < regions.size(); ++i) {
      const auto& region = regions[i];
      auto& output = iobufs[i];
      output = folly::IOBuf(folly::IOBuf::CREATE, region.length);
      pread(region.offset, region.length, output.writableData());
      output.append(region.length);
    }
  }

  /// Returns the cached blob size. The value is -1 converted to uint64_t if
  /// initialize() has not run yet.
  uint64_t size() const {
    return length_;
  }

  /// Fixed estimate of the bookkeeping footprint of this object.
  /// NOTE(review): counts three strings although the class holds four string
  /// members; confirm whether the estimate should be updated.
  uint64_t memoryUsage() const {
    return 3 * sizeof(std::string) + sizeof(int64_t);
  }

  bool shouldCoalesce() const {
    return false;
  }

  /// Returns the file path component extracted from the original ABFS path.
  std::string getName() const {
    return fileName_;
  }

  uint64_t getNaturalReadSize() const {
    return kNaturalReadSize;
  }

 private:
  /// Downloads the byte range [offset, offset + length) into 'position'.
  ///
  /// Fails with a Velox error if the service returns fewer bytes than
  /// requested, so callers never observe a partially filled buffer.
  void preadInternal(uint64_t offset, uint64_t length, char* position) const {
    // Nothing to fetch; avoid issuing a request for an empty range.
    if (length == 0) {
      return;
    }

    Azure::Core::Http::HttpRange range;
    range.Offset = offset;
    range.Length = length;

    Azure::Storage::Blobs::DownloadBlobOptions blob;
    blob.Range = range;

    auto response = fileClient_->Download(blob);
    // ReadToCount stops early when the stream ends, e.g. when the requested
    // range extends past the end of the blob. Its return value is the number
    // of bytes actually written.
    const uint64_t bytesRead = response.Value.BodyStream->ReadToCount(
        reinterpret_cast<uint8_t*>(position), length);
    VELOX_CHECK_EQ(
        bytesRead,
        length,
        "Short read from '{}': requested {} bytes at offset {}",
        fileName_,
        length,
        offset);
  }

  const std::string path_;
  const std::string connectStr_;
  std::string fileSystem_;
  std::string fileName_;
  std::unique_ptr<BlobClient> fileClient_;

  // Blob size in bytes; -1 until initialize() has succeeded.
  int64_t length_ = -1;
};

/// Creates the implementation object for 'path' using the given connection
/// string. The blob size is not fetched until initialize() is called.
AbfsReadFile::AbfsReadFile(
    const std::string& path,
    const std::string& connectStr)
    : impl_(std::make_shared<Impl>(path, connectStr)) {}

/// Forwards to Impl::initialize(), which fetches and caches the blob size.
void AbfsReadFile::initialize() {
  impl_->initialize();
}

std::string_view
AbfsReadFile::pread(uint64_t offset, uint64_t length, void* buffer) const {
return impl_->pread(offset, length, buffer);
}

std::string AbfsReadFile::pread(uint64_t offset, uint64_t length) const {
return impl_->pread(offset, length);
}

uint64_t AbfsReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
return impl_->preadv(offset, buffers);
}

/// Reads every region into the IOBuf at the matching index. Forwards to the
/// implementation object.
void AbfsReadFile::preadv(
    folly::Range<const common::Region*> regions,
    folly::Range<folly::IOBuf*> iobufs) const {
  impl_->preadv(regions, iobufs);
}

/// Returns the blob size reported by the implementation object.
uint64_t AbfsReadFile::size() const {
  const uint64_t fileSize = impl_->size();
  return fileSize;
}

/// Returns the memory estimate reported by the implementation object.
uint64_t AbfsReadFile::memoryUsage() const {
  const uint64_t estimate = impl_->memoryUsage();
  return estimate;
}

bool AbfsReadFile::shouldCoalesce() const {
return false;
}

std::string AbfsReadFile::getName() const {
return impl_->getName();
}

/// Returns the preferred read size reported by the implementation object.
uint64_t AbfsReadFile::getNaturalReadSize() const {
  const uint64_t naturalSize = impl_->getNaturalReadSize();
  return naturalSize;
}

/// Private implementation of AbfsFileSystem. Holds the configuration view
/// used to resolve connection strings and logs creation and destruction.
class AbfsFileSystem::Impl {
 public:
  /// @param config Non-owning configuration pointer handed to AbfsConfig; it
  /// must remain valid while this object is in use.
  explicit Impl(const Config* config) : abfsConfig_(config) {
    LOG(INFO) << "Init Azure Blob file system";
  }

  ~Impl() {
    LOG(INFO) << "Dispose Azure Blob file system";
  }

  /// Returns the connection string for the storage account referenced by
  /// 'path', as resolved by AbfsConfig::connectionString().
  ///
  /// Returned as a plain (non-const) value so the caller can move from it.
  std::string connectionString(const std::string& path) const {
    return abfsConfig_.connectionString(path);
  }

 private:
  const AbfsConfig abfsConfig_;
  // Not assigned or read by any code in this class.
  std::shared_ptr<folly::Executor> ioExecutor_;
};

/// Passes 'config' to the FileSystem base and creates the implementation
/// object from the raw configuration pointer.
AbfsFileSystem::AbfsFileSystem(const std::shared_ptr<const Config>& config)
    : FileSystem(config), impl_(std::make_shared<Impl>(config.get())) {}

/// Returns the identifier of this file system implementation.
std::string AbfsFileSystem::name() const {
  return std::string{"ABFS"};
}

std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
auto abfsfile = std::make_unique<AbfsReadFile>(
std::string(path), impl_->connectionString(std::string(path)));
abfsfile->initialize();
return abfsfile;
}
} // namespace facebook::velox::filesystems::abfs
84 changes: 84 additions & 0 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/common/file/FileSystems.h"

namespace facebook::velox::filesystems::abfs {

/// Implementation of the ABFS (Azure Blob Storage) filesystem and file
/// interface. A registration function (registerAbfsFileSystem) is provided so
/// that the appropriate type of file can be constructed based on a filename.
/// Only opening files for read is implemented; every other operation declared
/// below raises an "unsupported" error.
///
/// The supported scheme is `abfs(s)://` to align with the valid scheme
/// identifiers used in the Hadoop Filesystem ABFS driver when integrating with
/// Azure Blob Storage. One key difference here is that the ABFS Hadoop client
/// driver always uses Transport Layer Security (TLS) regardless of the
/// authentication method chosen when using the `abfss` scheme, but TLS is not
/// mandated when using the `abfs` scheme. In our implementation, we always use
/// the HTTPS protocol, regardless of whether the scheme is `abfs://` or
/// `abfss://`. The legacy wasb(s):// scheme is not supported as it has already
/// been deprecated by the Azure Storage team. Reference document -
/// https://learn.microsoft.com/en-us/azure/databricks/storage/azure-storage.
class AbfsFileSystem : public FileSystem {
public:
/// Creates the file system from 'config', which supplies the storage
/// credentials looked up when a file is opened.
explicit AbfsFileSystem(const std::shared_ptr<const Config>& config);

/// Returns the identifier of this file system implementation.
std::string name() const override;

/// Opens the file at 'path' for reading.
std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;

/// Not implemented; always raises an unsupported error.
std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
const FileOptions& options = {}) override {
VELOX_UNSUPPORTED("write for abfs not implemented");
}

/// Not implemented; always raises an unsupported error.
void rename(
std::string_view path,
std::string_view newPath,
bool overWrite = false) override {
VELOX_UNSUPPORTED("rename for abfs not implemented");
}

/// Not implemented; always raises an unsupported error.
void remove(std::string_view path) override {
VELOX_UNSUPPORTED("remove for abfs not implemented");
}

/// Not implemented; always raises an unsupported error.
bool exists(std::string_view path) override {
VELOX_UNSUPPORTED("exists for abfs not implemented");
}

/// Not implemented; always raises an unsupported error.
std::vector<std::string> list(std::string_view path) override {
VELOX_UNSUPPORTED("list for abfs not implemented");
}

/// Not implemented; always raises an unsupported error.
void mkdir(std::string_view path) override {
VELOX_UNSUPPORTED("mkdir for abfs not implemented");
}

/// Not implemented; always raises an unsupported error.
void rmdir(std::string_view path) override {
VELOX_UNSUPPORTED("rmdir for abfs not implemented");
}

protected:
// Implementation details are hidden behind an opaque class defined in the
// .cpp file.
class Impl;
std::shared_ptr<Impl> impl_;
};

/// Registration entry point for the ABFS file system.
/// NOTE(review): the definition is not part of this header; presumably it
/// registers AbfsFileSystem for `abfs(s)://` paths with the Velox file system
/// registry - confirm against the implementation.
void registerAbfsFileSystem();
} // namespace facebook::velox::filesystems::abfs
Loading

0 comments on commit a840fba

Please sign in to comment.