From bfb8ebe9282a2665ef8edecde95fdb8b13b67080 Mon Sep 17 00:00:00 2001
From: Xiaoxuan Meng
Date: Tue, 1 Oct 2024 15:48:33 -0700
Subject: [PATCH] Remove the use of the task_bucketed_writer_count config
 (#11138)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/11138

We shall just use task_partitioned_writer_count, which corresponds to the
shuffle partitioning scheme used by the table write node rather than the
physical table layout. We can bump up the number of table writer driver
threads even for a non-bucketed partitioned table; this won't cause small
files because the partitioning scheme is based on the table partition
columns.

Reviewed By: kewang1024

Differential Revision: D63578063

fbshipit-source-id: 3f1f38b66f475bf40526c91dd8c9b0360de0ac2b
---
 velox/core/PlanNode.cpp                |   3 -
 velox/core/PlanNode.h                  | 169 +++++++++++++------
 velox/core/QueryConfig.h               |   9 --
 velox/core/tests/QueryConfigTest.cpp   |  45 ++-----
 velox/docs/configs.rst                 |   4 -
 velox/exec/LocalPlanner.cpp            |   4 +-
 velox/exec/tests/TableWriteTest.cpp    |  77 +----------
 velox/exec/tests/utils/PlanBuilder.cpp |   3 +-
 8 files changed, 104 insertions(+), 210 deletions(-)

diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp
index 5a09581edcb5..f1e56ed7cfd6 100644
--- a/velox/core/PlanNode.cpp
+++ b/velox/core/PlanNode.cpp
@@ -1853,7 +1853,6 @@ folly::dynamic TableWriteNode::serialize() const {
   obj["connectorInsertTableHandle"] =
       insertTableHandle_->connectorInsertTableHandle()->serialize();
   obj["hasPartitioningScheme"] = hasPartitioningScheme_;
-  obj["hasBucketProperty"] = hasBucketProperty_;
   obj["outputType"] = outputType_->serialize();
   obj["commitStrategy"] = connector::commitStrategyToString(commitStrategy_);
   return obj;
@@ -1876,7 +1875,6 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) {
       ISerializable::deserialize<connector::ConnectorInsertTableHandle>(
           obj["connectorInsertTableHandle"]));
   const bool hasPartitioningScheme = obj["hasPartitioningScheme"].asBool();
-  const bool hasBucketProperty = obj["hasBucketProperty"].asBool();
   auto outputType = deserializeRowType(obj["outputType"]);
   auto commitStrategy =
       connector::stringToCommitStrategy(obj["commitStrategy"].asString());
@@ -1889,7 +1887,6 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) {
       std::make_shared<InsertTableHandle>(
           connectorId, connectorInsertTableHandle),
       hasPartitioningScheme,
-      hasBucketProperty,
       outputType,
       commitStrategy,
       source);
diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h
index 3600a86897ed..f359bc90dee3 100644
--- a/velox/core/PlanNode.h
+++ b/velox/core/PlanNode.h
@@ -671,7 +671,6 @@ class TableWriteNode : public PlanNode {
       std::shared_ptr<AggregationNode> aggregationNode,
       std::shared_ptr<InsertTableHandle> insertTableHandle,
       bool hasPartitioningScheme,
-      bool hasBucketProperty,
       RowTypePtr outputType,
       connector::CommitStrategy commitStrategy,
       const PlanNodePtr& source)
@@ -682,7 +681,6 @@ class TableWriteNode : public PlanNode {
         aggregationNode_(std::move(aggregationNode)),
         insertTableHandle_(std::move(insertTableHandle)),
         hasPartitioningScheme_(hasPartitioningScheme),
-        hasBucketProperty_(hasBucketProperty),
         outputType_(std::move(outputType)),
         commitStrategy_(commitStrategy) {
     VELOX_USER_CHECK_EQ(columns->size(), columnNames.size());
@@ -703,6 +701,7 @@ class TableWriteNode : public PlanNode {
       std::shared_ptr<AggregationNode> aggregationNode,
       std::shared_ptr<InsertTableHandle> insertTableHandle,
       bool hasPartitioningScheme,
+      bool /*unused*/,
       RowTypePtr outputType,
       connector::CommitStrategy commitStrategy,
       const PlanNodePtr& source)
       : TableWriteNode(
             id,
             columns,
             columnNames,
@@ -713,7 +712,6 @@ class TableWriteNode : public PlanNode {
             std::move(aggregationNode),
             std::move(insertTableHandle),
             hasPartitioningScheme,
-            false,
             std::move(outputType),
             commitStrategy,
             source) {}
@@ -733,8 +731,8 @@ class TableWriteNode : public PlanNode {
     return columns_;
   }

-  /// Column names to use when writing the table. This vector is aligned with
-  /// 'columns' vector.
+  /// Column names to use when writing the table. This vector is aligned
+  /// with 'columns' vector.
   const std::vector<std::string>& columnNames() const {
     return columnNames_;
   }
@@ -743,23 +741,15 @@ class TableWriteNode : public PlanNode {
     return insertTableHandle_;
   }

-  /// Indicates if this table write has specified partitioning scheme. If true,
-  /// the task creates a number of table write operators based on the query
-  /// config 'task_partitioned_writer_count', otherwise based on
-  /// 'task_writer_count'.
+  /// Indicates if this table write plan node has specified partitioning
+  /// scheme for remote and local shuffles. If true, the task creates a
+  /// number of table write operators based on the query config
+  /// 'task_partitioned_writer_count', otherwise based on
+  /// 'task_writer_count'.
   bool hasPartitioningScheme() const {
     return hasPartitioningScheme_;
   }

-  /// Indicates if this table write has specified bucket property. If true, the
-  /// task creates a number of table write operators based on the query config
-  /// 'task_partitioned_bucket_writer_count', otherwise based on
-  /// 'task_partitioned_writer_count' or 'task__writer_count' depending on
-  /// whether paritition scheme is specified or not.
-  bool hasBucketProperty() const {
-    return hasBucketProperty_;
-  }
-
   connector::CommitStrategy commitStrategy() const {
     return commitStrategy_;
   }
@@ -790,7 +780,6 @@ class TableWriteNode : public PlanNode {
   const std::shared_ptr<AggregationNode> aggregationNode_;
   const std::shared_ptr<InsertTableHandle> insertTableHandle_;
   const bool hasPartitioningScheme_;
-  const bool hasBucketProperty_;
   const RowTypePtr outputType_;
   const connector::CommitStrategy commitStrategy_;
 };
@@ -798,8 +787,8 @@ class TableWriteMergeNode : public PlanNode {
  public:
   /// 'outputType' specifies the type to store the metadata of table write
-  /// output which contains the following columns: 'numWrittenRows', 'fragment'
-  /// and 'tableCommitContext'.
+  /// output which contains the following columns: 'numWrittenRows',
+  /// 'fragment' and 'tableCommitContext'.
   TableWriteMergeNode(
       const PlanNodeId& id,
       RowTypePtr outputType,
@@ -840,11 +829,11 @@ class TableWriteMergeNode : public PlanNode {
 };

 /// For each input row, generates N rows with M columns according to
-/// specified 'projections'. 'projections' is an N x M matrix of expressions: a
-/// vector of N rows each having M columns. Each expression is either a column
-/// reference or a constant. Both null and non-null constants are allowed.
-/// 'names' is a list of M new column names. The semantic of this operator
-/// matches Spark.
+/// specified 'projections'. 'projections' is an N x M matrix of expressions:
+/// a vector of N rows each having M columns. Each expression is either a
+/// column reference or a constant. Both null and non-null constants are
+/// allowed. 'names' is a list of M new column names. The semantic of this
+/// operator matches Spark.
 class ExpandNode : public PlanNode {
  public:
   ExpandNode(
@@ -889,10 +878,10 @@ class ExpandNode : public PlanNode {
   const std::vector<std::vector<TypedExprPtr>> projections_;
 };

-/// Plan node used to implement aggregations over grouping sets. Duplicates the
-/// aggregation input for each set of grouping keys. The output contains one
-/// column for each grouping key, followed by aggregation inputs, followed by a
-/// column containing grouping set ID. For a given grouping set, a subset
+/// Plan node used to implement aggregations over grouping sets. Duplicates
+/// the aggregation input for each set of grouping keys. The output contains
+/// one column for each grouping key, followed by aggregation inputs, followed
+/// by a column containing grouping set ID. For a given grouping set, a subset
 /// of the grouping key columns present in the set are populated with values.
 /// The rest of the grouping key columns are filled in with nulls.
 class GroupIdNode : public PlanNode {
@@ -907,9 +896,9 @@ class GroupIdNode : public PlanNode {
   };

   /// @param id Plan node ID.
-  /// @param groupingSets A list of grouping key sets. Grouping keys within the
-  /// set must be unique, but grouping keys across sets may repeat.
-  /// Note: groupingSets are specified using output column names.
+  /// @param groupingSets A list of grouping key sets. Grouping keys within
+  /// the set must be unique, but grouping keys across sets may repeat. Note:
+  /// groupingSets are specified using output column names.
   /// @param groupingKeyInfos The names and order of the grouping keys in the
   /// output.
   /// @param aggregationInputs Columns that contain inputs to the aggregate
@@ -1096,9 +1085,9 @@ class PartitionFunction {
   /// @param input RowVector to split into partitions.
   /// @param [out] partitions Computed partition numbers for each row in
   /// 'input'.
-  /// @return Returns partition number in case all rows of 'input' are assigned
-  /// to the same partition. In this case 'partitions' vector is left unchanged.
-  /// Used to optimize round-robin partitioning in local exchange.
+  /// @return Returns partition number in case all rows of 'input' are
+  /// assigned to the same partition. In this case 'partitions' vector is left
+  /// unchanged. Used to optimize round-robin partitioning in local exchange.
   virtual std::optional<uint32_t> partition(
       const RowVector& input,
       std::vector<uint32_t>& partitions) = 0;
@@ -1141,9 +1130,9 @@ class GatherPartitionFunctionSpec : public PartitionFunctionSpec {
   }
 };

-/// Partitions data using specified partition function. The number of partitions
-/// is determined by the parallelism of the upstream pipeline. Can be used to
-/// gather data from multiple sources.
+/// Partitions data using specified partition function. The number of
+/// partitions is determined by the parallelism of the upstream pipeline. Can
+/// be used to gather data from multiple sources.
 class LocalPartitionNode : public PlanNode {
  public:
   enum class Type {
@@ -1347,9 +1336,9 @@ class PartitionedOutputNode : public PlanNode {
   }

   /// Returns true if an arbitrary row and all rows with null keys must be
-  /// replicated to all destinations. This is used to ensure correct results for
-  /// anti-join which requires all nodes to know whether combined build side is
-  /// empty and whether it has any entry with null join key.
+  /// replicated to all destinations. This is used to ensure correct results
+  /// for anti-join which requires all nodes to know whether combined build
+  /// side is empty and whether it has any entry with null join key.
   bool isReplicateNullsAndAny() const {
     return replicateNullsAndAny_;
   }
@@ -1385,7 +1374,8 @@ class PartitionedOutputNode : public PlanNode {
 FOLLY_ALWAYS_INLINE std::ostream& operator<<(
     std::ostream& out,
     const PartitionedOutputNode::Kind kind) {
-  return out << PartitionedOutputNode::kindString(kind);
+  out << PartitionedOutputNode::kindString(kind);
+  return out;
 }

 enum class JoinType {
@@ -1396,52 +1386,63 @@ enum class JoinType {
   // For each row on the left, find all matching rows on the right and return
   // all combinations. In addition, return all rows from the left that have no
   // match on the right with right-side columns filled with nulls.
   kLeft = 1,
-  // Opposite of kLeft. For each row on the right, find all matching rows on the
+  // Opposite of kLeft. For each row on the right, find all matching rows on
+  // the
   // left and return all combinations. In addition, return all rows from the
   // right that have no match on the left with left-side columns filled with
   // nulls.
   kRight = 2,
-  // A "union" of kLeft and kRight. For each row on the left, find all matching
-  // rows on the right and return all combinations. In addition, return all rows
+  // A "union" of kLeft and kRight. For each row on the left, find all
+  // matching
+  // rows on the right and return all combinations. In addition, return all
+  // rows
   // from the left that have no
-  // match on the right with right-side columns filled with nulls. Also, return
+  // match on the right with right-side columns filled with nulls. Also,
+  // return
   // all rows from the
   // right that have no match on the left with left-side columns filled with
   // nulls.
   kFull = 3,
-  // Return a subset of rows from the left side which have a match on the right
+  // Return a subset of rows from the left side which have a match on the
+  // right
   // side. For this join type, cardinality of the output is less than or equal
   // to the cardinality of the left side.
   kLeftSemiFilter = 4,
   // Return each row from the left side with a boolean flag indicating whether
-  // there exists a match on the right side. For this join type, cardinality of
+  // there exists a match on the right side. For this join type, cardinality
+  // of
   // the output equals the cardinality of the left side.
   //
   // The handling of the rows with nulls in the join key depends on the
   // 'nullAware' boolean specified separately.
   //
-  // Null-aware join follows IN semantic. Regular join follows EXISTS semantic.
+  // Null-aware join follows IN semantic. Regular join follows EXISTS
+  // semantic.
   kLeftSemiProject = 5,
   // Opposite of kLeftSemiFilter. Return a subset of rows from the right side
-  // which have a match on the left side. For this join type, cardinality of the
+  // which have a match on the left side. For this join type, cardinality of
+  // the
   // output is less than or equal to the cardinality of the right side.
   kRightSemiFilter = 6,
   // Opposite of kLeftSemiProject. Return each row from the right side with a
-  // boolean flag indicating whether there exists a match on the left side. For
+  // boolean flag indicating whether there exists a match on the left side.
+  // For
   // this join type, cardinality of the output equals the cardinality of the
   // right side.
   //
   // The handling of the rows with nulls in the join key depends on the
   // 'nullAware' boolean specified separately.
   //
-  // Null-aware join follows IN semantic. Regular join follows EXISTS semantic.
+  // Null-aware join follows IN semantic. Regular join follows EXISTS
+  // semantic.
   kRightSemiProject = 7,
   // Return each row from the left side which has no match on the right side.
   // The handling of the rows with nulls in the join key depends on the
   // 'nullAware' boolean specified separately.
   //
   // Null-aware join follows NOT IN semantic:
-  // (1) return empty result if the right side contains a record with a null in
+  // (1) return empty result if the right side contains a record with a null
+  // in
   // the join key;
   // (2) return left-side row with null in the join key only when
   // the right side is empty.
@@ -1493,7 +1494,7 @@ inline bool isAntiJoin(JoinType joinType) {
   return joinType == JoinType::kAnti;
 }

-inline bool isNullAwareSupported(core::JoinType joinType) {
+inline bool isNullAwareSupported(JoinType joinType) {
   return joinType == JoinType::kAnti ||
       joinType == JoinType::kLeftSemiProject ||
       joinType == JoinType::kRightSemiProject;
@@ -1595,8 +1596,8 @@ class AbstractJoinNode : public PlanNode {
 };

 /// Represents inner/outer/semi/anti hash joins. Translates to an
-/// exec::HashBuild and exec::HashProbe. A separate pipeline is produced for the
-/// build side when generating exec::Operators.
+/// exec::HashBuild and exec::HashProbe. A separate pipeline is produced for
+/// the build side when generating exec::Operators.
 ///
 /// 'nullAware' boolean applies to semi and anti joins. When true, the join
 /// semantic is IN / NOT IN. When false, the join semantic is EXISTS / NOT
@@ -1689,22 +1690,23 @@ class MergeJoinNode : public AbstractJoinNode {
   folly::dynamic serialize() const override;

   /// If merge join supports this join type.
-  static bool isSupported(core::JoinType joinType);
+  static bool isSupported(JoinType joinType);

   static PlanNodePtr create(const folly::dynamic& obj, void* context);
 };

 /// Represents inner/outer nested loop joins. Translates to an
-/// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate pipeline
-/// is produced for the build side when generating exec::Operators.
+/// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate
+/// pipeline is produced for the build side when generating exec::Operators.
 ///
-/// Nested loop join (NLJ) supports both equal and non-equal joins. Expressions
-/// specified in joinCondition are evaluated on every combination of left/right
-/// tuple, to emit result. Results are emitted following the same input order of
-/// probe rows for inner and left joins, for each thread of execution.
+/// Nested loop join (NLJ) supports both equal and non-equal joins.
+/// Expressions specified in joinCondition are evaluated on every combination
+/// of left/right tuple, to emit result. Results are emitted following the
+/// same input order of probe rows for inner and left joins, for each thread
+/// of execution.
 ///
-/// To create Cartesian product of the left/right's output, use the constructor
-/// without `joinType` and `joinCondition` parameter.
+/// To create Cartesian product of the left/right's output, use the
+/// constructor without `joinType` and `joinCondition` parameter.
 class NestedLoopJoinNode : public PlanNode {
  public:
   NestedLoopJoinNode(
@@ -1744,7 +1746,7 @@ class NestedLoopJoinNode : public PlanNode {
   folly::dynamic serialize() const override;

   /// If nested loop join supports this join type.
-  static bool isSupported(core::JoinType joinType);
+  static bool isSupported(JoinType joinType);

   static PlanNodePtr create(const folly::dynamic& obj, void* context);
@@ -1951,11 +1953,11 @@ class LimitNode : public PlanNode {
 class UnnestNode : public PlanNode {
  public:
   /// @param replicateVariables Inputs that are projected as is
-  /// @param unnestVariables Inputs that are unnested. Must be of type ARRAY or
-  /// MAP.
+  /// @param unnestVariables Inputs that are unnested. Must be of type ARRAY
+  /// or MAP.
   /// @param unnestNames Names to use for unnested outputs: one name for each
-  /// array (element); two names for each map (key and value). The output names
-  /// must appear in the same order as unnestVariables.
+  /// array (element); two names for each map (key and value). The output
+  /// names must appear in the same order as unnestVariables.
   /// @param ordinalityName Optional name for the ordinality columns. If not
   /// present, ordinality column is not produced.
   UnnestNode(
@@ -1967,8 +1969,8 @@ class UnnestNode : public PlanNode {
       const PlanNodePtr& source);

   /// The order of columns in the output is: replicated columns (in the order
-  /// specified), unnested columns (in the order specified, for maps: key comes
-  /// before value), optional ordinality column.
+  /// specified), unnested columns (in the order specified, for maps: key
+  /// comes before value), optional ordinality column.
   const RowTypePtr& outputType() const override {
     return outputType_;
   }
@@ -2007,8 +2009,8 @@ class UnnestNode : public PlanNode {
   RowTypePtr outputType_;
 };

-/// Checks that input contains at most one row. Return that row as is. If input
-/// is empty, returns a single row with all values set to null. If input
+/// Checks that input contains at most one row. Return that row as is. If
+/// input is empty, returns a single row with all values set to null. If input
 /// contains more than one row raises an exception.
 ///
 /// This plan node is used in query plans that use non-correlated sub-queries.
@@ -2176,9 +2178,9 @@ class WindowNode : public PlanNode {
   }

   bool canSpill(const QueryConfig& queryConfig) const override {
-    // No partitioning keys means the whole input is one big partition. In this
-    // case, spilling is not helpful because we need to have a full partition in
-    // memory to produce results.
+    // No partitioning keys means the whole input is one big partition. In
+    // this case, spilling is not helpful because we need to have a full
+    // partition in memory to produce results.
     return !partitionKeys_.empty() && !inputsSorted_ &&
         queryConfig.windowSpillEnabled();
   }
@@ -2240,8 +2242,8 @@ class RowNumberNode : public PlanNode {
  public:
   /// @param partitionKeys Partitioning keys. May be empty.
   /// @param rowNumberColumnName Optional name of the column containing row
-  /// numbers. If not specified, the output doesn't include 'row number' column.
-  /// This is used when computing partial results.
+  /// numbers. If not specified, the output doesn't include 'row number'
+  /// column. This is used when computing partial results.
   /// @param limit Optional per-partition limit. If specified, the number of
   /// rows produced by this node will not exceed this value for any given
   /// partition. Extra rows will be dropped.
@@ -2313,7 +2315,8 @@ class MarkDistinctNode : public PlanNode {
     return sources_;
   }

-  /// The outputType is the concatenation of the input columns and mask column.
+  /// The outputType is the concatenation of the input columns and mask
+  /// column.
   const RowTypePtr& outputType() const override {
     return outputType_;
   }
@@ -2357,8 +2360,8 @@ class TopNRowNumberNode : public PlanNode {
   /// with 'partitionKeys'.
   /// @param sortingOrders Sorting orders, one per sorting key.
   /// @param rowNumberColumnName Optional name of the column containing row
-  /// numbers. If not specified, the output doesn't include 'row number' column.
-  /// This is used when computing partial results.
+  /// numbers. If not specified, the output doesn't include 'row number'
+  /// column. This is used when computing partial results.
   /// @param limit Per-partition limit. The number of
   /// rows produced by this node will not exceed this value for any given
   /// partition. Extra rows will be dropped.
diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index ad42655b69c1..6bd934d0c4f5 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -300,11 +300,6 @@ class QueryConfig {
   static constexpr const char* kTaskPartitionedWriterCount =
       "task_partitioned_writer_count";

-  /// The number of local parallel table writer operators per task for
-  /// bucketed writes. If not set, use "task_writer_count".
-  static constexpr const char* kTaskBucketedWriterCount =
-      "task_bucketed_writer_count";
-
   /// If true, finish the hash probe on an empty build table for a specific set
   /// of hash joins.
   static constexpr const char* kHashProbeFinishEarlyOnEmptyBuild =
@@ -772,10 +767,6 @@ class QueryConfig {
         .value_or(taskWriterCount());
   }

-  uint32_t taskBucketedWriterCount() const {
-    return get<uint32_t>(kTaskBucketedWriterCount).value_or(taskWriterCount());
-  }
-
   bool hashProbeFinishEarlyOnEmptyBuild() const {
     return get(kHashProbeFinishEarlyOnEmptyBuild, false);
   }
diff --git a/velox/core/tests/QueryConfigTest.cpp b/velox/core/tests/QueryConfigTest.cpp
index 227f85a2fbbe..c89d44d2fdfa 100644
--- a/velox/core/tests/QueryConfigTest.cpp
+++ b/velox/core/tests/QueryConfigTest.cpp
@@ -57,42 +57,27 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
   struct {
     std::optional<int> numWriterCounter;
     std::optional<int> numPartitionedWriterCounter;
-    std::optional<int> numBucketedWriterCounter;
     int expectedWriterCounter;
     int expectedPartitionedWriterCounter;
-    int expectedBucketedWriterCounter;

     std::string debugString() const {
       return fmt::format(
-          "numWriterCounter[{}] numPartitionedWriterCounter[{}] numBucketedWriterCounter[{}] expectedPartitionedWriterCounter[{}] expectedBucketedWriterCounter[{}]",
+          "numWriterCounter[{}] numPartitionedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}]",
           numWriterCounter.value_or(0),
           numPartitionedWriterCounter.value_or(0),
-          numBucketedWriterCounter.value_or(0),
           expectedWriterCounter,
-          expectedPartitionedWriterCounter,
-          expectedBucketedWriterCounter);
+          expectedPartitionedWriterCounter);
     }
   } testSettings[] = {
-      {std::nullopt, std::nullopt, std::nullopt, 4, 4, 4},
-      {std::nullopt, 1, std::nullopt, 4, 1, 4},
-      {std::nullopt, 6, std::nullopt, 4, 6, 4},
-      {2, 4, std::nullopt, 2, 4, 2},
-      {4, 2, std::nullopt, 4, 2, 4},
-      {4, 6, std::nullopt, 4, 6, 4},
-      {6, 5, std::nullopt, 6, 5, 6},
-      {6, 4, std::nullopt, 6, 4, 6},
-      {6, std::nullopt, 6, 6, 6, 6},
-      {6, std::nullopt, 1, 6, 6, 1},
-      {std::nullopt, std::nullopt, 4, 4, 4, 4},
-      {std::nullopt, std::nullopt, 1, 4, 4, 1},
-      {std::nullopt, 1, 1, 4, 1, 1},
-      {std::nullopt, 1, 2, 4, 1, 2},
-      {std::nullopt, 6, 6, 4, 6, 6},
-      {std::nullopt, 6, 3, 4, 6, 3},
-      {2, 4, 3, 2, 4, 3},
-      {4, 2, 1, 4, 2, 1},
-      {4, 6, 7, 4, 6, 7},
-      {6, std::nullopt, 4, 6, 6, 4}};
+      {std::nullopt, std::nullopt, 4, 4},
+      {std::nullopt, 1, 4, 1},
+      {std::nullopt, 6, 4, 6},
+      {2, 4, 2, 4},
+      {4, 2, 4, 2},
+      {4, 6, 4, 6},
+      {6, 5, 6, 5},
+      {6, 4, 6, 4},
+      {6, std::nullopt, 6, 6}};
   for (const auto& testConfig : testSettings) {
     SCOPED_TRACE(testConfig.debugString());
     std::unordered_map<std::string, std::string> configData;
@@ -106,11 +91,6 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
           QueryConfig::kTaskPartitionedWriterCount,
           std::to_string(testConfig.numPartitionedWriterCounter.value()));
     }
-    if (testConfig.numBucketedWriterCounter.has_value()) {
-      configData.emplace(
-          QueryConfig::kTaskBucketedWriterCount,
-          std::to_string(testConfig.numBucketedWriterCounter.value()));
-    }
     auto queryCtx =
         QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
     const QueryConfig& config = queryCtx->queryConfig();
@@ -118,9 +98,6 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
     ASSERT_EQ(
         config.taskPartitionedWriterCount(),
         testConfig.expectedPartitionedWriterCounter);
-    ASSERT_EQ(
-        config.taskBucketedWriterCount(),
-        testConfig.expectedBucketedWriterCounter);
   }
 }
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 507fa7685419..a2c5f37219d5 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -385,10 +385,6 @@ Table Writer
      - integer
      - task_writer_count
      - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default.
-   * - task_bucketed_writer_count
-     - integer
-     - task_writer_count
-     - The number of parallel table writer threads per task for bucketed table writes. If not set, use 'task_writer_count' as default.

 Hive Connector
 --------------
diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp
index 77d538e3aac1..bf99d78e4cf0 100644
--- a/velox/exec/LocalPlanner.cpp
+++ b/velox/exec/LocalPlanner.cpp
@@ -238,9 +238,7 @@ uint32_t maxDrivers(
       if (!connectorInsertHandle->supportsMultiThreading()) {
         return 1;
       } else {
-        if (tableWrite->hasBucketProperty()) {
-          return queryConfig.taskBucketedWriterCount();
-        } else if (tableWrite->hasPartitioningScheme()) {
+        if (tableWrite->hasPartitioningScheme()) {
           return queryConfig.taskPartitionedWriterCount();
         } else {
           return queryConfig.taskWriterCount();
diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp
index 15b0247033cb..612d7c8b503a 100644
--- a/velox/exec/tests/TableWriteTest.cpp
+++ b/velox/exec/tests/TableWriteTest.cpp
@@ -102,7 +102,6 @@ std::function addTableWriter(
     const std::shared_ptr<core::AggregationNode>& aggregationNode,
     const std::shared_ptr<core::InsertTableHandle>& insertHandle,
     bool hasPartitioningScheme,
-    bool hasBucketProperty,
     connector::CommitStrategy commitStrategy =
         connector::CommitStrategy::kNoCommit) {
   return [=](core::PlanNodeId nodeId,
@@ -114,7 +113,6 @@ std::function addTableWriter(
         aggregationNode,
         insertHandle,
         hasPartitioningScheme,
-        hasBucketProperty,
         TableWriteTraits::outputType(aggregationNode),
         commitStrategy,
         std::move(source));
@@ -608,8 +606,7 @@ class TableWriteTest : public HiveConnectorTestBase {
                 partitionedBy,
                 bucketProperty,
                 compressionKind),
-            !partitionedBy.empty(),
-            bucketProperty != nullptr,
+            false,
             outputCommitStrategy))
         .capturePlanNodeId(tableWriteNodeId_);
     if (aggregateResult) {
@@ -633,8 +630,7 @@ class TableWriteTest : public HiveConnectorTestBase {
                 partitionedBy,
                 bucketProperty,
                 compressionKind),
-            !partitionedBy.empty(),
-            bucketProperty != nullptr,
+            false,
             outputCommitStrategy))
         .capturePlanNodeId(tableWriteNodeId_)
         .localPartition(std::vector<std::string>{})
@@ -675,8 +671,7 @@
                 partitionedBy,
                 bucketProperty,
                 compressionKind),
-            !partitionedBy.empty(),
-            bucketProperty != nullptr,
+            false,
             outputCommitStrategy))
         .capturePlanNodeId(tableWriteNodeId_)
         .localPartition({})
@@ -1605,65 +1600,6 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) {
   }
 }

-TEST_P(AllTableWriterTest, writerDriverThreads) {
-  const int batchSize = 1'000;
-  const int numBatches = 20;
-  const std::vector<RowVectorPtr> vectors = makeVectors(numBatches, batchSize);
-
-  createDuckDbTable(vectors);
-
-  auto queryCtx = core::QueryCtx::create(executor_.get());
-  auto outputDirectory = TempDirectoryPath::create();
-  core::PlanNodeId tableWriteNodeId;
-  auto writerPlan =
-      PlanBuilder()
-          .values(vectors, /*parallelizable=*/true)
-          .tableWrite(
-              outputDirectory->getPath(),
-              partitionedBy_,
-              bucketProperty_ != nullptr ? bucketProperty_->bucketCount() : 0,
-              bucketProperty_ != nullptr ? bucketProperty_->bucketedBy()
-                                         : std::vector<std::string>{},
-              bucketProperty_ != nullptr
-                  ? bucketProperty_->sortedBy()
-                  : std::vector<std::shared_ptr<const HiveSortingColumn>>{})
-          .capturePlanNodeId(tableWriteNodeId)
-          .localPartition({})
-          .tableWriteMerge()
-          .project({TableWriteTraits::rowCountColumnName()})
-          .singleAggregation(
-              {},
-              {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())})
-          .planNode();
-  const int taskWriterCount = 4;
-  const int taskPartitionedWriterCount = 8;
-  const int taskBucketWriterCount = 9;
-  const auto expectedNumWriterDrivers = bucketProperty_ != nullptr
-      ? taskBucketWriterCount
-      : partitionedBy_.empty() ? taskWriterCount
-                               : taskPartitionedWriterCount;
-  const auto expectedNumRows =
-      numBatches * batchSize * expectedNumWriterDrivers;
-  auto task = AssertQueryBuilder(duckDbQueryRunner_)
-                  .queryCtx(queryCtx)
-                  .maxDrivers(10)
-                  .config(
-                      core::QueryConfig::kTaskWriterCount,
-                      std::to_string(taskWriterCount))
-                  .config(
-                      core::QueryConfig::kTaskPartitionedWriterCount,
-                      std::to_string(taskPartitionedWriterCount))
-                  .config(
-                      core::QueryConfig::kTaskBucketedWriterCount,
-                      std::to_string(taskBucketWriterCount))
-                  .plan(std::move(writerPlan))
-                  .assertResults(fmt::format("SELECT {}", expectedNumRows));
-  auto planStats = exec::toPlanStats(task->taskStats());
-  auto& tableWriteStats = planStats.at(tableWriteNodeId);
-  ASSERT_EQ(tableWriteStats.numDrivers, expectedNumWriterDrivers);
-}
-
 TEST_P(AllTableWriterTest, renameAndReorderColumns) {
   auto filePaths = makeFilePaths(5);
   auto vectors = makeVectors(filePaths.size(), 500);
@@ -2900,7 +2836,6 @@ TEST_P(AllTableWriterTest, columnStatsDataTypes) {
                       partitionedBy_,
                       nullptr,
                       makeLocationHandle(outputDirectory->getPath()))),
-              !partitionedBy_.empty(),
               false,
               CommitStrategy::kNoCommit))
          .planNode();
@@ -2990,8 +2925,7 @@ TEST_P(AllTableWriterTest, columnStats) {
                       partitionedBy_,
                       bucketProperty_,
                       makeLocationHandle(outputDirectory->getPath()))),
-              !partitionedBy_.empty(),
-              bucketProperty_ != nullptr,
+              false,
              commitStrategy_))
          .planNode();
@@ -3091,8 +3025,7 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) {
                       partitionedBy_,
                       bucketProperty_,
                       makeLocationHandle(outputDirectory->getPath()))),
-              !partitionedBy_.empty(),
-              bucketProperty_ != nullptr,
+              false,
              commitStrategy_));

   auto mergeAggregationNode = generateAggregationNode(
diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp
index ae0a40b26019..ae5cd8b16a22 100644
--- a/velox/exec/tests/utils/PlanBuilder.cpp
+++ b/velox/exec/tests/utils/PlanBuilder.cpp
@@ -471,8 +471,7 @@ PlanBuilder& PlanBuilder::tableWrite(
       rowType->names(),
       aggregationNode,
       insertHandle,
-      !partitionBy.empty(),
-      bucketProperty != nullptr,
+      false,
       TableWriteTraits::outputType(aggregationNode),
       connector::CommitStrategy::kNoCommit,
       planNode_);
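
Note: after this change, the writer driver count per task is chosen from only
two configs. The following self-contained sketch summarizes the updated branch
in velox/exec/LocalPlanner.cpp above; WriterConfig and chooseWriterCount are
hypothetical stand-ins used purely for illustration, not Velox APIs:

    #include <cstdint>
    #include <iostream>

    // Hypothetical mirror of the updated writer-count selection: with
    // task_bucketed_writer_count removed, only two knobs remain.
    struct WriterConfig {
      uint32_t taskWriterCount = 4;            // "task_writer_count"
      uint32_t taskPartitionedWriterCount = 4; // "task_partitioned_writer_count"
    };

    uint32_t chooseWriterCount(
        const WriterConfig& config,
        bool supportsMultiThreading,
        bool hasPartitioningScheme) {
      if (!supportsMultiThreading) {
        return 1; // Single-threaded connectors always get one writer driver.
      }
      // A table write with a partitioning scheme (remote or local shuffle) can
      // run more drivers without producing small files, since rows are already
      // partitioned on the table partition columns.
      return hasPartitioningScheme ? config.taskPartitionedWriterCount
                                   : config.taskWriterCount;
    }

    int main() {
      WriterConfig config;
      config.taskWriterCount = 4;
      config.taskPartitionedWriterCount = 8;
      std::cout << chooseWriterCount(config, true, true) << "\n";  // prints 8
      std::cout << chooseWriterCount(config, true, false) << "\n"; // prints 4
      std::cout << chooseWriterCount(config, false, true) << "\n"; // prints 1
      return 0;
    }

As in QueryConfig::taskPartitionedWriterCount() above, the partitioned writer
count falls back to task_writer_count when it is not set explicitly.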