diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 60c199084706b..40184a3045188 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -194,10 +194,14 @@ class JoinFuzzer { void makeAlternativePlans( const PlanWithSplits& plan, - const std::vector& probeInput, - const std::vector& buildInput, - std::vector& plans, - const std::string& filter); + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& plans); // Runs one test iteration from query plans generations, executions and result // verifications. @@ -903,10 +907,14 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( void JoinFuzzer::makeAlternativePlans( const PlanWithSplits& plan, - const std::vector& probeInput, - const std::vector& buildInput, - std::vector& plans, - const std::string& filter) { + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& plans) { auto joinNode = std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); @@ -914,52 +922,75 @@ void JoinFuzzer::makeAlternativePlans( // Flip join sides. addFlippedJoinPlan(plan, plans); - // Parallelize probe and build sides. - const auto probeKeys = fieldNames(joinNode->leftKeys()); - const auto buildKeys = fieldNames(joinNode->rightKeys()); - const auto outputColumns = joinNode->outputType()->names(); - const auto joinType = joinNode->joinType(); - + // Add plan with local partition round robin for inputs. auto planNodeIdGenerator = std::make_shared(); - plans.push_back(JoinFuzzer::PlanWithSplits{ + PlanBuilder partitionPlan = PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(probeInput, planNodeIdGenerator)) + .localPartitionRoundRobin(makeSources(inputs[0], planNodeIdGenerator)) .hashJoin( - probeKeys, - buildKeys, + probeKeysList[0], + buildKeysList[0], PlanBuilder(planNodeIdGenerator) .localPartitionRoundRobin( - makeSources(buildInput, planNodeIdGenerator)) + makeSources(inputs[1], planNodeIdGenerator)) .planNode(), - filter, - outputColumns, - joinType, - joinNode->isNullAware()) - .planNode()}); + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + for (int i = 1; i < inputs.size() - 1; i++) { + partitionPlan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(inputs[i + 1], planNodeIdGenerator)) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + } + plans.push_back(PlanWithSplits{partitionPlan.planNode()}); + bool mergeJoinSupported = true; + bool NLJSupported = true; + for (const core::JoinType& joinType : joinTypes) { + if (!core::NestedLoopJoinNode::isSupported(joinType)) { + NLJSupported = false; + } + if (!core::MergeJoinNode::isSupported(joinType)) { + mergeJoinSupported = false; + } + } // Use OrderBy + MergeJoin - if (core::MergeJoinNode::isSupported(joinNode->joinType())) { + if (mergeJoinSupported) { auto planWithSplits = makeMergeJoinPlan( - {joinType}, - {probeKeys}, - {buildKeys}, - {probeInput, buildInput}, - {outputColumns}, - {filter}); + joinTypes, + probeKeysList, + buildKeysList, + inputs, + outputColumnsList, + filterList); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); } // Use NestedLoopJoin. - if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + if (NLJSupported) { + std::vector joinConditionList; + for (int i = 0; i < probeKeysList.size(); i++) { + joinConditionList.push_back( + filterList[0].empty() + ? makeJoinFilter(probeKeysList[i], buildKeysList[i]) + : fmt::format( + "{} AND {}", + makeJoinFilter(probeKeysList[i], buildKeysList[i]), + filterList[i])); + } auto planWithSplits = makeNestedLoopJoinPlan( - {joinType}, {probeInput, buildInput}, {outputColumns}, {joinCondition}); + joinTypes, inputs, outputColumnsList, joinConditionList); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); @@ -1179,9 +1210,26 @@ void JoinFuzzer::verify(core::JoinType joinType) { {outputColumns}, {filter})); - makeAlternativePlans(defaultPlan, probeInput, buildInput, altPlans, filter); makeAlternativePlans( - defaultPlan, flatProbeInput, flatBuildInput, altPlans, filter); + defaultPlan, + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}, + altPlans); + makeAlternativePlans( + defaultPlan, + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter}, + altPlans); addPlansWithTableScan( tableScanDir->getPath(),