From 220e7f0b2e4589d635583f442d075664b0bf3e37 Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 26 Dec 2024 17:39:33 -0800 Subject: [PATCH] feat(fuzzer): Update table scan plan functions to generate multi-join plans (#11940) Summary: Generates a cascading multi-join from left to right with table scans as the inputs. ``` [t1, t2, t3, t4] t1 t2 \ / a t3 \ / b t4 \ / c ``` Differential Revision: D67607533 --- velox/exec/fuzzer/JoinFuzzer.cpp | 652 ++++++++++++++----------------- 1 file changed, 301 insertions(+), 351 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 8107116e61740..0260e4697beac 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -82,29 +82,25 @@ class JoinFuzzer { void go(); + struct InputAndSplit { + const std::vector* input; + const std::vector split; + }; + struct PlanWithSplits { core::PlanNodePtr plan; - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - std::unordered_map> - splits; + std::unordered_map splits; core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; int32_t numGroups; explicit PlanWithSplits( const core::PlanNodePtr& _plan, - const core::PlanNodeId& _probeScanId = "", - const core::PlanNodeId& _buildScanId = "", - const std::unordered_map< - core::PlanNodeId, - std::vector>& _splits = {}, + const std::unordered_map& _splits = {}, core::ExecutionStrategy _executionStrategy = core::ExecutionStrategy::kUngrouped, int32_t _numGroups = 0) : plan(_plan), - probeScanId(_probeScanId), - buildScanId(_buildScanId), splits(_splits), executionStrategy(_executionStrategy), numGroups(_numGroups) {} @@ -158,71 +154,53 @@ class JoinFuzzer { const std::string& filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. - // If withFilter is true, uses the equality filter between probeKeys and - // buildKeys as the join filter. Uses empty join filter otherwise. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - const std::string& filter); + const std::string& joinCondition); - // Makes the default query plan with table scan as inputs for both probe and - // build sides. + // Makes the default query plan with table scan as inputs for all of the + // inputs. JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan - // nodes. If withFilter is true, uses the equiality filter between probeKeys - // and buildKeys as the join filter. Uses empty join filter otherwise. + // nodes. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList); void makeAlternativePlans( - const core::PlanNodePtr& plan, + const PlanWithSplits& plan, const std::vector& probeInput, const std::vector& buildInput, std::vector& plans, const std::string& filter); - // Makes the query plan from 'planWithTableScan' with grouped execution mode. - // Correspondingly, it replaces the table scan input splits with grouped ones. - JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( - const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits); - // Runs one test iteration from query plans generations, executions and result // verifications. void verify(core::JoinType joinType); @@ -250,15 +228,16 @@ class JoinFuzzer { void addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter); + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector probeTypeList, + const std::vector buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& altPlans); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -269,9 +248,9 @@ class JoinFuzzer { // Generates the grouped splits. std::vector generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const std::string& name, + const size_t numKeys, const std::vector& input); RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); @@ -287,8 +266,6 @@ class JoinFuzzer { RowVectorPtr testCrossProduct( const std::string& tableDir, core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput); @@ -526,13 +503,18 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [planNodeId, nodeSplits] : plan.splits) { - builder.splits(planNodeId, nodeSplits); + for (const auto& [planNodeId, inputAndSplit] : plan.splits) { + builder.splits(planNodeId, inputAndSplit.split); } if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { builder.executionStrategy(core::ExecutionStrategy::kGrouped); - builder.groupedExecutionLeafNodeIds({plan.probeScanId, plan.buildScanId}); + std::unordered_set ids; + ids.reserve(plan.splits.size()); + for (const auto& [id, _] : plan.splits) { + ids.insert(id); + } + builder.groupedExecutionLeafNodeIds(ids); builder.numSplitGroups(plan.numGroups); builder.numConcurrentSplitGroups(randInt(1, plan.numGroups)); } @@ -766,54 +748,51 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( } JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputsAndSplits.size() > 1); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{ - plan, - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; -} - -JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( - const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits) { - return PlanWithSplits{ - planWithTableScan.plan, - planWithTableScan.probeScanId, - planWithTableScan.buildScanId, - {{planWithTableScan.probeScanId, groupedProbeScanSplits}, - {planWithTableScan.buildScanId, groupedBuildScanSplits}}, - core::ExecutionStrategy::kGrouped, - numGroups}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .hashJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } std::vector makeSources( @@ -852,25 +831,13 @@ std::string makeJoinFilter( template void addFlippedJoinPlan( - const core::PlanNodePtr& plan, - std::vector& plans, - const core::PlanNodeId& probeScanId = "", - const core::PlanNodeId& buildScanId = "", - const std::unordered_map>& - splits = {}, - core::ExecutionStrategy executionStrategy = - core::ExecutionStrategy::kUngrouped, - int32_t numGroups = 0) { - auto joinNode = std::dynamic_pointer_cast(plan); + const JoinFuzzer::PlanWithSplits& plan, + std::vector& plans) { + auto joinNode = std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { plans.push_back(JoinFuzzer::PlanWithSplits{ - flippedPlan, - probeScanId, - buildScanId, - splits, - executionStrategy, - numGroups}); + flippedPlan, plan.splits, plan.executionStrategy, plan.numGroups}); } } @@ -901,31 +868,30 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - const std::string& filter) { + const std::string& joinCondition) { auto planNodeIdGenerator = std::make_shared(); return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) .nestedLoopJoin( PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, + joinCondition, outputColumns, joinType) .planNode()}; } void JoinFuzzer::makeAlternativePlans( - const core::PlanNodePtr& plan, + const PlanWithSplits& plan, const std::vector& probeInput, const std::vector& buildInput, std::vector& plans, const std::string& filter) { - auto joinNode = std::dynamic_pointer_cast(plan); + auto joinNode = + std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); // Flip join sides. @@ -967,7 +933,7 @@ void JoinFuzzer::makeAlternativePlans( filter); plans.push_back(planWithSplits); - addFlippedJoinPlan(planWithSplits.plan, plans); + addFlippedJoinPlan(planWithSplits, plans); } // Use NestedLoopJoin. @@ -977,16 +943,10 @@ void JoinFuzzer::makeAlternativePlans( : fmt::format( "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - joinCondition); + joinType, probeInput, buildInput, outputColumns, joinCondition); plans.push_back(planWithSplits); - addFlippedJoinPlan(planWithSplits.plan, plans); + addFlippedJoinPlan(planWithSplits, plans); } } @@ -1014,8 +974,6 @@ void JoinFuzzer::shuffleJoinKeys( RowVectorPtr JoinFuzzer::testCrossProduct( const std::string& tableDir, core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput) { VELOX_CHECK_GT(probeInput.size(), 0); @@ -1029,8 +987,6 @@ RowVectorPtr JoinFuzzer::testCrossProduct( auto plan = makeNestedLoopJoinPlan( joinType, - probeKeys, - buildKeys, probeInput, buildInput, outputColumns, @@ -1055,23 +1011,22 @@ RowVectorPtr JoinFuzzer::testCrossProduct( std::vector altPlans; if (isTableScanSupported(probeInput[0]->type()) && isTableScanSupported(buildInput[0]->type())) { - std::vector probeScanSplits = - makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + InputAndSplit probeInputAndSplit{ + &probeInput, + makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_)}; + InputAndSplit buildInputAndSplit{ + &buildInput, + makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_)}; altPlans.push_back(makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - /*filter=*/"")); + {joinType}, + {probeType}, + {buildType}, + {probeInputAndSplit, buildInputAndSplit}, + {outputColumns}, + /*filter=*/{""})); } - addFlippedJoinPlan(plan.plan, altPlans); + addFlippedJoinPlan(plan, altPlans); for (const auto& altPlan : altPlans) { auto actual = execute(altPlan, /*injectSpill=*/false); @@ -1142,19 +1097,9 @@ void JoinFuzzer::verify(core::JoinType joinType) { stats_.numCrossProduct++; auto result = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput); + tableScanDir->getPath(), joinType, probeInput, buildInput); auto flatResult = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput); + tableScanDir->getPath(), joinType, flatProbeInput, flatBuildInput); assertEqualResults({result}, {flatResult}); } } @@ -1219,22 +1164,22 @@ void JoinFuzzer::verify(core::JoinType joinType) { {outputColumns}, {filter})); + makeAlternativePlans(defaultPlan, probeInput, buildInput, altPlans, filter); makeAlternativePlans( - defaultPlan.plan, probeInput, buildInput, altPlans, filter); - makeAlternativePlans( - defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter); + defaultPlan, flatProbeInput, flatBuildInput, altPlans, filter); addPlansWithTableScan( tableScanDir->getPath(), - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - outputColumns, - altPlans, - filter); + {joinType}, + {nullAware}, + {asRowType(flatProbeInput[0]->type())}, + {asRowType(flatBuildInput[0]->type())}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter}, + altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1279,111 +1224,128 @@ void JoinFuzzer::verify(core::JoinType joinType) { } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputsAndSplits.size() > 1); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .orderBy(probeKeysList[0], false) + .mergeJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .orderBy(buildKeysList[0], false) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.mergeJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .planNode(), + joinConditionList[0], + outputColumnsList[0], + joinTypes[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + joinConditionList[i], + outputColumnsList[i], + joinTypes[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } void JoinFuzzer::addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector probeTypeList, + const std::vector buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& altPlans) { VELOX_CHECK(!tableDir.empty()); + VELOX_CHECK(inputs.size() > 1); - if (!isTableScanSupported(probeInput[0]->type()) || - !isTableScanSupported(buildInput[0]->type())) { - return; - } - - std::vector probeScanSplits = - makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + std::vector inputsAndSplits; + for (int i = 0; i < inputs.size(); i++) { + if (!isTableScanSupported(inputs[i][0]->type())) { + return; + } - const auto probeType = asRowType(probeInput[0]->type()); - const auto buildType = asRowType(buildInput[0]->type()); + inputsAndSplits.push_back(InputAndSplit{ + &inputs[i], + makeSplits( + inputs[i], fmt::format("{}/table_{}", tableDir, i), writerPool_)}); + } std::vector plansWithTableScan; auto defaultPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + joinTypes, + nullAwareList, + probeTypeList, + buildTypeList, + probeKeysList, + buildKeysList, + inputsAndSplits, + outputColumnsList, + filterList); plansWithTableScan.push_back(defaultPlan); auto joinNode = @@ -1391,94 +1353,86 @@ void JoinFuzzer::addPlansWithTableScan( VELOX_CHECK_NOT_NULL(joinNode); // Flip join sides. - addFlippedJoinPlan( - defaultPlan.plan, - plansWithTableScan, - defaultPlan.probeScanId, - defaultPlan.buildScanId, - defaultPlan.splits); - - const int32_t numGroups = randInt(1, probeScanSplits.size()); - const std::vector groupedProbeScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/true, - probeKeys.size(), - probeInput); - const std::vector groupedBuildScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/false, - buildKeys.size(), - buildInput); + addFlippedJoinPlan(defaultPlan, plansWithTableScan); + + const int32_t numGroups = randInt(1, inputsAndSplits[0].split.size()); + std::unordered_map groupedSplits; + for (int i = 0; const auto& [id, inputAndSplit] : defaultPlan.splits) { + groupedSplits.insert( + {id, + InputAndSplit{ + inputAndSplit.input, + generateSplitsWithGroup( + tableDir, + numGroups, + /*name=*/fmt::format("table_{}", i), + /*numKeys=*/i == 0 ? probeKeysList[i].size() + : buildKeysList[i - 1].size(), + *inputAndSplit.input)}}); + i++; + } for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan); - altPlans.push_back(makeGroupedExecutionPlanWithTableScan( - planWithTableScan, - numGroups, - groupedProbeScanSplits, - groupedBuildScanSplits)); + altPlans.push_back(PlanWithSplits{ + planWithTableScan.plan, + groupedSplits, + core::ExecutionStrategy::kGrouped, + numGroups}); } // Add ungrouped MergeJoin with TableScan. if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + joinTypes, + probeTypeList, + buildTypeList, + probeKeysList, + buildKeysList, + inputsAndSplits, + outputColumnsList, + filterList); altPlans.push_back(planWithSplits); - addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + addFlippedJoinPlan(planWithSplits, altPlans); } // Add ungrouped NestedLoopJoin with TableScan. - if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + bool NLJSupported = true; + for (const core::JoinType joinType : joinTypes) { + if (!core::NestedLoopJoinNode::isSupported(joinType)) { + NLJSupported = false; + } + } + if (NLJSupported) { + std::vector joinConditionList; + for (int i = 0; i < probeKeysList.size(); i++) { + joinConditionList.push_back( + filterList[i].empty() + ? makeJoinFilter(probeKeysList[i], buildKeysList[i]) + : fmt::format( + "{} AND {}", + makeJoinFilter(probeKeysList[i], buildKeysList[i]), + filterList[i])); + } auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - joinCondition); + joinTypes, + probeTypeList, + buildTypeList, + inputsAndSplits, + outputColumnsList, + joinConditionList); altPlans.push_back(planWithSplits); - addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + addFlippedJoinPlan(planWithSplits, altPlans); } } std::vector JoinFuzzer::generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const std::string& name, + const size_t numKeys, const std::vector& input) { const std::vector> inputVectorsByGroup = splitInputByGroup(numGroups, numKeys, input); @@ -1486,12 +1440,8 @@ std::vector JoinFuzzer::generateSplitsWithGroup( std::vector splitsWithGroup; for (int32_t groupId = 0; groupId < numGroups; ++groupId) { for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { - const std::string filePath = fmt::format( - "{}/grouped[{}].{}.{}", - tableDir, - groupId, - isProbe ? "probe" : "build", - i); + const std::string filePath = + fmt::format("{}/grouped[{}].{}.{}", tableDir, groupId, name, i); writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); splitsWithGroup.emplace_back(makeConnectorSplit(filePath), groupId); }