Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor committed Apr 10, 2024
1 parent 607df96 commit 875086d
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 54 deletions.
2 changes: 1 addition & 1 deletion velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class LocalFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options) override {
const FileOptions& /*unused*/) override {
return std::make_unique<LocalReadFile>(extractPath(path));
}

Expand Down
7 changes: 2 additions & 5 deletions velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ struct FileOptions {

std::unordered_map<std::string, std::string> values;
memory::MemoryPool* pool{nullptr};
std::optional<int64_t> fileSize{std::nullopt};

std::optional<int64_t> getFileSize() const {
return fileSize;
}
std::optional<int64_t> fileSize{
std::nullopt}; // If specified then can be trusted to be the file size.
};

/// An abstract FileSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ class AbfsReadFile::Impl {
}

void initialize(const FileOptions& options) {
auto fileSize = options.getFileSize();

if (fileSize.has_value()) {
VELOX_CHECK_GE(fileSize.value(), 0, "Length must be non-negative");
length_ = fileSize.value();
if (options.fileSize.has_value()) {
VELOX_CHECK_GE(
options.fileSize.value(), 0, "File size must be non-negative");
length_ = options.fileSize.value();
}

if (length_ != -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,44 +168,44 @@ void readData(ReadFile* readFile) {
"ccccc");
}

TEST_F(AbfsFileSystemTest, readFile) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
AbfsFileSystem abfs{hiveConfig};
auto readFile = abfs.openFileForRead(fullFilePath);
readData(readFile.get());
}

TEST_F(AbfsFileSystemTest, openFileForReadWithOptions) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
FileOptions options;
options.fileSize = 15 + kOneMB;
auto readFile = abfs->openFileForRead(fullFilePath, options);
auto readFile = abfs.openFileForRead(fullFilePath, options);
readData(readFile.get());
}

TEST_F(AbfsFileSystemTest, openFileForReadWithInvalidOptions) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
FileOptions options;
options.fileSize = -kOneMB;
VELOX_ASSERT_THROW(
abfs->openFileForRead(fullFilePath, options),
"Length must be non-negative");
}

TEST_F(AbfsFileSystemTest, readFile) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
auto readFile = abfs->openFileForRead(fullFilePath);
readData(readFile.get());
abfs.openFileForRead(fullFilePath, options),
"File size must be non-negative");
}

TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) {
std::atomic<bool> startThreads = false;
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};

std::vector<std::thread> threads;
std::mt19937 generator(std::random_device{}());
Expand All @@ -220,7 +220,7 @@ TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) {
}
std::this_thread::sleep_for(
std::chrono::microseconds(sleepTimesInMicroseconds[index]));
auto readFile = abfs->openFileForRead(fullFilePath);
auto readFile = abfs.openFileForRead(fullFilePath);
readData(readFile.get());
});
threads.emplace_back(std::move(thread));
Expand All @@ -237,9 +237,9 @@ TEST_F(AbfsFileSystemTest, missingFile) {
azuriteServer->connectionStr()}});
const std::string abfsFile =
facebook::velox::filesystems::test::AzuriteABFSEndpoint + "test.txt";
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_RUNTIME_THROW_CODE(
abfs->openFileForRead(abfsFile), error_code::kFileNotFound, "404");
abfs.openFileForRead(abfsFile), error_code::kFileNotFound, "404");
}

TEST_F(AbfsFileSystemTest, OpenFileForWriteTest) {
Expand Down Expand Up @@ -286,56 +286,56 @@ TEST_F(AbfsFileSystemTest, renameNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(
abfs->rename("text", "text2"), "rename for abfs not implemented");
abfs.rename("text", "text2"), "rename for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, removeNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
VELOX_ASSERT_THROW(abfs->remove("text"), "remove for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(abfs.remove("text"), "remove for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, existsNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
VELOX_ASSERT_THROW(abfs->exists("text"), "exists for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(abfs.exists("text"), "exists for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, listNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
VELOX_ASSERT_THROW(abfs->list("dir"), "list for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(abfs.list("dir"), "list for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, mkdirNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
VELOX_ASSERT_THROW(abfs->mkdir("dir"), "mkdir for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(abfs.mkdir("dir"), "mkdir for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, rmdirNotImplemented) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
VELOX_ASSERT_THROW(abfs->rmdir("dir"), "rmdir for abfs not implemented");
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(abfs.rmdir("dir"), "rmdir for abfs not implemented");
}

TEST_F(AbfsFileSystemTest, credNotFOund) {
const std::string abfsFile =
std::string("abfs://[email protected]/test");
auto hiveConfig = AbfsFileSystemTest::hiveConfig({});
auto abfs = std::make_shared<filesystems::abfs::AbfsFileSystem>(hiveConfig);
AbfsFileSystem abfs{hiveConfig};
VELOX_ASSERT_THROW(
abfs->openFileForRead(abfsFile), "Failed to find storage credentials");
abfs.openFileForRead(abfsFile), "Failed to find storage credentials");
}
9 changes: 4 additions & 5 deletions velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ class GCSReadFile final : public ReadFile {
// Gets the length of the file.
// Checks if there are any issues reading the file.
void initialize(const filesystems::FileOptions& options) {
auto fileSize = options.getFileSize();

if (fileSize.has_value()) {
VELOX_CHECK_GE(fileSize.value(), 0, "Length must be non-negative");
length_ = fileSize.value();
if (options.fileSize.has_value()) {
VELOX_CHECK_GE(
options.fileSize.value(), 0, "File size must be non-negative");
length_ = options.fileSize.value();
}

// Make it a no-op if invoked twice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ std::string HdfsFileSystem::name() const {

std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& /*unused*/) {
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
}
Expand Down
9 changes: 4 additions & 5 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ class S3ReadFile final : public ReadFile {
// Gets the length of the file.
// Checks if there are any issues reading the file.
void initialize(const filesystems::FileOptions& options) {
auto fileSize = options.getFileSize();

if (fileSize.has_value()) {
VELOX_CHECK_GE(fileSize.value(), 0, "Length must be non-negative");
length_ = fileSize.value();
if (options.fileSize.has_value()) {
VELOX_CHECK_GE(
options.fileSize.value(), 0, "File size must be non-negative");
length_ = options.fileSize.value();
}

// Make it a no-op if invoked twice.
Expand Down

0 comments on commit 875086d

Please sign in to comment.