Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc: Add compression format suffix to the written Parquet file #11563

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,24 @@ CompressionKind stringToCompressionKind(const std::string& kind) {
VELOX_UNSUPPORTED("Not support compression kind {}", kind);
}
}

std::string fileNameSuffix(CompressionKind kind) {
switch (static_cast<int32_t>(kind)) {
case CompressionKind_ZLIB:
return ".zlib";
case CompressionKind_SNAPPY:
return ".snappy";
case CompressionKind_LZO:
return ".lzo";
case CompressionKind_ZSTD:
return ".zstd";
case CompressionKind_LZ4:
return ".lz4";
case CompressionKind_GZIP:
return ".gz";
case CompressionKind_NONE:
default:
return "";
}
}
} // namespace facebook::velox::common
15 changes: 15 additions & 0 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ std::string compressionKindToString(CompressionKind kind);

CompressionKind stringToCompressionKind(const std::string& kind);

std::string fileNameSuffix(CompressionKind kind);

constexpr uint64_t DEFAULT_COMPRESSION_BLOCK_SIZE = 256 * 1024;

} // namespace facebook::velox::common
Expand All @@ -56,3 +58,16 @@ struct fmt::formatter<facebook::velox::common::CompressionKind>
facebook::velox::common::compressionKindToString(s), ctx);
}
};

template <>
struct fmt::formatter<std::optional<facebook::velox::common::CompressionKind>>
: fmt::formatter<std::string> {
auto format(
const std::optional<facebook::velox::common::CompressionKind>& s,
format_context& ctx) {
return formatter<std::string>::format(
facebook::velox::common::fileNameSuffix(s.value_or(
facebook::velox::common::CompressionKind::CompressionKind_NONE)),
ctx);
}
};
16 changes: 14 additions & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,8 +939,16 @@ std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
insertTableHandle_->storageFormat() ==
dwio::common::FileFormat::PARQUET) {
return {
fmt::format("{}{}", targetFileName, ".parquet"),
fmt::format("{}{}", writeFileName, ".parquet")};
fmt::format(
"{}{}{}",
targetFileName,
insertTableHandle_->compressionKind(),
".parquet"),
fmt::format(
"{}{}{}",
writeFileName,
insertTableHandle_->compressionKind(),
".parquet")};
}
return {targetFileName, writeFileName};
}
Expand Down Expand Up @@ -1185,4 +1193,8 @@ uint64_t HiveDataSink::WriterReclaimer::reclaim(
}
return reclaimedBytes;
}

std::string HiveDataSink::writeFileName() const {
return getWriterFileNames(std::nullopt).second;
}
} // namespace facebook::velox::connector::hive
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ class HiveDataSink : public DataSink {

bool canReclaim() const;

// For testing: check the write file name.
std::string writeFileName() const;

private:
// Validates the state transition from 'oldState' to 'newState'.
void checkStateTransition(State oldState, State newState);
Expand Down
50 changes: 46 additions & 4 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
const std::shared_ptr<connector::hive::HiveBucketProperty>&
bucketProperty = nullptr,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr) {
nullptr,
const CompressionKind compressionKind =
CompressionKind::CompressionKind_ZSTD) {
return makeHiveInsertTableHandle(
outputRowType->names(),
outputRowType->children(),
Expand All @@ -155,7 +157,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
std::nullopt,
connector::hive::LocationHandle::TableType::kNew),
fileFormat,
CompressionKind::CompressionKind_ZSTD,
compressionKind,
{},
writerOptions);
}
Expand All @@ -168,7 +170,9 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
const std::shared_ptr<connector::hive::HiveBucketProperty>&
bucketProperty = nullptr,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr) {
nullptr,
const CompressionKind compressionKind =
CompressionKind::CompressionKind_ZSTD) {
return std::make_shared<HiveDataSink>(
rowType,
createHiveInsertTableHandle(
Expand All @@ -177,7 +181,8 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
fileFormat,
partitionedBy,
bucketProperty,
writerOptions),
writerOptions,
compressionKind),
connectorQueryCtx_.get(),
CommitStrategy::kNoCommit,
connectorConfig_);
Expand Down Expand Up @@ -1168,6 +1173,43 @@ TEST_F(HiveDataSinkTest, flushPolicyWithParquet) {
EXPECT_EQ(fileMeta.numRowGroups(), 10);
EXPECT_EQ(fileMeta.rowGroup(0).numRows(), 500);
}

TEST_F(HiveDataSinkTest, parquetCompressionFileName) {
auto checkFileSuffix = [this](
const CompressionKind compression,
std::string_view expectedSuffix) {
auto endsWith = [](std::string_view str, std::string_view suffix) {
if (str.length() < suffix.length()) {
return false;
}
return (
0 ==
str.compare(str.length() - suffix.length(), suffix.length(), suffix));
};
auto dataSink = createDataSink(
rowType_,
"/path/to/test",
dwio::common::FileFormat::PARQUET,
{},
nullptr,
nullptr,
compression);
auto writeFileName = dataSink->writeFileName();
VELOX_CHECK(
endsWith(writeFileName, expectedSuffix),
"{} not end with {}",
writeFileName,
expectedSuffix);
};
checkFileSuffix(CompressionKind::CompressionKind_NONE, ".parquet");
checkFileSuffix(CompressionKind::CompressionKind_ZLIB, ".zlib.parquet");
checkFileSuffix(CompressionKind::CompressionKind_SNAPPY, ".snappy.parquet");
checkFileSuffix(CompressionKind::CompressionKind_LZO, ".lzo.parquet");
checkFileSuffix(CompressionKind::CompressionKind_ZSTD, ".zstd.parquet");
checkFileSuffix(CompressionKind::CompressionKind_LZ4, ".lz4.parquet");
checkFileSuffix(CompressionKind::CompressionKind_GZIP, ".gz.parquet");
}

#endif

TEST_F(HiveDataSinkTest, flushPolicyWithDWRF) {
Expand Down
Loading