diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index aac5583e3e7e..f9970f3f2267 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -324,6 +324,8 @@ class ValuesNode : public PlanNode { const size_t repeatTimes_; }; +using ValuesNodePtr = std::shared_ptr; + class ArrowStreamNode : public PlanNode { public: ArrowStreamNode( diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index 856373b54fb4..d8214636d4d1 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_fuzzer_util DuckQueryRunner.cpp PrestoQueryRunner.cpp - FuzzerUtil.cpp ToSQLUtil.cpp) +add_library(velox_fuzzer_util ReferenceQueryRunner.cpp DuckQueryRunner.cpp + PrestoQueryRunner.cpp FuzzerUtil.cpp ToSQLUtil.cpp) target_link_libraries( velox_fuzzer_util diff --git a/velox/exec/fuzzer/DuckQueryRunner.cpp b/velox/exec/fuzzer/DuckQueryRunner.cpp index d6d606f6497e..7758263e5223 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.cpp +++ b/velox/exec/fuzzer/DuckQueryRunner.cpp @@ -13,6 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + +#include +#include +#include + #include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/exec/fuzzer/ToSQLUtil.h" #include "velox/exec/tests/utils/QueryAssertions.h" @@ -104,21 +109,22 @@ DuckQueryRunner::aggregationFunctionDataSpecs() const { std::multiset> DuckQueryRunner::execute( const std::string& sql, - const std::vector& input, - const RowTypePtr& resultType) { + const core::PlanNodePtr& plan) { DuckDbQueryRunner queryRunner; - queryRunner.createTable("tmp", input); - return queryRunner.execute(sql, resultType); + std::unordered_map> inputMap = + getAllTables(plan); + for (const auto& [tableName, input] : inputMap) { + queryRunner.createTable(tableName, input); + } + return queryRunner.execute(sql, plan->outputType()); } std::multiset> DuckQueryRunner::execute( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, + const std::vector& input, const RowTypePtr& resultType) { DuckDbQueryRunner queryRunner; - queryRunner.createTable("t", probeInput); - queryRunner.createTable("u", buildInput); + queryRunner.createTable("tmp", input); return queryRunner.execute(sql, resultType); } @@ -164,6 +170,11 @@ std::optional DuckQueryRunner::toSql( return toSql(joinNode); } + if (const auto valuesNode = + std::dynamic_pointer_cast(plan)) { + return toSql(valuesNode); + } + VELOX_NYI(); } @@ -340,137 +351,4 @@ std::optional DuckQueryRunner::toSql( return sql.str(); } - -std::optional DuckQueryRunner::toSql( - const std::shared_ptr& joinNode) { - const auto& joinKeysToSql = [](auto keys) { - std::stringstream out; - for (auto i = 0; i < keys.size(); ++i) { - if (i > 0) { - out << ", "; - } - out << keys[i]->name(); - } - return out.str(); - }; - - const auto filterToSql = [](core::TypedExprPtr filter) { - auto call = std::dynamic_pointer_cast(filter); - return toCallSql(call); - }; - - const auto& joinConditionAsSql = [&](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i 
> 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - if (joinNode->filter()) { - out << " AND " << filterToSql(joinNode->filter()); - } - return out.str(); - }; - - const auto& outputNames = joinNode->outputType()->names(); - - std::stringstream sql; - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - // Multiple columns returned by a scalar subquery is not supported in - // DuckDB. A scalar subquery expression is a subquery that returns one - // result row from exactly one column for every input row. 
- if (joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ") FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ") FROM t"; - } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - - return sql.str(); -} - -std::optional DuckQueryRunner::toSql( - const std::shared_ptr& joinNode) { - std::stringstream sql; - sql << "SELECT " << folly::join(", ", joinNode->outputType()->names()); - - // Nested loop join without filter. 
- VELOX_CHECK( - joinNode->joinCondition() == nullptr, - "This code path should be called only for nested loop join without filter"); - const std::string joinCondition{"(1 = 1)"}; - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinCondition; - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinCondition; - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - - return sql.str(); -} } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/DuckQueryRunner.h b/velox/exec/fuzzer/DuckQueryRunner.h index 4fa826af0488..3336ffa1668c 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.h +++ b/velox/exec/fuzzer/DuckQueryRunner.h @@ -15,6 +15,10 @@ */ #pragma once +#include +#include +#include + #include "velox/exec/fuzzer/ReferenceQueryRunner.h" namespace facebook::velox::exec::test { @@ -46,20 +50,21 @@ class DuckQueryRunner : public ReferenceQueryRunner { /// Assumes that source of AggregationNode or Window Node is 'tmp' table. std::optional toSql(const core::PlanNodePtr& plan) override; - /// Creates 'tmp' table with 'input' data and runs 'sql' query. Returns - /// results according to 'resultType' schema. + /// Executes SQL query returned by the 'toSql' method based on the plan. std::multiset> execute( const std::string& sql, - const std::vector& input, - const RowTypePtr& resultType) override; + const core::PlanNodePtr& plan) override; + /// Creates 'tmp' table with 'input' data and runs 'sql' query. Returns + /// results according to 'resultType' schema. 
std::multiset> execute( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, + const std::vector& input, const RowTypePtr& resultType) override; private: + using ReferenceQueryRunner::toSql; + std::optional toSql( const std::shared_ptr& aggregationNode); @@ -72,12 +77,6 @@ class DuckQueryRunner : public ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& rowNumberNode); - std::optional toSql( - const std::shared_ptr& joinNode); - - std::optional toSql( - const std::shared_ptr& joinNode); - std::unordered_set aggregateFunctionNames_; }; diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 1860eca9df0b..7fae551560c0 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -82,34 +82,36 @@ class JoinFuzzer { void go(); + struct InputAndSplit { + const std::vector* input; + const std::vector split; + }; + struct PlanWithSplits { core::PlanNodePtr plan; - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - std::unordered_map> - splits; + std::unordered_map splits; core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; int32_t numGroups; explicit PlanWithSplits( const core::PlanNodePtr& _plan, - const core::PlanNodeId& _probeScanId = "", - const core::PlanNodeId& _buildScanId = "", - const std::unordered_map< - core::PlanNodeId, - std::vector>& _splits = {}, + const std::unordered_map& _splits = {}, core::ExecutionStrategy _executionStrategy = core::ExecutionStrategy::kUngrouped, int32_t _numGroups = 0) : plan(_plan), - probeScanId(_probeScanId), - buildScanId(_buildScanId), splits(_splits), executionStrategy(_executionStrategy), numGroups(_numGroups) {} }; + static core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode); + static core::PlanNodePtr tryFlipJoinSides( + const core::MergeJoinNode& joinNode); + static core::PlanNodePtr tryFlipJoinSides( + const core::NestedLoopJoinNode& joinNode); + private: 
static VectorFuzzer::Options getFuzzerOptions() { VectorFuzzer::Options opts; @@ -135,95 +137,81 @@ class JoinFuzzer { // Randomly pick a join type to test. core::JoinType pickJoinType(); - // Makes the query plan with default settings in JoinFuzzer and value inputs - // for both probe and build sides. + template + static std::pair tryFlipJoinSidesHelper( + const TNode& joinNode); + + // Makes the query plan with default settings in JoinFuzzer using inputs + // joining each input from left to right. // - // NOTE: 'probeInput' and 'buildInput' could either input rows with lazy - // vectors or flatten ones. + // NOTE: inputs can be rows with lazy vectors or flattened ones. JoinFuzzer::PlanWithSplits makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); + const std::vector& joinTypes, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. - // If withFilter is true, uses the equality filter between probeKeys and - // buildKeys as the join filter. Uses empty join filter otherwise. 
JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); + const std::vector& joinTypes, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList); - // Makes the default query plan with table scan as inputs for both probe and - // build sides. + // Makes the default query plan with table scan as inputs for all of the + // inputs. JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan 
- // nodes. If withFilter is true, uses the equiality filter between probeKeys - // and buildKeys as the join filter. Uses empty join filter otherwise. + // nodes. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList); void makeAlternativePlans( - const core::PlanNodePtr& plan, - const std::vector& probeInput, - const std::vector& buildInput, - std::vector& plans, - const std::string& filter); - - // Makes the query plan from 'planWithTableScan' with grouped execution mode. - // Correspondingly, it replaces the table scan input splits with grouped ones. - JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( - const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits); + const PlanWithSplits& plan, + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& plans); // Runs one test iteration from query plans generations, executions and result // verifications. 
@@ -252,15 +240,16 @@ class JoinFuzzer { void addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter); + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector probeTypeList, + const std::vector buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& altPlans); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -271,9 +260,9 @@ class JoinFuzzer { // Generates the grouped splits. std::vector generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const std::string& name, + const size_t numKeys, const std::vector& input); RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); @@ -289,8 +278,6 @@ class JoinFuzzer { RowVectorPtr testCrossProduct( const std::string& tableDir, core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput); @@ -528,13 +515,18 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [planNodeId, nodeSplits] : plan.splits) { - builder.splits(planNodeId, nodeSplits); + for (const auto& [planNodeId, inputAndSplit] : plan.splits) { + builder.splits(planNodeId, inputAndSplit.split); } if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { builder.executionStrategy(core::ExecutionStrategy::kGrouped); - 
builder.groupedExecutionLeafNodeIds({plan.probeScanId, plan.buildScanId}); + std::unordered_set ids; + ids.reserve(plan.splits.size()); + for (const auto& [id, _] : plan.splits) { + ids.insert(id); + } + builder.groupedExecutionLeafNodeIds(ids); builder.numSplitGroups(plan.numGroups); builder.numConcurrentSplitGroups(randInt(1, plan.numGroups)); } @@ -605,67 +597,100 @@ std::optional tryFlipJoinType(core::JoinType joinType) { } } +template +std::pair +JoinFuzzer::tryFlipJoinSidesHelper(const TNode& joinNode) { + core::PlanNodePtr left = joinNode.sources()[0]; + core::PlanNodePtr right = joinNode.sources()[1]; + if (auto leftJoinInput = + std::dynamic_pointer_cast(joinNode.sources()[0])) { + left = JoinFuzzer::tryFlipJoinSides(*leftJoinInput); + } + if (auto rightJoinInput = + std::dynamic_pointer_cast(joinNode.sources()[1])) { + right = JoinFuzzer::tryFlipJoinSides(*rightJoinInput); + } + return make_pair(left, right); +} + // Returns a plan with flipped join sides of the input hash join node. If the -// join type doesn't allow flipping, returns a nullptr. -core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode) { +// inputs of the join node are other hash join nodes, recursively flip the join +// sides of those join nodes as well. If the join type doesn't allow flipping, +// returns a nullptr. +core::PlanNodePtr JoinFuzzer::tryFlipJoinSides( + const core::HashJoinNode& joinNode) { // Null-aware right semi project join doesn't support filter. 
if (joinNode.filter() && joinNode.joinType() == core::JoinType::kLeftSemiProject && joinNode.isNullAware()) { return nullptr; } + auto flippedJoinType = tryFlipJoinType(joinNode.joinType()); - if (!flippedJoinType.has_value()) { + if (!flippedJoinType) { return nullptr; } + auto [left, right] = + JoinFuzzer::tryFlipJoinSidesHelper(joinNode); return std::make_shared( joinNode.id(), - flippedJoinType.value(), + *flippedJoinType, joinNode.isNullAware(), joinNode.rightKeys(), joinNode.leftKeys(), joinNode.filter(), - joinNode.sources()[1], - joinNode.sources()[0], + right, + left, joinNode.outputType()); } // Returns a plan with flipped join sides of the input merge join node. If the +// inputs of the join node are other merge join nodes, recursively flip the join +// sides of those join nodes as well. If the // join type doesn't allow flipping, returns a nullptr. -core::PlanNodePtr tryFlipJoinSides(const core::MergeJoinNode& joinNode) { +core::PlanNodePtr JoinFuzzer::tryFlipJoinSides( + const core::MergeJoinNode& joinNode) { // Merge join only supports inner and left join, so only inner join can be // flipped. if (joinNode.joinType() != core::JoinType::kInner) { return nullptr; } - auto flippedJoinType = core::JoinType::kInner; + + auto [left, right] = + JoinFuzzer::tryFlipJoinSidesHelper(joinNode); return std::make_shared( joinNode.id(), - flippedJoinType, + core::JoinType::kInner, joinNode.rightKeys(), joinNode.leftKeys(), joinNode.filter(), - joinNode.sources()[1], - joinNode.sources()[0], + right, + left, joinNode.outputType()); } // Returns a plan with flipped join sides of the input nested loop join node. If -// the join type doesn't allow flipping, returns a nullptr. -core::PlanNodePtr tryFlipJoinSides(const core::NestedLoopJoinNode& joinNode) { +// the inputs of the join node are other nested loop join nodes, recursively +// flip the join sides of those join nodes as well. If the join type doesn't +// allow flipping, returns a nullptr. 
+core::PlanNodePtr JoinFuzzer::tryFlipJoinSides( + const core::NestedLoopJoinNode& joinNode) { auto flippedJoinType = tryFlipJoinType(joinNode.joinType()); - if (!flippedJoinType.has_value()) { + if (!flippedJoinType) { return nullptr; } + auto [left, right] = + JoinFuzzer::tryFlipJoinSidesHelper(joinNode); + return std::make_shared( joinNode.id(), flippedJoinType.value(), joinNode.joinCondition(), - joinNode.sources()[1], - joinNode.sources()[0], + right, + left, joinNode.outputType()); } @@ -680,10 +705,8 @@ std::optional JoinFuzzer::computeReferenceResults( } if (auto sql = referenceQueryRunner_->toSql(plan)) { - return referenceQueryRunner_->execute( - sql.value(), probeInput, buildInput, plan->outputType()); + return referenceQueryRunner_->execute(*sql, plan); } - LOG(INFO) << "Query not supported by the reference DB"; return std::nullopt; } @@ -699,79 +722,85 @@ std::vector fieldNames( } JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - auto plan = + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) - .values(probeInput) + .values(inputs[0]) .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{plan}; + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator).values(inputs[1]).planNode(), + filterList[0], + outputColumnsList[0], + 
joinTypes[0], + nullAwareList[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator).values(inputs[i + 1]).planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + } + return PlanWithSplits{plan.planNode()}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputsAndSplits.size() > 1); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{ - plan, - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; -} - -JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( - const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits) { - return PlanWithSplits{ - planWithTableScan.plan, - planWithTableScan.probeScanId, - 
planWithTableScan.buildScanId, - {{planWithTableScan.probeScanId, groupedProbeScanSplits}, - {planWithTableScan.buildScanId, groupedBuildScanSplits}}, - core::ExecutionStrategy::kGrouped, - numGroups}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .hashJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } std::vector makeSources( @@ -810,141 +839,167 @@ std::string makeJoinFilter( template void addFlippedJoinPlan( - const core::PlanNodePtr& plan, - std::vector& plans, - const core::PlanNodeId& probeScanId = "", - const core::PlanNodeId& buildScanId = "", - const std::unordered_map>& - splits = {}, - core::ExecutionStrategy executionStrategy = - core::ExecutionStrategy::kUngrouped, - int32_t numGroups = 0) { - auto joinNode = std::dynamic_pointer_cast(plan); + const JoinFuzzer::PlanWithSplits& plan, + std::vector& plans) { + auto joinNode = std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); - if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { + if (auto flippedPlan = JoinFuzzer::tryFlipJoinSides(*joinNode)) { plans.push_back(JoinFuzzer::PlanWithSplits{ - flippedPlan, - probeScanId, - buildScanId, - splits, - executionStrategy, - numGroups}); + 
flippedPlan, plan.splits, plan.executionStrategy, plan.numGroups}); } } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .values(buildInput) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode()}; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .values(inputs[0]) + .orderBy(probeKeysList[0], false) + .mergeJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .values(inputs[1]) + .orderBy(buildKeysList[0], false) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.mergeJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .values(inputs[i + 1]) + .orderBy(buildKeysList[i], false) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i]); + } + return PlanWithSplits{plan.planNode()}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector>& inputs, + const std::vector>& 
outputColumnsList, + const std::vector& joinConditionList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{ + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) - .values(probeInput) + .values(inputs[0]) .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, - outputColumns, - joinType) - .planNode()}; + PlanBuilder(planNodeIdGenerator).values(inputs[1]).planNode(), + joinConditionList[0], + outputColumnsList[0], + joinTypes[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.nestedLoopJoin( + PlanBuilder(planNodeIdGenerator).values(inputs[i + 1]).planNode(), + joinConditionList[i], + outputColumnsList[i], + joinTypes[i]); + } + return PlanWithSplits{plan.planNode()}; } void JoinFuzzer::makeAlternativePlans( - const core::PlanNodePtr& plan, - const std::vector& probeInput, - const std::vector& buildInput, - std::vector& plans, - const std::string& filter) { - auto joinNode = std::dynamic_pointer_cast(plan); + const PlanWithSplits& plan, + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& plans) { + auto joinNode = + std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); // Flip join sides. addFlippedJoinPlan(plan, plans); - // Parallelize probe and build sides. - const auto probeKeys = fieldNames(joinNode->leftKeys()); - const auto buildKeys = fieldNames(joinNode->rightKeys()); - const auto outputColumns = joinNode->outputType()->names(); - const auto joinType = joinNode->joinType(); - + // Add plan with local partition round robin for inputs. 
auto planNodeIdGenerator = std::make_shared(); - plans.push_back(JoinFuzzer::PlanWithSplits{ + PlanBuilder partitionPlan = PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(probeInput, planNodeIdGenerator)) + .localPartitionRoundRobin(makeSources(inputs[0], planNodeIdGenerator)) .hashJoin( - probeKeys, - buildKeys, + probeKeysList[0], + buildKeysList[0], PlanBuilder(planNodeIdGenerator) .localPartitionRoundRobin( - makeSources(buildInput, planNodeIdGenerator)) + makeSources(inputs[1], planNodeIdGenerator)) .planNode(), - filter, - outputColumns, - joinType, - joinNode->isNullAware()) - .planNode()}); + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + for (int i = 1; i < inputs.size() - 1; i++) { + partitionPlan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(inputs[i + 1], planNodeIdGenerator)) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + } + plans.push_back(PlanWithSplits{partitionPlan.planNode()}); + bool mergeJoinSupported = true; + bool NLJSupported = true; + for (const core::JoinType& joinType : joinTypes) { + if (!core::NestedLoopJoinNode::isSupported(joinType)) { + NLJSupported = false; + } + if (!core::MergeJoinNode::isSupported(joinType)) { + mergeJoinSupported = false; + } + } // Use OrderBy + MergeJoin - if (core::MergeJoinNode::isSupported(joinNode->joinType())) { + if (mergeJoinSupported) { auto planWithSplits = makeMergeJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - filter); + joinTypes, + probeKeysList, + buildKeysList, + inputs, + outputColumnsList, + filterList); plans.push_back(planWithSplits); - addFlippedJoinPlan(planWithSplits.plan, plans); + addFlippedJoinPlan(planWithSplits, plans); } // Use NestedLoopJoin. 
- if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + if (NLJSupported) { + std::vector joinConditionList; + for (int i = 0; i < probeKeysList.size(); i++) { + joinConditionList.push_back( + filterList[i].empty() + ? makeJoinFilter(probeKeysList[i], buildKeysList[i]) + : fmt::format( + "{} AND {}", + makeJoinFilter(probeKeysList[i], buildKeysList[i]), + filterList[i])); + } auto planWithSplits = makeNestedLoopJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - joinCondition); + joinTypes, inputs, outputColumnsList, joinConditionList); plans.push_back(planWithSplits); - addFlippedJoinPlan(planWithSplits.plan, plans); + addFlippedJoinPlan(planWithSplits, plans); } } @@ -972,8 +1027,6 @@ void JoinFuzzer::shuffleJoinKeys( RowVectorPtr JoinFuzzer::testCrossProduct( const std::string& tableDir, core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput) { VELOX_CHECK_GT(probeInput.size(), 0); @@ -986,13 +1039,10 @@ RowVectorPtr JoinFuzzer::testCrossProduct( ->names(); auto plan = makeNestedLoopJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - /*filter=*/""); + {joinType}, + {probeInput, buildInput}, + {outputColumns}, + /*filterList=*/{""}); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference query @@ -1013,23 +1063,22 @@ RowVectorPtr JoinFuzzer::testCrossProduct( std::vector altPlans; if (isTableScanSupported(probeInput[0]->type()) && isTableScanSupported(buildInput[0]->type())) { - std::vector probeScanSplits = - makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - 
makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + InputAndSplit probeInputAndSplit{ + &probeInput, + makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_)}; + InputAndSplit buildInputAndSplit{ + &buildInput, + makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_)}; altPlans.push_back(makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - /*filter=*/"")); - } - addFlippedJoinPlan(plan.plan, altPlans); + {joinType}, + {probeType}, + {buildType}, + {probeInputAndSplit, buildInputAndSplit}, + {outputColumns}, + /*filter=*/{""})); + } + addFlippedJoinPlan(plan, altPlans); for (const auto& altPlan : altPlans) { auto actual = execute(altPlan, /*injectSpill=*/false); @@ -1100,19 +1149,9 @@ void JoinFuzzer::verify(core::JoinType joinType) { stats_.numCrossProduct++; auto result = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput); + tableScanDir->getPath(), joinType, probeInput, buildInput); auto flatResult = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput); + tableScanDir->getPath(), joinType, flatProbeInput, flatBuildInput); assertEqualResults({result}, {flatResult}); } } @@ -1140,14 +1179,13 @@ void JoinFuzzer::verify(core::JoinType joinType) { shuffleJoinKeys(probeKeys, buildKeys); const auto defaultPlan = makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - filter); + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}); const auto expected = execute(defaultPlan, /*injectSpill=*/false); @@ -1170,31 +1208,47 @@ void JoinFuzzer::verify(core::JoinType joinType) { std::vector altPlans; altPlans.push_back(makeDefaultPlan( - joinType, - nullAware, - probeKeys, - 
buildKeys, - flatProbeInput, - flatBuildInput, - outputColumns, - filter)); + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter})); makeAlternativePlans( - defaultPlan.plan, probeInput, buildInput, altPlans, filter); + defaultPlan, + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}, + altPlans); makeAlternativePlans( - defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter); + defaultPlan, + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter}, + altPlans); addPlansWithTableScan( tableScanDir->getPath(), - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - outputColumns, - altPlans, - filter); + {joinType}, + {nullAware}, + {asRowType(flatProbeInput[0]->type())}, + {asRowType(flatBuildInput[0]->type())}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter}, + altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1239,111 +1293,128 @@ void JoinFuzzer::verify(core::JoinType joinType) { } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputsAndSplits.size() > 1); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId 
probeScanId; core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .orderBy(probeKeysList[0], false) + .mergeJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .orderBy(buildKeysList[0], false) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.mergeJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector& inputsAndSplits, + const std::vector>& 
outputColumnsList, + const std::vector& joinConditionList) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .planNode(), + joinConditionList[0], + outputColumnsList[0], + joinTypes[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + joinConditionList[i], + outputColumnsList[i], + joinTypes[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } void JoinFuzzer::addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector probeTypeList, + const std::vector buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& 
outputColumnsList, + const std::vector& filterList, + std::vector& altPlans) { VELOX_CHECK(!tableDir.empty()); + VELOX_CHECK(inputs.size() > 1); - if (!isTableScanSupported(probeInput[0]->type()) || - !isTableScanSupported(buildInput[0]->type())) { - return; - } - - std::vector probeScanSplits = - makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + std::vector inputsAndSplits; + for (int i = 0; i < inputs.size(); i++) { + if (!isTableScanSupported(inputs[i][0]->type())) { + return; + } - const auto probeType = asRowType(probeInput[0]->type()); - const auto buildType = asRowType(buildInput[0]->type()); + inputsAndSplits.push_back(InputAndSplit{ + &inputs[i], + makeSplits( + inputs[i], fmt::format("{}/table_{}", tableDir, i), writerPool_)}); + } std::vector plansWithTableScan; auto defaultPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + joinTypes, + nullAwareList, + probeTypeList, + buildTypeList, + probeKeysList, + buildKeysList, + inputsAndSplits, + outputColumnsList, + filterList); plansWithTableScan.push_back(defaultPlan); auto joinNode = @@ -1351,94 +1422,86 @@ void JoinFuzzer::addPlansWithTableScan( VELOX_CHECK_NOT_NULL(joinNode); // Flip join sides. 
- addFlippedJoinPlan( - defaultPlan.plan, - plansWithTableScan, - defaultPlan.probeScanId, - defaultPlan.buildScanId, - defaultPlan.splits); - - const int32_t numGroups = randInt(1, probeScanSplits.size()); - const std::vector groupedProbeScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/true, - probeKeys.size(), - probeInput); - const std::vector groupedBuildScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/false, - buildKeys.size(), - buildInput); + addFlippedJoinPlan(defaultPlan, plansWithTableScan); + + const int32_t numGroups = randInt(1, inputsAndSplits[0].split.size()); + std::unordered_map groupedSplits; + for (int i = 0; const auto& [id, inputAndSplit] : defaultPlan.splits) { + groupedSplits.insert( + {id, + InputAndSplit{ + inputAndSplit.input, + generateSplitsWithGroup( + tableDir, + numGroups, + /*name=*/fmt::format("table_{}", i), + /*numKeys=*/i == 0 ? probeKeysList[i].size() + : buildKeysList[i - 1].size(), + *inputAndSplit.input)}}); + i++; + } for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan); - altPlans.push_back(makeGroupedExecutionPlanWithTableScan( - planWithTableScan, - numGroups, - groupedProbeScanSplits, - groupedBuildScanSplits)); + altPlans.push_back(PlanWithSplits{ + planWithTableScan.plan, + groupedSplits, + core::ExecutionStrategy::kGrouped, + numGroups}); } // Add ungrouped MergeJoin with TableScan. 
if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + joinTypes, + probeTypeList, + buildTypeList, + probeKeysList, + buildKeysList, + inputsAndSplits, + outputColumnsList, + filterList); altPlans.push_back(planWithSplits); - addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + addFlippedJoinPlan(planWithSplits, altPlans); } // Add ungrouped NestedLoopJoin with TableScan. - if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + bool NLJSupported = true; + for (const core::JoinType joinType : joinTypes) { + if (!core::NestedLoopJoinNode::isSupported(joinType)) { + NLJSupported = false; + } + } + if (NLJSupported) { + std::vector joinConditionList; + for (int i = 0; i < probeKeysList.size(); i++) { + joinConditionList.push_back( + filterList[i].empty() + ? 
makeJoinFilter(probeKeysList[i], buildKeysList[i]) + : fmt::format( + "{} AND {}", + makeJoinFilter(probeKeysList[i], buildKeysList[i]), + filterList[i])); + } auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - joinCondition); + joinTypes, + probeTypeList, + buildTypeList, + inputsAndSplits, + outputColumnsList, + joinConditionList); altPlans.push_back(planWithSplits); - addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + addFlippedJoinPlan(planWithSplits, altPlans); } } std::vector JoinFuzzer::generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const std::string& name, + const size_t numKeys, const std::vector& input) { const std::vector> inputVectorsByGroup = splitInputByGroup(numGroups, numKeys, input); @@ -1446,12 +1509,8 @@ std::vector JoinFuzzer::generateSplitsWithGroup( std::vector splitsWithGroup; for (int32_t groupId = 0; groupId < numGroups; ++groupId) { for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { - const std::string filePath = fmt::format( - "{}/grouped[{}].{}.{}", - tableDir, - groupId, - isProbe ? "probe" : "build", - i); + const std::string filePath = + fmt::format("{}/grouped[{}].{}.{}", tableDir, groupId, name, i); writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); splitsWithGroup.emplace_back(makeConnectorSplit(filePath), groupId); } diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index c8bba9cdb64d..913b0215b4a6 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -14,13 +14,13 @@ * limitations under the License. 
*/ -#include "velox/exec/fuzzer/PrestoQueryRunner.h" #include // @manual #include #include +#include + #include "velox/common/base/Fs.h" #include "velox/common/encode/Base64.h" -#include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/Expressions.h" @@ -28,6 +28,7 @@ #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/fuzzer/ToSQLUtil.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/functions/prestosql/types/IPAddressType.h" @@ -36,8 +37,6 @@ #include "velox/serializers/PrestoSerializer.h" #include "velox/type/parser/TypeParser.h" -#include - using namespace facebook::velox; namespace facebook::velox::exec::test { @@ -221,20 +220,6 @@ std::string toWindowCallSql( return sql.str(); } -bool isSupportedDwrfType(const TypePtr& type) { - if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { - return false; - } - - for (auto i = 0; i < type->size(); ++i) { - if (!isSupportedDwrfType(type->childAt(i))) { - return false; - } - } - - return true; -} - } // namespace const std::vector& PrestoQueryRunner::supportedScalarTypes() const { @@ -554,152 +539,10 @@ std::optional PrestoQueryRunner::toSql( return sql.str(); } -std::optional PrestoQueryRunner::toSql( - const std::shared_ptr& joinNode) { - if (!isSupportedDwrfType(joinNode->sources()[0]->outputType())) { - return std::nullopt; - } - - if (!isSupportedDwrfType(joinNode->sources()[1]->outputType())) { - return std::nullopt; - } - - const auto joinKeysToSql = [](auto keys) { - std::stringstream out; - for (auto i = 0; i < keys.size(); ++i) { - if (i > 0) { - out << ", "; - } - out << keys[i]->name(); - } - return out.str(); - }; - - const auto filterToSql = [](core::TypedExprPtr filter) { - auto call = std::dynamic_pointer_cast(filter); 
- return toCallSql(call); - }; - - const auto& joinConditionAsSql = [&](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i > 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - if (joinNode->filter()) { - out << " AND " << filterToSql(joinNode->filter()); - } - return out.str(); - }; - - const auto& outputNames = joinNode->outputType()->names(); - - std::stringstream sql; - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - // Multiple columns returned by a scalar subquery is not supported in - // Presto. A scalar subquery expression is a subquery that returns one - // result row from exactly one column for every input row. 
- if (joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ") FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ") FROM t"; - } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - return sql.str(); -} - -std::optional PrestoQueryRunner::toSql( - const std::shared_ptr& joinNode) { - std::stringstream sql; - sql << "SELECT " << folly::join(", ", joinNode->outputType()->names()); - - // Nested loop join without filter. 
- VELOX_CHECK( - joinNode->joinCondition() == nullptr, - "This code path should be called only for nested loop join without filter"); - const std::string joinCondition{"(1 = 1)"}; - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinCondition; - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinCondition; - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - - return sql.str(); -} - -std::optional PrestoQueryRunner::toSql( - const std::shared_ptr& valuesNode) { - if (!isSupportedDwrfType(valuesNode->outputType())) { - return std::nullopt; - } - return "tmp"; +std::multiset> PrestoQueryRunner::execute( + const std::string& sql, + const core::PlanNodePtr& plan) { + return exec::test::materialize(executeAndReturnVector(sql, plan)); } std::multiset> PrestoQueryRunner::execute( @@ -709,15 +552,6 @@ std::multiset> PrestoQueryRunner::execute( return exec::test::materialize(executeVector(sql, input, resultType)); } -std::multiset> PrestoQueryRunner::execute( - const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) { - return exec::test::materialize( - executeVector(sql, probeInput, buildInput, resultType)); -} - std::string PrestoQueryRunner::createTable( const std::string& name, const TypePtr& type) { @@ -749,40 +583,31 @@ std::string PrestoQueryRunner::createTable( return tableDirectoryPath; } -std::vector PrestoQueryRunner::executeVector( +std::vector PrestoQueryRunner::executeAndReturnVector( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const velox::RowTypePtr& resultType) { - auto probeType = asRowType(probeInput[0]->type()); - if (probeType->size() == 0) { - auto rowVector = makeNullRows(probeInput, "x", pool()); - return 
executeVector(sql, {rowVector}, buildInput, resultType); - } - - auto buildType = asRowType(buildInput[0]->type()); - if (probeType->size() == 0) { - auto rowVector = makeNullRows(buildInput, "y", pool()); - return executeVector(sql, probeInput, {rowVector}, resultType); + const core::PlanNodePtr& plan) { + std::unordered_map> inputMap = + getAllTables(plan); + for (const auto& [tableName, input] : inputMap) { + auto inputType = asRowType(input[0]->type()); + if (inputType->size() == 0) { + inputMap[tableName] = { + makeNullRows(input, fmt::format("{}x", tableName), pool())}; + } } - auto probeTableDirectoryPath = createTable("t", probeInput[0]->type()); - auto buildTableDirectoryPath = createTable("u", buildInput[0]->type()); - - // Create a new file in table's directory with fuzzer-generated data. - auto probeFilePath = fs::path(probeTableDirectoryPath) - .append("probe.dwrf") - .string() - .substr(strlen("file:")); + auto writerPool = aggregatePool()->addAggregateChild("writer"); + for (const auto& [tableName, input] : inputMap) { + auto tableDirectoryPath = createTable(tableName, input[0]->type()); - auto buildFilePath = fs::path(buildTableDirectoryPath) - .append("build.dwrf") - .string() - .substr(strlen("file:")); + // Create a new file in table's directory with fuzzer-generated data. + auto filePath = fs::path(tableDirectoryPath) + .append(fmt::format("{}.dwrf", tableName)) + .string() + .substr(strlen("file:")); - auto writerPool = aggregatePool()->addAggregateChild("writer"); - writeToFile(probeFilePath, probeInput, writerPool.get()); - writeToFile(buildFilePath, buildInput, writerPool.get()); + writeToFile(filePath, input, writerPool.get()); + } // Run the query. 
return execute(sql); diff --git a/velox/exec/fuzzer/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h index a72cae913e10..31d1604cacfc 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -18,7 +18,6 @@ #include #include -#include "velox/common/memory/Memory.h" #include "velox/exec/fuzzer/ReferenceQueryRunner.h" #include "velox/vector/ComplexVector.h" @@ -83,11 +82,11 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::vector& input, const velox::RowTypePtr& resultType) override; + /// Executes SQL query returned by the 'toSql' method based on the plan. + /// Returns the materialized query results. std::multiset> execute( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) override; + const core::PlanNodePtr& plan) override; /// Executes Presto SQL query and returns the results. Tables referenced by /// the query must already exist. @@ -105,17 +104,13 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::vector& input, const RowTypePtr& resultType) override; - std::vector executeVector( - const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) override; - std::shared_ptr queryRunnerContext() { return queryRunnerContext_; } private: + using ReferenceQueryRunner::toSql; + memory::MemoryPool* pool() { return pool_.get(); } @@ -136,14 +131,11 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& tableWriteNode); - std::optional toSql( - const std::shared_ptr& joinNode); - - std::optional toSql( - const std::shared_ptr& joinNode); - - std::optional toSql( - const std::shared_ptr& valuesNode); + /// Executes SQL query returned by the 'toSql' method based on the plan. + /// Returns the query results as a vector. 
+ std::vector executeAndReturnVector( + const std::string& sql, + const core::PlanNodePtr& plan); std::string startQuery( const std::string& sql, diff --git a/velox/exec/fuzzer/ReferenceQueryRunner.cpp b/velox/exec/fuzzer/ReferenceQueryRunner.cpp new file mode 100644 index 000000000000..6dcf7540e5ef --- /dev/null +++ b/velox/exec/fuzzer/ReferenceQueryRunner.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/ToSQLUtil.h" + +namespace facebook::velox::exec::test { + +namespace { + +/* Returns the names of the given join keys as a single comma-separated string. */ +std::string joinKeysToSql( + const std::vector& keys) { + std::vector keyNames; + keyNames.reserve(keys.size()); + for (const core::FieldAccessTypedExprPtr& key : keys) { + keyNames.push_back(key->name()); + } + return folly::join(", ", keyNames); +} + +/* Renders a join filter as SQL: the filter is downcast and passed to toCallSql. NOTE(review): the cast result is not checked for null here; presumably every filter is a call expression - confirm. */ +std::string filterToSql(const core::TypedExprPtr& filter) { + auto call = std::dynamic_pointer_cast(filter); + return toCallSql(call); +} + +/* Builds the join predicate: one equality per pair of left and right keys, combined with AND, followed by the filter when the join has one. */ +std::string joinConditionAsSql(const core::AbstractJoinNode& joinNode) { + std::stringstream out; + for (auto i = 0; i < joinNode.leftKeys().size(); ++i) { + if (i > 0) { + out << " AND "; + } + out << joinNode.leftKeys()[i]->name() << " = " + << joinNode.rightKeys()[i]->name(); + } + if (joinNode.filter()) { + if (!joinNode.leftKeys().empty()) { + out << " AND "; + } + out << filterToSql(joinNode.filter()); + } + return out.str(); +} + +} // namespace + +/* Returns false when the type, or any child type checked recursively, is a date, an interval day-time or an unknown type; returns true otherwise. */ +bool ReferenceQueryRunner::isSupportedDwrfType(const TypePtr& type) { + if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isSupportedDwrfType(type->childAt(i))) { + return false; + } + } + + return true; +} + +/* Walks the plan and returns the rows of each values node keyed by the name getTableName gives it. A values node is recorded and not descended into; any other node is traversed through its sources. */ +std::unordered_map> +ReferenceQueryRunner::getAllTables(const core::PlanNodePtr& plan) { + std::unordered_map> result; + if (const auto valuesNode = + std::dynamic_pointer_cast(plan)) { + result.insert({getTableName(valuesNode), valuesNode->values()}); + } else { + for (const auto& source : plan->sources()) { + auto tablesAndNames = getAllTables(source); + result.insert(tablesAndNames.begin(), tablesAndNames.end()); + } + } + return result; +} + +/* Converts a join input to SQL via toSql. Text that contains a space is wrapped in parentheses, text without a space is returned unchanged, and std::nullopt is passed through when the node cannot be translated. */ +std::optional ReferenceQueryRunner::joinSourceToSql( + const core::PlanNodePtr& planNode) { + const std::optional subQuery = toSql(planNode); + if (subQuery) {
return subQuery->find(" ") != std::string::npos + ? fmt::format("({})", *subQuery) + : *subQuery; + } + return std::nullopt; +} + +/* Values node overload: returns the table name of the node, or std::nullopt when its output type is rejected by isSupportedDwrfType. */ +std::optional ReferenceQueryRunner::toSql( + const core::ValuesNodePtr& valuesNode) { + if (!isSupportedDwrfType(valuesNode->outputType())) { + return std::nullopt; + } + return getTableName(valuesNode); +} + +/* Join overload that works on join keys (declared in the header for hash join nodes). Produces a SELECT over the two sources. Returns std::nullopt when a source output type is rejected by isSupportedDwrfType, when a source cannot be translated, or for a left semi filter join with more than one key. */ +std::optional ReferenceQueryRunner::toSql( + const std::shared_ptr& joinNode) { + if (!isSupportedDwrfType(joinNode->sources()[0]->outputType()) || + !isSupportedDwrfType(joinNode->sources()[1]->outputType())) { + return std::nullopt; + } + + std::optional probeTableName = + joinSourceToSql(joinNode->sources()[0]); + std::optional buildTableName = + joinSourceToSql(joinNode->sources()[1]); + if (!probeTableName || !buildTableName) { + return std::nullopt; + } + + const auto& outputNames = joinNode->outputType()->names(); + + std::stringstream sql; + /* For a left semi project join the last output column is left out of the select list; the kLeftSemiProject case appends an IN or EXISTS expression after the list instead. NOTE(review): this assumes the output type has at least one column - confirm. */ + if (joinNode->isLeftSemiProjectJoin()) { + sql << "SELECT " + << folly::join(", ", outputNames.begin(), --outputNames.end()); + } else { + sql << "SELECT " << folly::join(", ", outputNames); + } + + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM " << *probeTableName << " INNER JOIN " << *buildTableName + << " ON " << joinConditionAsSql(*joinNode); + break; + case core::JoinType::kLeft: + sql << " FROM " << *probeTableName << " LEFT JOIN " << *buildTableName + << " ON " << joinConditionAsSql(*joinNode); + break; + case core::JoinType::kFull: + sql << " FROM " << *probeTableName << " FULL OUTER JOIN " + << *buildTableName << " ON " << joinConditionAsSql(*joinNode); + break; + case core::JoinType::kLeftSemiFilter: + // Multiple columns returned by a scalar subquery are not supported. A + // scalar subquery expression is a subquery that returns one result row + // from exactly one column for every input row.
+ if (joinNode->leftKeys().size() > 1) { + return std::nullopt; + } + sql << " FROM " << *probeTableName << " WHERE " + << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM " + << *buildTableName; + if (joinNode->filter()) { + sql << " WHERE " << filterToSql(joinNode->filter()); + } + sql << ")"; + break; + /* For kLeftSemiProject and kAnti the null-aware form is rendered with IN or NOT IN over the join keys and the other form with EXISTS or NOT EXISTS over the full join condition. NOTE(review): unlike kLeftSemiFilter, the IN and NOT IN branches do not return std::nullopt for more than one join key - confirm multi-key null-aware joins cannot reach this code. */ + case core::JoinType::kLeftSemiProject: + if (joinNode->isNullAware()) { + sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM " + << *buildTableName; + if (joinNode->filter()) { + sql << " WHERE " << filterToSql(joinNode->filter()); + } + sql << ") FROM " << *probeTableName; + } else { + sql << ", EXISTS (SELECT * FROM " << *buildTableName << " WHERE " + << joinConditionAsSql(*joinNode); + sql << ") FROM " << *probeTableName; + } + break; + case core::JoinType::kAnti: + if (joinNode->isNullAware()) { + sql << " FROM " << *probeTableName << " WHERE " + << joinKeysToSql(joinNode->leftKeys()) << " NOT IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM " + << *buildTableName; + if (joinNode->filter()) { + sql << " WHERE " << filterToSql(joinNode->filter()); + } + sql << ")"; + } else { + sql << " FROM " << *probeTableName + << " WHERE NOT EXISTS (SELECT * FROM " << *buildTableName + << " WHERE " << joinConditionAsSql(*joinNode); + sql << ")"; + } + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + return sql.str(); +} + +/* Join overload declared in the header for nested loop join nodes. Requires a node without a join condition and emits an INNER, LEFT or FULL OUTER JOIN whose ON clause is the constant (1 = 1). Returns std::nullopt when a source cannot be translated. NOTE(review): unlike the overload above, source output types are not checked with isSupportedDwrfType. */ +std::optional ReferenceQueryRunner::toSql( + const std::shared_ptr& joinNode) { + std::optional probeTableName = + joinSourceToSql(joinNode->sources()[0]); + std::optional buildTableName = + joinSourceToSql(joinNode->sources()[1]); + if (!probeTableName || !buildTableName) { + return std::nullopt; + } + + std::stringstream sql; + sql << "SELECT " << folly::join(", ", joinNode->outputType()->names()); + + // Nested loop join without filter.
+ VELOX_CHECK_NULL( + joinNode->joinCondition(), + "This code path should be called only for nested loop join without filter"); + /* A constant, always-true predicate fills the ON clause because the node has no join condition. */ + const std::string joinCondition{"(1 = 1)"}; + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM " << *probeTableName << " INNER JOIN " << *buildTableName + << " ON " << joinCondition; + break; + case core::JoinType::kLeft: + sql << " FROM " << *probeTableName << " LEFT JOIN " << *buildTableName + << " ON " << joinCondition; + break; + case core::JoinType::kFull: + sql << " FROM " << *probeTableName << " FULL OUTER JOIN " + << *buildTableName << " ON " << joinCondition; + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + return sql.str(); +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/ReferenceQueryRunner.h b/velox/exec/fuzzer/ReferenceQueryRunner.h index 5d0c24afdc24..3a73791b1eaf 100644 --- a/velox/exec/fuzzer/ReferenceQueryRunner.h +++ b/velox/exec/fuzzer/ReferenceQueryRunner.h @@ -15,7 +15,10 @@ */ #pragma once +#include #include +#include + #include "velox/core/PlanNode.h" #include "velox/expression/FunctionSignature.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -54,6 +57,18 @@ class ReferenceQueryRunner { /// reference database. virtual std::optional toSql(const core::PlanNodePtr& plan) = 0; + /// Same as the above toSql but for values nodes. + virtual std::optional toSql( + const core::ValuesNodePtr& valuesNode); + + /// Same as the above toSql but for hash join nodes. + virtual std::optional toSql( + const std::shared_ptr& joinNode); + + /// Same as the above toSql but for nested loop join nodes. The base + /// implementation checks that the node has no join condition. + virtual std::optional toSql( + const std::shared_ptr& joinNode); + /// Returns whether a constant expression is supported by the reference /// database.
virtual bool isConstantExprSupported(const core::TypedExprPtr& /*expr*/) { @@ -66,6 +81,13 @@ class ReferenceQueryRunner { return true; } + /// Executes SQL query returned by the 'toSql' method based on the plan. + virtual std::multiset> execute( + const std::string& sql, + const core::PlanNodePtr& plan) { + VELOX_UNSUPPORTED(); + } + /// Executes SQL query returned by the 'toSql' method using 'input' data. /// Converts results using 'resultType' schema. virtual std::multiset> execute( @@ -80,7 +102,9 @@ class ReferenceQueryRunner { const std::string& sql, const std::vector& probeInput, const std::vector& buildInput, - const RowTypePtr& resultType) = 0; + const RowTypePtr& resultType) { + VELOX_UNSUPPORTED(); + } /// Returns true if 'executeVector' can be called to get results as Velox /// Vector. @@ -97,15 +121,6 @@ class ReferenceQueryRunner { VELOX_UNSUPPORTED(); } - /// Similar to above but for join node with 'probeInput' and 'buildInput'. - virtual std::vector executeVector( - const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) { - VELOX_UNSUPPORTED(); - } - virtual std::vector execute(const std::string& sql) { VELOX_UNSUPPORTED(); } @@ -121,8 +136,20 @@ class ReferenceQueryRunner { return aggregatePool_; } + /* Returns false if 'type' or any nested child type is a date, interval day-time or unknown type. */ + bool isSupportedDwrfType(const TypePtr& type); + + /// Returns the name of the values node table: t_ followed by the id of the + /// values node, for example t_0. + std::string getTableName(const core::ValuesNodePtr& valuesNode) { + return fmt::format("t_{}", valuesNode->id()); + } + + // Traverses all nodes in the plan and returns all tables and their names.
+ std::unordered_map> + getAllTables(const core::PlanNodePtr& plan); + private: memory::MemoryPool* aggregatePool_; -}; + /* Converts a join source plan node to SQL and wraps the result in parentheses when it contains a space. */ + std::optional joinSourceToSql(const core::PlanNodePtr& planNode); +}; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/PrestoQueryRunnerTest.cpp b/velox/exec/tests/PrestoQueryRunnerTest.cpp index 25b231dc6c7c..14447f5eb396 100644 --- a/velox/exec/tests/PrestoQueryRunnerTest.cpp +++ b/velox/exec/tests/PrestoQueryRunnerTest.cpp @@ -255,4 +255,122 @@ TEST_F(PrestoQueryRunnerTest, toSql) { } } +/* Checks the SQL text that toSql produces for plans made of one, two and three hash joins over values nodes built from empty vectors. */ +TEST_F(PrestoQueryRunnerTest, toSqlJoins) { + auto aggregatePool = rootPool_->addAggregateChild("toSqlJoins"); + auto queryRunner = std::make_unique( + aggregatePool.get(), + "http://unused", + "hive", + static_cast(1000)); + + auto t = makeRowVector( + {"t0", "t1", "t2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + auto u = makeRowVector( + {"u0", "u1", "u2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + auto v = makeRowVector( + {"v0", "v1", "v2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + auto w = makeRowVector( + {"w0", "w1", "w2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + + // Single join. + { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values({t}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({u}).planNode(), + /*filter=*/"", + {"t0", "t1"}, + core::JoinType::kInner) + .planNode(); + EXPECT_EQ( + *queryRunner->toSql(plan), + "SELECT t0, t1 FROM t_0 INNER JOIN t_1 ON t0 = u0"); + } + + // Two joins with a filter.
+ { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values({t}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({u}).planNode(), + /*filter=*/"", + {"t0"}, + core::JoinType::kLeftSemiFilter) + .hashJoin( + {"t0"}, + {"v0"}, + PlanBuilder(planNodeIdGenerator).values({v}).planNode(), + "v1 > 0", + {"t0", "v1"}, + core::JoinType::kInner) + .planNode(); + /* The semi join is expected as a parenthesized subquery on the probe side of the inner join. */ + EXPECT_EQ( + *queryRunner->toSql(plan), + "SELECT t0, v1" + " FROM (SELECT t0 FROM t_0 WHERE t0 IN (SELECT u0 FROM t_1))" + " INNER JOIN t_3 ON t0 = v0 AND (cast(v1 as BIGINT) > BIGINT '0')"); + } + + // Three joins: each earlier join is expected as a parenthesized subquery on + // the probe side of the next one. + { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values({t}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({u}).planNode(), + /*filter=*/"", + {"t0", "t1"}, + core::JoinType::kLeft) + .hashJoin( + {"t0"}, + {"v0"}, + PlanBuilder(planNodeIdGenerator).values({v}).planNode(), + /*filter=*/"", + {"t0", "v1"}, + core::JoinType::kInner) + .hashJoin( + {"t0", "v1"}, + {"w0", "w1"}, + PlanBuilder(planNodeIdGenerator).values({w}).planNode(), + /*filter=*/"", + {"t0", "w1"}, + core::JoinType::kFull) + .planNode(); + EXPECT_EQ( + *queryRunner->toSql(plan), + "SELECT t0, w1" + " FROM (SELECT t0, v1 FROM (SELECT t0, t1 FROM t_0 LEFT JOIN t_1 ON t0 = u0)" + " INNER JOIN t_3 ON t0 = v0)" + " FULL OUTER JOIN t_5 ON t0 = w0 AND v1 = w1"); + } +} + +} // namespace facebook::velox::exec::test