Remove the task_bucketed_writer_count config (#11138)
Summary:
Pull Request resolved: #11138

Use task_partitioned_writer_count instead, which corresponds to the shuffle
partitioning scheme used by the table write node rather than the physical
table layout. This lets us raise the number of table writer driver threads
even for non-bucketed partitioned tables without producing small files,
since the partitioning scheme is based on the table partition columns.
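
For illustration, a minimal usage sketch of the surviving config (the key and
the QueryCtx/QueryConfig calls follow the test code in this diff; the value 8
is an arbitrary example):

  // One knob now covers partitioned and bucketed writes alike.
  std::unordered_map<std::string, std::string> configData;
  configData.emplace(
      core::QueryConfig::kTaskPartitionedWriterCount, std::to_string(8));
  auto queryCtx =
      core::QueryCtx::create(nullptr, core::QueryConfig{std::move(configData)});
  // A table write node with a partitioning scheme now runs 8 writer drivers.
  ASSERT_EQ(queryCtx->queryConfig().taskPartitionedWriterCount(), 8);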

Reviewed By: kewang1024

Differential Revision: D63578063

fbshipit-source-id: 3f1f38b66f475bf40526c91dd8c9b0360de0ac2b
xiaoxmeng authored and facebook-github-bot committed Oct 1, 2024
1 parent 7ad64b3 commit bfb8ebe
Showing 8 changed files with 104 additions and 210 deletions.
3 changes: 0 additions & 3 deletions velox/core/PlanNode.cpp
@@ -1853,7 +1853,6 @@ folly::dynamic TableWriteNode::serialize() const {
   obj["connectorInsertTableHandle"] =
       insertTableHandle_->connectorInsertTableHandle()->serialize();
   obj["hasPartitioningScheme"] = hasPartitioningScheme_;
-  obj["hasBucketProperty"] = hasBucketProperty_;
   obj["outputType"] = outputType_->serialize();
   obj["commitStrategy"] = connector::commitStrategyToString(commitStrategy_);
   return obj;
@@ -1876,7 +1875,6 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) {
       ISerializable::deserialize<connector::ConnectorInsertTableHandle>(
           obj["connectorInsertTableHandle"]));
   const bool hasPartitioningScheme = obj["hasPartitioningScheme"].asBool();
-  const bool hasBucketProperty = obj["hasBucketProperty"].asBool();
   auto outputType = deserializeRowType(obj["outputType"]);
   auto commitStrategy =
       connector::stringToCommitStrategy(obj["commitStrategy"].asString());
@@ -1889,7 +1887,6 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) {
       std::make_shared<InsertTableHandle>(
           connectorId, connectorInsertTableHandle),
       hasPartitioningScheme,
-      hasBucketProperty,
       outputType,
       commitStrategy,
       source);
169 changes: 86 additions & 83 deletions velox/core/PlanNode.h

Large diffs are not rendered by default.

9 changes: 0 additions & 9 deletions velox/core/QueryConfig.h
@@ -300,11 +300,6 @@ class QueryConfig {
   static constexpr const char* kTaskPartitionedWriterCount =
       "task_partitioned_writer_count";

-  /// The number of local parallel table writer operators per task for
-  /// bucketed writes. If not set, use "task_writer_count".
-  static constexpr const char* kTaskBucketedWriterCount =
-      "task_bucketed_writer_count";
-
   /// If true, finish the hash probe on an empty build table for a specific set
   /// of hash joins.
   static constexpr const char* kHashProbeFinishEarlyOnEmptyBuild =
@@ -772,10 +767,6 @@ class QueryConfig {
         .value_or(taskWriterCount());
   }

-  uint32_t taskBucketedWriterCount() const {
-    return get<uint32_t>(kTaskBucketedWriterCount).value_or(taskWriterCount());
-  }
-
   bool hashProbeFinishEarlyOnEmptyBuild() const {
     return get<bool>(kHashProbeFinishEarlyOnEmptyBuild, false);
   }
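
The retained accessor keeps its documented fallback: when
task_partitioned_writer_count is unset, it returns taskWriterCount(). A small
sketch of the semantics (constants and the QueryConfig constructor as used by
the test below):

  // With only task_writer_count set, the partitioned count falls back to it.
  std::unordered_map<std::string, std::string> configData;
  configData.emplace(core::QueryConfig::kTaskWriterCount, "6");
  const core::QueryConfig config{std::move(configData)};
  ASSERT_EQ(config.taskWriterCount(), 6);
  ASSERT_EQ(config.taskPartitionedWriterCount(), 6); // Fallback applies.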
45 changes: 11 additions & 34 deletions velox/core/tests/QueryConfigTest.cpp
@@ -57,42 +57,27 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
   struct {
     std::optional<int> numWriterCounter;
     std::optional<int> numPartitionedWriterCounter;
-    std::optional<int> numBucketedWriterCounter;
     int expectedWriterCounter;
     int expectedPartitionedWriterCounter;
-    int expectedBucketedWriterCounter;

     std::string debugString() const {
       return fmt::format(
-          "numWriterCounter[{}] numPartitionedWriterCounter[{}] numBucketedWriterCounter[{}] expectedPartitionedWriterCounter[{}] expectedBucketedWriterCounter[{}]",
+          "numWriterCounter[{}] numPartitionedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}]",
           numWriterCounter.value_or(0),
           numPartitionedWriterCounter.value_or(0),
-          numBucketedWriterCounter.value_or(0),
           expectedWriterCounter,
-          expectedPartitionedWriterCounter,
-          expectedBucketedWriterCounter);
+          expectedPartitionedWriterCounter);
     }
   } testSettings[] = {
-      {std::nullopt, std::nullopt, std::nullopt, 4, 4, 4},
-      {std::nullopt, 1, std::nullopt, 4, 1, 4},
-      {std::nullopt, 6, std::nullopt, 4, 6, 4},
-      {2, 4, std::nullopt, 2, 4, 2},
-      {4, 2, std::nullopt, 4, 2, 4},
-      {4, 6, std::nullopt, 4, 6, 4},
-      {6, 5, std::nullopt, 6, 5, 6},
-      {6, 4, std::nullopt, 6, 4, 6},
-      {6, std::nullopt, 6, 6, 6, 6},
-      {6, std::nullopt, 1, 6, 6, 1},
-      {std::nullopt, std::nullopt, 4, 4, 4, 4},
-      {std::nullopt, std::nullopt, 1, 4, 4, 1},
-      {std::nullopt, 1, 1, 4, 1, 1},
-      {std::nullopt, 1, 2, 4, 1, 2},
-      {std::nullopt, 6, 6, 4, 6, 6},
-      {std::nullopt, 6, 3, 4, 6, 3},
-      {2, 4, 3, 2, 4, 3},
-      {4, 2, 1, 4, 2, 1},
-      {4, 6, 7, 4, 6, 7},
-      {6, std::nullopt, 4, 6, 6, 4}};
+      {std::nullopt, std::nullopt, 4, 4},
+      {std::nullopt, 1, 4, 1},
+      {std::nullopt, 6, 4, 6},
+      {2, 4, 2, 4},
+      {4, 2, 4, 2},
+      {4, 6, 4, 6},
+      {6, 5, 6, 5},
+      {6, 4, 6, 4},
+      {6, std::nullopt, 6, 6}};
   for (const auto& testConfig : testSettings) {
     SCOPED_TRACE(testConfig.debugString());
     std::unordered_map<std::string, std::string> configData;
@@ -106,21 +91,13 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
           QueryConfig::kTaskPartitionedWriterCount,
           std::to_string(testConfig.numPartitionedWriterCounter.value()));
     }
-    if (testConfig.numBucketedWriterCounter.has_value()) {
-      configData.emplace(
-          QueryConfig::kTaskBucketedWriterCount,
-          std::to_string(testConfig.numBucketedWriterCounter.value()));
-    }
     auto queryCtx =
         QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
     const QueryConfig& config = queryCtx->queryConfig();
     ASSERT_EQ(config.taskWriterCount(), testConfig.expectedWriterCounter);
     ASSERT_EQ(
         config.taskPartitionedWriterCount(),
         testConfig.expectedPartitionedWriterCounter);
-    ASSERT_EQ(
-        config.taskBucketedWriterCount(),
-        testConfig.expectedBucketedWriterCounter);
   }
 }

4 changes: 0 additions & 4 deletions velox/docs/configs.rst
@@ -385,10 +385,6 @@ Table Writer
      - integer
      - task_writer_count
      - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default.
-   * - task_bucketed_writer_count
-     - integer
-     - task_writer_count
-     - The number of parallel table writer threads per task for bucketed table writes. If not set, use 'task_writer_count' as default.

 Hive Connector
 --------------
4 changes: 1 addition & 3 deletions velox/exec/LocalPlanner.cpp
@@ -238,9 +238,7 @@ uint32_t maxDrivers(
     if (!connectorInsertHandle->supportsMultiThreading()) {
       return 1;
     } else {
-      if (tableWrite->hasBucketProperty()) {
-        return queryConfig.taskBucketedWriterCount();
-      } else if (tableWrite->hasPartitioningScheme()) {
+      if (tableWrite->hasPartitioningScheme()) {
         return queryConfig.taskPartitionedWriterCount();
       } else {
         return queryConfig.taskWriterCount();
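
Condensed, the driver-count selection for a writer that supports
multi-threading is now a two-way branch; a sketch restating the hunk above
(the free-function wrapper is illustrative only, and the single-threaded
guard stays in the surrounding code):

  // Sketch of the simplified selection inside maxDrivers().
  uint32_t writerDriverCount(
      const core::TableWriteNode& tableWrite,
      const core::QueryConfig& queryConfig) {
    return tableWrite.hasPartitioningScheme()
        ? queryConfig.taskPartitionedWriterCount()
        : queryConfig.taskWriterCount();
  }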
77 changes: 5 additions & 72 deletions velox/exec/tests/TableWriteTest.cpp
@@ -102,7 +102,6 @@ std::function<PlanNodePtr(std::string, PlanNodePtr)> addTableWriter(
     const std::shared_ptr<core::AggregationNode>& aggregationNode,
     const std::shared_ptr<core::InsertTableHandle>& insertHandle,
     bool hasPartitioningScheme,
-    bool hasBucketProperty,
     connector::CommitStrategy commitStrategy =
         connector::CommitStrategy::kNoCommit) {
   return [=](core::PlanNodeId nodeId,
@@ -114,7 +113,6 @@ std::function<PlanNodePtr(std::string, PlanNodePtr)> addTableWriter(
         aggregationNode,
         insertHandle,
         hasPartitioningScheme,
-        hasBucketProperty,
         TableWriteTraits::outputType(aggregationNode),
         commitStrategy,
         std::move(source));
@@ -608,8 +606,7 @@ class TableWriteTest : public HiveConnectorTestBase {
                 partitionedBy,
                 bucketProperty,
                 compressionKind),
-            !partitionedBy.empty(),
-            bucketProperty != nullptr,
+            false,
             outputCommitStrategy))
         .capturePlanNodeId(tableWriteNodeId_);
     if (aggregateResult) {
@@ -633,8 +630,7 @@ class TableWriteTest : public HiveConnectorTestBase {
                 partitionedBy,
                 bucketProperty,
                 compressionKind),
-            !partitionedBy.empty(),
-            bucketProperty != nullptr,
+            false,
             outputCommitStrategy))
         .capturePlanNodeId(tableWriteNodeId_)
         .localPartition(std::vector<std::string>{})
@@ -675,8 +671,7 @@ class TableWriteTest : public HiveConnectorTestBase {
                 partitionedBy,
                 bucketProperty,
                 compressionKind),
-            !partitionedBy.empty(),
-            bucketProperty != nullptr,
+            false,
             outputCommitStrategy))
         .capturePlanNodeId(tableWriteNodeId_)
         .localPartition({})
@@ -1605,65 +1600,6 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) {
   }
 }

-TEST_P(AllTableWriterTest, writerDriverThreads) {
-  const int batchSize = 1'000;
-  const int numBatches = 20;
-  const std::vector<RowVectorPtr> vectors = makeVectors(numBatches, batchSize);
-
-  createDuckDbTable(vectors);
-
-  auto queryCtx = core::QueryCtx::create(executor_.get());
-  auto outputDirectory = TempDirectoryPath::create();
-  core::PlanNodeId tableWriteNodeId;
-  auto writerPlan =
-      PlanBuilder()
-          .values(vectors, /*parallelizable=*/true)
-          .tableWrite(
-              outputDirectory->getPath(),
-              partitionedBy_,
-              bucketProperty_ != nullptr ? bucketProperty_->bucketCount() : 0,
-              bucketProperty_ != nullptr ? bucketProperty_->bucketedBy()
-                                         : std::vector<std::string>{},
-              bucketProperty_ != nullptr
-                  ? bucketProperty_->sortedBy()
-                  : std::vector<std::shared_ptr<
-                        const connector::hive::HiveSortingColumn>>{})
-          .capturePlanNodeId(tableWriteNodeId)
-          .localPartition({})
-          .tableWriteMerge()
-          .project({TableWriteTraits::rowCountColumnName()})
-          .singleAggregation(
-              {},
-              {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())})
-          .planNode();
-  const int taskWriterCount = 4;
-  const int taskPartitionedWriterCount = 8;
-  const int taskBucketWriterCount = 9;
-  const auto expectedNumWriterDrivers = bucketProperty_ != nullptr
-      ? taskBucketWriterCount
-      : partitionedBy_.empty() ? taskWriterCount
-                               : taskPartitionedWriterCount;
-  const auto expectedNumRows =
-      numBatches * batchSize * expectedNumWriterDrivers;
-  auto task = AssertQueryBuilder(duckDbQueryRunner_)
-                  .queryCtx(queryCtx)
-                  .maxDrivers(10)
-                  .config(
-                      core::QueryConfig::kTaskWriterCount,
-                      std::to_string(taskWriterCount))
-                  .config(
-                      core::QueryConfig::kTaskPartitionedWriterCount,
-                      std::to_string(taskPartitionedWriterCount))
-                  .config(
-                      core::QueryConfig::kTaskBucketedWriterCount,
-                      std::to_string(taskBucketWriterCount))
-                  .plan(std::move(writerPlan))
-                  .assertResults(fmt::format("SELECT {}", expectedNumRows));
-  auto planStats = exec::toPlanStats(task->taskStats());
-  auto& tableWriteStats = planStats.at(tableWriteNodeId);
-  ASSERT_EQ(tableWriteStats.numDrivers, expectedNumWriterDrivers);
-}
-
 TEST_P(AllTableWriterTest, renameAndReorderColumns) {
   auto filePaths = makeFilePaths(5);
   auto vectors = makeVectors(filePaths.size(), 500);
@@ -2900,7 +2836,6 @@ TEST_P(AllTableWriterTest, columnStatsDataTypes) {
               partitionedBy_,
               nullptr,
               makeLocationHandle(outputDirectory->getPath()))),
-          !partitionedBy_.empty(),
           false,
           CommitStrategy::kNoCommit))
       .planNode();
@@ -2990,8 +2925,7 @@ TEST_P(AllTableWriterTest, columnStats) {
               partitionedBy_,
               bucketProperty_,
               makeLocationHandle(outputDirectory->getPath()))),
-          !partitionedBy_.empty(),
-          bucketProperty_ != nullptr,
+          false,
           commitStrategy_))
       .planNode();

@@ -3091,8 +3025,7 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) {
               partitionedBy_,
               bucketProperty_,
               makeLocationHandle(outputDirectory->getPath()))),
-          !partitionedBy_.empty(),
-          bucketProperty_ != nullptr,
+          false,
           commitStrategy_));

   auto mergeAggregationNode = generateAggregationNode(
3 changes: 1 addition & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
@@ -471,8 +471,7 @@ PlanBuilder& PlanBuilder::tableWrite(
       rowType->names(),
       aggregationNode,
       insertHandle,
-      !partitionBy.empty(),
-      bucketProperty != nullptr,
+      false,
       TableWriteTraits::outputType(aggregationNode),
       connector::CommitStrategy::kNoCommit,
       planNode_);
