From 9115c484fa6e2d96ffda82f50bd0d5924f0f36dd Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 9 Jan 2025 16:13:21 -0800 Subject: [PATCH 1/6] feat(fuzzer): Support multiple joins in the join node "toSql" methods for reference query runners (#11801) Summary: Currently, the hash join and nested loop join "toSql" methods for all reference query runners only support a single join. This change extends it to support multiple joins, only needing the join node of the last join in the tree. It traverses up the tree and recursively builds the sql query. Differential Revision: D66977480 --- velox/core/PlanNode.h | 2 + velox/exec/fuzzer/CMakeLists.txt | 4 +- velox/exec/fuzzer/DuckQueryRunner.cpp | 160 ++------------ velox/exec/fuzzer/DuckQueryRunner.h | 23 +- velox/exec/fuzzer/JoinFuzzer.cpp | 4 +- velox/exec/fuzzer/PrestoQueryRunner.cpp | 229 +++---------------- velox/exec/fuzzer/PrestoQueryRunner.h | 28 +-- velox/exec/fuzzer/ReferenceQueryRunner.cpp | 242 +++++++++++++++++++++ velox/exec/fuzzer/ReferenceQueryRunner.h | 49 ++++- velox/exec/tests/PrestoQueryRunnerTest.cpp | 118 ++++++++++ 10 files changed, 470 insertions(+), 389 deletions(-) create mode 100644 velox/exec/fuzzer/ReferenceQueryRunner.cpp diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index aac5583e3e7e..f9970f3f2267 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -324,6 +324,8 @@ class ValuesNode : public PlanNode { const size_t repeatTimes_; }; +using ValuesNodePtr = std::shared_ptr; + class ArrowStreamNode : public PlanNode { public: ArrowStreamNode( diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index 856373b54fb4..d8214636d4d1 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-add_library(velox_fuzzer_util DuckQueryRunner.cpp PrestoQueryRunner.cpp - FuzzerUtil.cpp ToSQLUtil.cpp) +add_library(velox_fuzzer_util ReferenceQueryRunner.cpp DuckQueryRunner.cpp + PrestoQueryRunner.cpp FuzzerUtil.cpp ToSQLUtil.cpp) target_link_libraries( velox_fuzzer_util diff --git a/velox/exec/fuzzer/DuckQueryRunner.cpp b/velox/exec/fuzzer/DuckQueryRunner.cpp index d6d606f6497e..7758263e5223 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.cpp +++ b/velox/exec/fuzzer/DuckQueryRunner.cpp @@ -13,6 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +#include +#include +#include + #include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/exec/fuzzer/ToSQLUtil.h" #include "velox/exec/tests/utils/QueryAssertions.h" @@ -104,21 +109,22 @@ DuckQueryRunner::aggregationFunctionDataSpecs() const { std::multiset> DuckQueryRunner::execute( const std::string& sql, - const std::vector& input, - const RowTypePtr& resultType) { + const core::PlanNodePtr& plan) { DuckDbQueryRunner queryRunner; - queryRunner.createTable("tmp", input); - return queryRunner.execute(sql, resultType); + std::unordered_map> inputMap = + getAllTables(plan); + for (const auto& [tableName, input] : inputMap) { + queryRunner.createTable(tableName, input); + } + return queryRunner.execute(sql, plan->outputType()); } std::multiset> DuckQueryRunner::execute( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, + const std::vector& input, const RowTypePtr& resultType) { DuckDbQueryRunner queryRunner; - queryRunner.createTable("t", probeInput); - queryRunner.createTable("u", buildInput); + queryRunner.createTable("tmp", input); return queryRunner.execute(sql, resultType); } @@ -164,6 +170,11 @@ std::optional DuckQueryRunner::toSql( return toSql(joinNode); } + if (const auto valuesNode = + std::dynamic_pointer_cast(plan)) { + return toSql(valuesNode); + } + VELOX_NYI(); } @@ -340,137 +351,4 @@ 
std::optional DuckQueryRunner::toSql( return sql.str(); } - -std::optional DuckQueryRunner::toSql( - const std::shared_ptr& joinNode) { - const auto& joinKeysToSql = [](auto keys) { - std::stringstream out; - for (auto i = 0; i < keys.size(); ++i) { - if (i > 0) { - out << ", "; - } - out << keys[i]->name(); - } - return out.str(); - }; - - const auto filterToSql = [](core::TypedExprPtr filter) { - auto call = std::dynamic_pointer_cast(filter); - return toCallSql(call); - }; - - const auto& joinConditionAsSql = [&](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i > 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - if (joinNode->filter()) { - out << " AND " << filterToSql(joinNode->filter()); - } - return out.str(); - }; - - const auto& outputNames = joinNode->outputType()->names(); - - std::stringstream sql; - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - // Multiple columns returned by a scalar subquery is not supported in - // DuckDB. A scalar subquery expression is a subquery that returns one - // result row from exactly one column for every input row. 
- if (joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ") FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ") FROM t"; - } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - - return sql.str(); -} - -std::optional DuckQueryRunner::toSql( - const std::shared_ptr& joinNode) { - std::stringstream sql; - sql << "SELECT " << folly::join(", ", joinNode->outputType()->names()); - - // Nested loop join without filter. 
- VELOX_CHECK( - joinNode->joinCondition() == nullptr, - "This code path should be called only for nested loop join without filter"); - const std::string joinCondition{"(1 = 1)"}; - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinCondition; - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinCondition; - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - - return sql.str(); -} } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/DuckQueryRunner.h b/velox/exec/fuzzer/DuckQueryRunner.h index 4fa826af0488..3336ffa1668c 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.h +++ b/velox/exec/fuzzer/DuckQueryRunner.h @@ -15,6 +15,10 @@ */ #pragma once +#include +#include +#include + #include "velox/exec/fuzzer/ReferenceQueryRunner.h" namespace facebook::velox::exec::test { @@ -46,20 +50,21 @@ class DuckQueryRunner : public ReferenceQueryRunner { /// Assumes that source of AggregationNode or Window Node is 'tmp' table. std::optional toSql(const core::PlanNodePtr& plan) override; - /// Creates 'tmp' table with 'input' data and runs 'sql' query. Returns - /// results according to 'resultType' schema. + /// Executes SQL query returned by the 'toSql' method based on the plan. std::multiset> execute( const std::string& sql, - const std::vector& input, - const RowTypePtr& resultType) override; + const core::PlanNodePtr& plan) override; + /// Creates 'tmp' table with 'input' data and runs 'sql' query. Returns + /// results according to 'resultType' schema. 
std::multiset> execute( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, + const std::vector& input, const RowTypePtr& resultType) override; private: + using ReferenceQueryRunner::toSql; + std::optional toSql( const std::shared_ptr& aggregationNode); @@ -72,12 +77,6 @@ class DuckQueryRunner : public ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& rowNumberNode); - std::optional toSql( - const std::shared_ptr& joinNode); - - std::optional toSql( - const std::shared_ptr& joinNode); - std::unordered_set aggregateFunctionNames_; }; diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 1860eca9df0b..21e4a7de81fc 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -680,10 +680,8 @@ std::optional JoinFuzzer::computeReferenceResults( } if (auto sql = referenceQueryRunner_->toSql(plan)) { - return referenceQueryRunner_->execute( - sql.value(), probeInput, buildInput, plan->outputType()); + return referenceQueryRunner_->execute(*sql, plan); } - LOG(INFO) << "Query not supported by the reference DB"; return std::nullopt; } diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index c8bba9cdb64d..913b0215b4a6 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -14,13 +14,13 @@ * limitations under the License. 
*/ -#include "velox/exec/fuzzer/PrestoQueryRunner.h" #include // @manual #include #include +#include + #include "velox/common/base/Fs.h" #include "velox/common/encode/Base64.h" -#include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/Expressions.h" @@ -28,6 +28,7 @@ #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/fuzzer/FuzzerUtil.h" +#include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/fuzzer/ToSQLUtil.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/functions/prestosql/types/IPAddressType.h" @@ -36,8 +37,6 @@ #include "velox/serializers/PrestoSerializer.h" #include "velox/type/parser/TypeParser.h" -#include - using namespace facebook::velox; namespace facebook::velox::exec::test { @@ -221,20 +220,6 @@ std::string toWindowCallSql( return sql.str(); } -bool isSupportedDwrfType(const TypePtr& type) { - if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { - return false; - } - - for (auto i = 0; i < type->size(); ++i) { - if (!isSupportedDwrfType(type->childAt(i))) { - return false; - } - } - - return true; -} - } // namespace const std::vector& PrestoQueryRunner::supportedScalarTypes() const { @@ -554,152 +539,10 @@ std::optional PrestoQueryRunner::toSql( return sql.str(); } -std::optional PrestoQueryRunner::toSql( - const std::shared_ptr& joinNode) { - if (!isSupportedDwrfType(joinNode->sources()[0]->outputType())) { - return std::nullopt; - } - - if (!isSupportedDwrfType(joinNode->sources()[1]->outputType())) { - return std::nullopt; - } - - const auto joinKeysToSql = [](auto keys) { - std::stringstream out; - for (auto i = 0; i < keys.size(); ++i) { - if (i > 0) { - out << ", "; - } - out << keys[i]->name(); - } - return out.str(); - }; - - const auto filterToSql = [](core::TypedExprPtr filter) { - auto call = std::dynamic_pointer_cast(filter); 
- return toCallSql(call); - }; - - const auto& joinConditionAsSql = [&](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i > 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - if (joinNode->filter()) { - out << " AND " << filterToSql(joinNode->filter()); - } - return out.str(); - }; - - const auto& outputNames = joinNode->outputType()->names(); - - std::stringstream sql; - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinConditionAsSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - // Multiple columns returned by a scalar subquery is not supported in - // Presto. A scalar subquery expression is a subquery that returns one - // result row from exactly one column for every input row. 
- if (joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ") FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ") FROM t"; - } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u"; - if (joinNode->filter()) { - sql << " WHERE " << filterToSql(joinNode->filter()); - } - sql << ")"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << joinConditionAsSql(joinNode); - sql << ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - return sql.str(); -} - -std::optional PrestoQueryRunner::toSql( - const std::shared_ptr& joinNode) { - std::stringstream sql; - sql << "SELECT " << folly::join(", ", joinNode->outputType()->names()); - - // Nested loop join without filter. 
- VELOX_CHECK( - joinNode->joinCondition() == nullptr, - "This code path should be called only for nested loop join without filter"); - const std::string joinCondition{"(1 = 1)"}; - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << joinCondition; - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << joinCondition; - break; - case core::JoinType::kFull: - sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); - } - - return sql.str(); -} - -std::optional PrestoQueryRunner::toSql( - const std::shared_ptr& valuesNode) { - if (!isSupportedDwrfType(valuesNode->outputType())) { - return std::nullopt; - } - return "tmp"; +std::multiset> PrestoQueryRunner::execute( + const std::string& sql, + const core::PlanNodePtr& plan) { + return exec::test::materialize(executeAndReturnVector(sql, plan)); } std::multiset> PrestoQueryRunner::execute( @@ -709,15 +552,6 @@ std::multiset> PrestoQueryRunner::execute( return exec::test::materialize(executeVector(sql, input, resultType)); } -std::multiset> PrestoQueryRunner::execute( - const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) { - return exec::test::materialize( - executeVector(sql, probeInput, buildInput, resultType)); -} - std::string PrestoQueryRunner::createTable( const std::string& name, const TypePtr& type) { @@ -749,40 +583,31 @@ std::string PrestoQueryRunner::createTable( return tableDirectoryPath; } -std::vector PrestoQueryRunner::executeVector( +std::vector PrestoQueryRunner::executeAndReturnVector( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const velox::RowTypePtr& resultType) { - auto probeType = asRowType(probeInput[0]->type()); - if (probeType->size() == 0) { - auto rowVector = makeNullRows(probeInput, "x", pool()); - return 
executeVector(sql, {rowVector}, buildInput, resultType); - } - - auto buildType = asRowType(buildInput[0]->type()); - if (probeType->size() == 0) { - auto rowVector = makeNullRows(buildInput, "y", pool()); - return executeVector(sql, probeInput, {rowVector}, resultType); + const core::PlanNodePtr& plan) { + std::unordered_map> inputMap = + getAllTables(plan); + for (const auto& [tableName, input] : inputMap) { + auto inputType = asRowType(input[0]->type()); + if (inputType->size() == 0) { + inputMap[tableName] = { + makeNullRows(input, fmt::format("{}x", tableName), pool())}; + } } - auto probeTableDirectoryPath = createTable("t", probeInput[0]->type()); - auto buildTableDirectoryPath = createTable("u", buildInput[0]->type()); - - // Create a new file in table's directory with fuzzer-generated data. - auto probeFilePath = fs::path(probeTableDirectoryPath) - .append("probe.dwrf") - .string() - .substr(strlen("file:")); + auto writerPool = aggregatePool()->addAggregateChild("writer"); + for (const auto& [tableName, input] : inputMap) { + auto tableDirectoryPath = createTable(tableName, input[0]->type()); - auto buildFilePath = fs::path(buildTableDirectoryPath) - .append("build.dwrf") - .string() - .substr(strlen("file:")); + // Create a new file in table's directory with fuzzer-generated data. + auto filePath = fs::path(tableDirectoryPath) + .append(fmt::format("{}.dwrf", tableName)) + .string() + .substr(strlen("file:")); - auto writerPool = aggregatePool()->addAggregateChild("writer"); - writeToFile(probeFilePath, probeInput, writerPool.get()); - writeToFile(buildFilePath, buildInput, writerPool.get()); + writeToFile(filePath, input, writerPool.get()); + } // Run the query. 
return execute(sql); diff --git a/velox/exec/fuzzer/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h index a72cae913e10..31d1604cacfc 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -18,7 +18,6 @@ #include #include -#include "velox/common/memory/Memory.h" #include "velox/exec/fuzzer/ReferenceQueryRunner.h" #include "velox/vector/ComplexVector.h" @@ -83,11 +82,11 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::vector& input, const velox::RowTypePtr& resultType) override; + /// Executes SQL query returned by the 'toSql' method based on the plan. + /// Returns the materialized query results. std::multiset> execute( const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) override; + const core::PlanNodePtr& plan) override; /// Executes Presto SQL query and returns the results. Tables referenced by /// the query must already exist. @@ -105,17 +104,13 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::vector& input, const RowTypePtr& resultType) override; - std::vector executeVector( - const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) override; - std::shared_ptr queryRunnerContext() { return queryRunnerContext_; } private: + using ReferenceQueryRunner::toSql; + memory::MemoryPool* pool() { return pool_.get(); } @@ -136,14 +131,11 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& tableWriteNode); - std::optional toSql( - const std::shared_ptr& joinNode); - - std::optional toSql( - const std::shared_ptr& joinNode); - - std::optional toSql( - const std::shared_ptr& valuesNode); + /// Executes SQL query returned by the 'toSql' method based on the plan. + /// Writes each values node in 'plan' to its own table before running 'sql'.
+ std::vector executeAndReturnVector( + const std::string& sql, + const core::PlanNodePtr& plan); std::string startQuery( const std::string& sql, diff --git a/velox/exec/fuzzer/ReferenceQueryRunner.cpp b/velox/exec/fuzzer/ReferenceQueryRunner.cpp new file mode 100644 index 000000000000..6dcf7540e5ef --- /dev/null +++ b/velox/exec/fuzzer/ReferenceQueryRunner.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include + +#include "velox/core/PlanNode.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/ToSQLUtil.h" + +namespace facebook::velox::exec::test { + +namespace { + +std::string joinKeysToSql( + const std::vector& keys) { + std::vector keyNames; + keyNames.reserve(keys.size()); + for (const core::FieldAccessTypedExprPtr& key : keys) { + keyNames.push_back(key->name()); + } + return folly::join(", ", keyNames); +} + +std::string filterToSql(const core::TypedExprPtr& filter) { + auto call = std::dynamic_pointer_cast(filter); + return toCallSql(call); +} + +std::string joinConditionAsSql(const core::AbstractJoinNode& joinNode) { + std::stringstream out; + for (auto i = 0; i < joinNode.leftKeys().size(); ++i) { + if (i > 0) { + out << " AND "; + } + out << joinNode.leftKeys()[i]->name() << " = " + << joinNode.rightKeys()[i]->name(); + } + if (joinNode.filter()) { + if (!joinNode.leftKeys().empty()) { + out << " AND "; + } + out << filterToSql(joinNode.filter()); + } + return out.str(); +} + +} // namespace + +bool ReferenceQueryRunner::isSupportedDwrfType(const TypePtr& type) { + if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isSupportedDwrfType(type->childAt(i))) { + return false; + } + } + + return true; +} + +std::unordered_map> +ReferenceQueryRunner::getAllTables(const core::PlanNodePtr& plan) { + std::unordered_map> result; + if (const auto valuesNode = + std::dynamic_pointer_cast(plan)) { + result.insert({getTableName(valuesNode), valuesNode->values()}); + } else { + for (const auto& source : plan->sources()) { + auto tablesAndNames = getAllTables(source); + result.insert(tablesAndNames.begin(), tablesAndNames.end()); + } + } + return result; +} + +std::optional ReferenceQueryRunner::joinSourceToSql( + const core::PlanNodePtr& planNode) { + const std::optional subQuery = toSql(planNode); + if (subQuery) { + 
return subQuery->find(" ") != std::string::npos + ? fmt::format("({})", *subQuery) + : *subQuery; + } + return std::nullopt; +} + +std::optional ReferenceQueryRunner::toSql( + const core::ValuesNodePtr& valuesNode) { + if (!isSupportedDwrfType(valuesNode->outputType())) { + return std::nullopt; + } + return getTableName(valuesNode); +} + +std::optional ReferenceQueryRunner::toSql( + const std::shared_ptr& joinNode) { + if (!isSupportedDwrfType(joinNode->sources()[0]->outputType()) || + !isSupportedDwrfType(joinNode->sources()[1]->outputType())) { + return std::nullopt; + } + + std::optional probeTableName = + joinSourceToSql(joinNode->sources()[0]); + std::optional buildTableName = + joinSourceToSql(joinNode->sources()[1]); + if (!probeTableName || !buildTableName) { + return std::nullopt; + } + + const auto& outputNames = joinNode->outputType()->names(); + + std::stringstream sql; + if (joinNode->isLeftSemiProjectJoin()) { + sql << "SELECT " + << folly::join(", ", outputNames.begin(), --outputNames.end()); + } else { + sql << "SELECT " << folly::join(", ", outputNames); + } + + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM " << *probeTableName << " INNER JOIN " << *buildTableName + << " ON " << joinConditionAsSql(*joinNode); + break; + case core::JoinType::kLeft: + sql << " FROM " << *probeTableName << " LEFT JOIN " << *buildTableName + << " ON " << joinConditionAsSql(*joinNode); + break; + case core::JoinType::kFull: + sql << " FROM " << *probeTableName << " FULL OUTER JOIN " + << *buildTableName << " ON " << joinConditionAsSql(*joinNode); + break; + case core::JoinType::kLeftSemiFilter: + // Multiple columns returned by a scalar subquery is not supported. A + // scalar subquery expression is a subquery that returns one result row + // from exactly one column for every input row. 
+ if (joinNode->leftKeys().size() > 1) { + return std::nullopt; + } + sql << " FROM " << *probeTableName << " WHERE " + << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM " + << *buildTableName; + if (joinNode->filter()) { + sql << " WHERE " << filterToSql(joinNode->filter()); + } + sql << ")"; + break; + case core::JoinType::kLeftSemiProject: + if (joinNode->isNullAware()) { + sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM " + << *buildTableName; + if (joinNode->filter()) { + sql << " WHERE " << filterToSql(joinNode->filter()); + } + sql << ") FROM " << *probeTableName; + } else { + sql << ", EXISTS (SELECT * FROM " << *buildTableName << " WHERE " + << joinConditionAsSql(*joinNode); + sql << ") FROM " << *probeTableName; + } + break; + case core::JoinType::kAnti: + if (joinNode->isNullAware()) { + sql << " FROM " << *probeTableName << " WHERE " + << joinKeysToSql(joinNode->leftKeys()) << " NOT IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM " + << *buildTableName; + if (joinNode->filter()) { + sql << " WHERE " << filterToSql(joinNode->filter()); + } + sql << ")"; + } else { + sql << " FROM " << *probeTableName + << " WHERE NOT EXISTS (SELECT * FROM " << *buildTableName + << " WHERE " << joinConditionAsSql(*joinNode); + sql << ")"; + } + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + return sql.str(); +} + +std::optional ReferenceQueryRunner::toSql( + const std::shared_ptr& joinNode) { + std::optional probeTableName = + joinSourceToSql(joinNode->sources()[0]); + std::optional buildTableName = + joinSourceToSql(joinNode->sources()[1]); + if (!probeTableName || !buildTableName) { + return std::nullopt; + } + + std::stringstream sql; + sql << "SELECT " << folly::join(", ", joinNode->outputType()->names()); + + // Nested loop join without filter. 
+ VELOX_CHECK_NULL( + joinNode->joinCondition(), + "This code path should be called only for nested loop join without filter"); + const std::string joinCondition{"(1 = 1)"}; + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM " << *probeTableName << " INNER JOIN " << *buildTableName + << " ON " << joinCondition; + break; + case core::JoinType::kLeft: + sql << " FROM " << *probeTableName << " LEFT JOIN " << *buildTableName + << " ON " << joinCondition; + break; + case core::JoinType::kFull: + sql << " FROM " << *probeTableName << " FULL OUTER JOIN " + << *buildTableName << " ON " << joinCondition; + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + return sql.str(); +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/ReferenceQueryRunner.h b/velox/exec/fuzzer/ReferenceQueryRunner.h index 5d0c24afdc24..3a73791b1eaf 100644 --- a/velox/exec/fuzzer/ReferenceQueryRunner.h +++ b/velox/exec/fuzzer/ReferenceQueryRunner.h @@ -15,7 +15,10 @@ */ #pragma once +#include #include +#include + #include "velox/core/PlanNode.h" #include "velox/expression/FunctionSignature.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -54,6 +57,18 @@ class ReferenceQueryRunner { /// reference database. virtual std::optional toSql(const core::PlanNodePtr& plan) = 0; + /// Same as the above toSql but for values nodes. + virtual std::optional toSql( + const core::ValuesNodePtr& valuesNode); + + /// Same as the above toSql but for hash join nodes. + virtual std::optional toSql( + const std::shared_ptr& joinNode); + + /// Same as the above toSql but for nested loop join nodes. + virtual std::optional toSql( + const std::shared_ptr& joinNode); + /// Returns whether a constant expression is supported by the reference /// database. 
virtual bool isConstantExprSupported(const core::TypedExprPtr& /*expr*/) { @@ -66,6 +81,13 @@ class ReferenceQueryRunner { return true; } + /// Executes SQL query returned by the 'toSql' method based on the plan. + virtual std::multiset> execute( + const std::string& sql, + const core::PlanNodePtr& plan) { + VELOX_UNSUPPORTED(); + } + /// Executes SQL query returned by the 'toSql' method using 'input' data. /// Converts results using 'resultType' schema. virtual std::multiset> execute( @@ -80,7 +102,9 @@ class ReferenceQueryRunner { const std::string& sql, const std::vector& probeInput, const std::vector& buildInput, - const RowTypePtr& resultType) = 0; + const RowTypePtr& resultType) { + VELOX_UNSUPPORTED(); + } /// Returns true if 'executeVector' can be called to get results as Velox /// Vector. @@ -97,15 +121,6 @@ class ReferenceQueryRunner { VELOX_UNSUPPORTED(); } - /// Similar to above but for join node with 'probeInput' and 'buildInput'. - virtual std::vector executeVector( - const std::string& sql, - const std::vector& probeInput, - const std::vector& buildInput, - const RowTypePtr& resultType) { - VELOX_UNSUPPORTED(); - } - virtual std::vector execute(const std::string& sql) { VELOX_UNSUPPORTED(); } @@ -121,8 +136,20 @@ class ReferenceQueryRunner { return aggregatePool_; } + bool isSupportedDwrfType(const TypePtr& type); + + /// Returns the name of the values node table in the form t_<id>. + std::string getTableName(const core::ValuesNodePtr& valuesNode) { + return fmt::format("t_{}", valuesNode->id()); + } + + // Traverses all nodes in the plan and returns all tables and their names. 
+ std::unordered_map> + getAllTables(const core::PlanNodePtr& plan); + private: memory::MemoryPool* aggregatePool_; -}; + std::optional joinSourceToSql(const core::PlanNodePtr& planNode); +}; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/PrestoQueryRunnerTest.cpp b/velox/exec/tests/PrestoQueryRunnerTest.cpp index 25b231dc6c7c..14447f5eb396 100644 --- a/velox/exec/tests/PrestoQueryRunnerTest.cpp +++ b/velox/exec/tests/PrestoQueryRunnerTest.cpp @@ -255,4 +255,122 @@ TEST_F(PrestoQueryRunnerTest, toSql) { } } +TEST_F(PrestoQueryRunnerTest, toSqlJoins) { + auto aggregatePool = rootPool_->addAggregateChild("toSqlJoins"); + auto queryRunner = std::make_unique( + aggregatePool.get(), + "http://unused", + "hive", + static_cast(1000)); + + auto t = makeRowVector( + {"t0", "t1", "t2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + auto u = makeRowVector( + {"u0", "u1", "u2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + auto v = makeRowVector( + {"v0", "v1", "v2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + auto w = makeRowVector( + {"w0", "w1", "w2"}, + { + makeFlatVector({}), + makeFlatVector({}), + makeFlatVector({}), + }); + + // Single join. + { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values({t}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({u}).planNode(), + /*filter=*/"", + {"t0", "t1"}, + core::JoinType::kInner) + .planNode(); + EXPECT_EQ( + *queryRunner->toSql(plan), + "SELECT t0, t1 FROM t_0 INNER JOIN t_1 ON t0 = u0"); + } + + // Two joins with a filter. 
+ { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values({t}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({u}).planNode(), + /*filter=*/"", + {"t0"}, + core::JoinType::kLeftSemiFilter) + .hashJoin( + {"t0"}, + {"v0"}, + PlanBuilder(planNodeIdGenerator).values({v}).planNode(), + "v1 > 0", + {"t0", "v1"}, + core::JoinType::kInner) + .planNode(); + EXPECT_EQ( + *queryRunner->toSql(plan), + "SELECT t0, v1" + " FROM (SELECT t0 FROM t_0 WHERE t0 IN (SELECT u0 FROM t_1))" + " INNER JOIN t_3 ON t0 = v0 AND (cast(v1 as BIGINT) > BIGINT '0')"); + } + + // Three joins. + { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values({t}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator).values({u}).planNode(), + /*filter=*/"", + {"t0", "t1"}, + core::JoinType::kLeft) + .hashJoin( + {"t0"}, + {"v0"}, + PlanBuilder(planNodeIdGenerator).values({v}).planNode(), + /*filter=*/"", + {"t0", "v1"}, + core::JoinType::kInner) + .hashJoin( + {"t0", "v1"}, + {"w0", "w1"}, + PlanBuilder(planNodeIdGenerator).values({w}).planNode(), + /*filter=*/"", + {"t0", "w1"}, + core::JoinType::kFull) + .planNode(); + EXPECT_EQ( + *queryRunner->toSql(plan), + "SELECT t0, w1" + " FROM (SELECT t0, v1 FROM (SELECT t0, t1 FROM t_0 LEFT JOIN t_1 ON t0 = u0)" + " INNER JOIN t_3 ON t0 = v0)" + " FULL OUTER JOIN t_5 ON t0 = w0 AND v1 = w1"); + } +} + } // namespace facebook::velox::exec::test From 290d37937f208bcf2aea4593f10e8e7f147e824a Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 9 Jan 2025 16:13:21 -0800 Subject: [PATCH 2/6] feat(fuzzer): Update "tryFlipJoinSides" functions to handle multi-joins (#11938) Summary: The function should traverse the plan tree and recursively flip the sides of all join nodes that are eligible to be flipped. This is in preparation to make the Join Fuzzer produce plans with multiple joins. 
Flipping join sides helps make alternate plans that should be logically equivalent. Reviewed By: kgpai Differential Revision: D67606686 --- velox/exec/fuzzer/JoinFuzzer.cpp | 77 +++++++++++++++++++++++++------- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 21e4a7de81fc..2a6c77f9631e 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -110,6 +110,12 @@ class JoinFuzzer { numGroups(_numGroups) {} }; + static core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode); + static core::PlanNodePtr tryFlipJoinSides( + const core::MergeJoinNode& joinNode); + static core::PlanNodePtr tryFlipJoinSides( + const core::NestedLoopJoinNode& joinNode); + private: static VectorFuzzer::Options getFuzzerOptions() { VectorFuzzer::Options opts; @@ -135,6 +141,10 @@ class JoinFuzzer { // Randomly pick a join type to test. core::JoinType pickJoinType(); + template + static std::pair tryFlipJoinSidesHelper( + const TNode& joinNode); + // Makes the query plan with default settings in JoinFuzzer and value inputs // for both probe and build sides. // @@ -605,67 +615,100 @@ std::optional tryFlipJoinType(core::JoinType joinType) { } } +template +std::pair +JoinFuzzer::tryFlipJoinSidesHelper(const TNode& joinNode) { + core::PlanNodePtr left = joinNode.sources()[0]; + core::PlanNodePtr right = joinNode.sources()[1]; + if (auto leftJoinInput = + std::dynamic_pointer_cast(joinNode.sources()[0])) { + left = JoinFuzzer::tryFlipJoinSides(*leftJoinInput); + } + if (auto rightJoinInput = + std::dynamic_pointer_cast(joinNode.sources()[1])) { + right = JoinFuzzer::tryFlipJoinSides(*rightJoinInput); + } + return make_pair(left, right); +} + // Returns a plan with flipped join sides of the input hash join node. If the -// join type doesn't allow flipping, returns a nullptr.
-core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode) { +// inputs of the join node are other hash join nodes, recursively flip the join +// sides of those join nodes as well. If the join type doesn't allow flipping, +// returns a nullptr. +core::PlanNodePtr JoinFuzzer::tryFlipJoinSides( + const core::HashJoinNode& joinNode) { // Null-aware right semi project join doesn't support filter. if (joinNode.filter() && joinNode.joinType() == core::JoinType::kLeftSemiProject && joinNode.isNullAware()) { return nullptr; } + auto flippedJoinType = tryFlipJoinType(joinNode.joinType()); - if (!flippedJoinType.has_value()) { + if (!flippedJoinType) { return nullptr; } + auto [left, right] = + JoinFuzzer::tryFlipJoinSidesHelper(joinNode); return std::make_shared( joinNode.id(), - flippedJoinType.value(), + *flippedJoinType, joinNode.isNullAware(), joinNode.rightKeys(), joinNode.leftKeys(), joinNode.filter(), - joinNode.sources()[1], - joinNode.sources()[0], + right, + left, joinNode.outputType()); } // Returns a plan with flipped join sides of the input merge join node. If the +// inputs of the join node are other merge join nodes, recursively flip the join +// sides of those join nodes as well. If the // join type doesn't allow flipping, returns a nullptr. -core::PlanNodePtr tryFlipJoinSides(const core::MergeJoinNode& joinNode) { +core::PlanNodePtr JoinFuzzer::tryFlipJoinSides( + const core::MergeJoinNode& joinNode) { // Merge join only supports inner and left join, so only inner join can be // flipped. 
if (joinNode.joinType() != core::JoinType::kInner) { return nullptr; } - auto flippedJoinType = core::JoinType::kInner; + + auto [left, right] = + JoinFuzzer::tryFlipJoinSidesHelper(joinNode); return std::make_shared( joinNode.id(), - flippedJoinType, + core::JoinType::kInner, joinNode.rightKeys(), joinNode.leftKeys(), joinNode.filter(), - joinNode.sources()[1], - joinNode.sources()[0], + right, + left, joinNode.outputType()); } // Returns a plan with flipped join sides of the input nested loop join node. If -// the join type doesn't allow flipping, returns a nullptr. -core::PlanNodePtr tryFlipJoinSides(const core::NestedLoopJoinNode& joinNode) { +// the inputs of the join node are other nested loop join nodes, recursively +// flip the join sides of those join nodes as well. If the join type doesn't +// allow flipping, returns a nullptr. +core::PlanNodePtr JoinFuzzer::tryFlipJoinSides( + const core::NestedLoopJoinNode& joinNode) { auto flippedJoinType = tryFlipJoinType(joinNode.joinType()); - if (!flippedJoinType.has_value()) { + if (!flippedJoinType) { return nullptr; } + auto [left, right] = + JoinFuzzer::tryFlipJoinSidesHelper(joinNode); + return std::make_shared( joinNode.id(), flippedJoinType.value(), joinNode.joinCondition(), - joinNode.sources()[1], - joinNode.sources()[0], + right, + left, joinNode.outputType()); } @@ -819,7 +862,7 @@ void addFlippedJoinPlan( int32_t numGroups = 0) { auto joinNode = std::dynamic_pointer_cast(plan); VELOX_CHECK_NOT_NULL(joinNode); - if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { + if (auto flippedPlan = JoinFuzzer::tryFlipJoinSides(*joinNode)) { plans.push_back(JoinFuzzer::PlanWithSplits{ flippedPlan, probeScanId, From 63faa562d57cb88e3e9113999e6a51b3ae39ebb7 Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 9 Jan 2025 16:13:21 -0800 Subject: [PATCH 3/6] feat(fuzzer): Update "makeDefaultPlan" function to generate multi-join plans (#11939) Summary: Generates a cascading multi-join from left to right. 
``` [t1, t2, t3, t4] t1 t2 \ / a t3 \ / b t4 \ / c ``` Differential Revision: D67607316 --- velox/exec/fuzzer/JoinFuzzer.cpp | 99 +++++++++++++++++--------------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 2a6c77f9631e..7da27da10b15 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -145,20 +145,18 @@ class JoinFuzzer { static std::pair tryFlipJoinSidesHelper( const TNode& joinNode); - // Makes the query plan with default settings in JoinFuzzer and value inputs - // for both probe and build sides. + // Makes the query plan with default settings in JoinFuzzer using inputs + // joining each input from left to right. // - // NOTE: 'probeInput' and 'buildInput' could either input rows with lazy - // vectors or flatten ones. + // NOTE: inputs can be rows with lazy vectors or flattened ones. JoinFuzzer::PlanWithSplits makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( core::JoinType joinType, @@ -740,28 +738,37 @@ std::vector fieldNames( } JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector>& probeKeysList, + const 
std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - auto plan = + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) - .values(probeInput) + .values(inputs[0]) .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{plan}; + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator).values(inputs[1]).planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator).values(inputs[i + 1]).planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + } + return PlanWithSplits{plan.planNode()}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( @@ -1181,14 +1188,13 @@ void JoinFuzzer::verify(core::JoinType joinType) { shuffleJoinKeys(probeKeys, buildKeys); const auto defaultPlan = makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - filter); + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}); const auto expected = execute(defaultPlan, /*injectSpill=*/false); @@ -1211,14 +1217,13 @@ void JoinFuzzer::verify(core::JoinType joinType) { std::vector altPlans; altPlans.push_back(makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - outputColumns, - filter)); + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter})); makeAlternativePlans( defaultPlan.plan, probeInput, buildInput, altPlans, filter); 
From 25b077d110874fad219d4b4ef9b3a49f5cb4a0b1 Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 9 Jan 2025 16:13:21 -0800 Subject: [PATCH 4/6] feat(fuzzer): Update table scan plan functions to generate multi-join plans (#11940) Summary: Generates a cascading multi-join from left to right with table scans as the inputs. ``` [t1, t2, t3, t4] t1 t2 \ / a t3 \ / b t4 \ / c ``` Differential Revision: D67607533 --- velox/exec/fuzzer/JoinFuzzer.cpp | 652 ++++++++++++++----------------- 1 file changed, 301 insertions(+), 351 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 7da27da10b15..23b28de70682 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -82,29 +82,25 @@ class JoinFuzzer { void go(); + struct InputAndSplit { + const std::vector* input; + const std::vector split; + }; + struct PlanWithSplits { core::PlanNodePtr plan; - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - std::unordered_map> - splits; + std::unordered_map splits; core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; int32_t numGroups; explicit PlanWithSplits( const core::PlanNodePtr& _plan, - const core::PlanNodeId& _probeScanId = "", - const core::PlanNodeId& _buildScanId = "", - const std::unordered_map< - core::PlanNodeId, - std::vector>& _splits = {}, + const std::unordered_map& _splits = {}, core::ExecutionStrategy _executionStrategy = core::ExecutionStrategy::kUngrouped, int32_t _numGroups = 0) : plan(_plan), - probeScanId(_probeScanId), - buildScanId(_buildScanId), splits(_splits), executionStrategy(_executionStrategy), numGroups(_numGroups) {} @@ -168,71 +164,53 @@ class JoinFuzzer { const std::string& filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. - // If withFilter is true, uses the equality filter between probeKeys and - // buildKeys as the join filter. Uses empty join filter otherwise. 
JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - const std::string& filter); + const std::string& joinCondition); - // Makes the default query plan with table scan as inputs for both probe and - // build sides. + // Makes the default query plan with table scan as inputs for all of the + // inputs. JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan - // nodes. 
If withFilter is true, uses the equiality filter between probeKeys - // and buildKeys as the join filter. Uses empty join filter otherwise. + // nodes. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList); void makeAlternativePlans( - const core::PlanNodePtr& plan, + const PlanWithSplits& plan, const std::vector& probeInput, const std::vector& buildInput, std::vector& plans, const std::string& filter); - // Makes the query plan from 'planWithTableScan' with grouped execution mode. - // Correspondingly, it replaces the table scan input splits with grouped ones. - JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( - const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits); - // Runs one test iteration from query plans generations, executions and result // verifications. 
void verify(core::JoinType joinType); @@ -260,15 +238,16 @@ class JoinFuzzer { void addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter); + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector probeTypeList, + const std::vector buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& altPlans); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -279,9 +258,9 @@ class JoinFuzzer { // Generates the grouped splits. std::vector generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const std::string& name, + const size_t numKeys, const std::vector& input); RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); @@ -297,8 +276,6 @@ class JoinFuzzer { RowVectorPtr testCrossProduct( const std::string& tableDir, core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput); @@ -536,13 +513,18 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [planNodeId, nodeSplits] : plan.splits) { - builder.splits(planNodeId, nodeSplits); + for (const auto& [planNodeId, inputAndSplit] : plan.splits) { + builder.splits(planNodeId, inputAndSplit.split); } if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { builder.executionStrategy(core::ExecutionStrategy::kGrouped); - 
builder.groupedExecutionLeafNodeIds({plan.probeScanId, plan.buildScanId}); + std::unordered_set ids; + ids.reserve(plan.splits.size()); + for (const auto& [id, _] : plan.splits) { + ids.insert(id); + } + builder.groupedExecutionLeafNodeIds(ids); builder.numSplitGroups(plan.numGroups); builder.numConcurrentSplitGroups(randInt(1, plan.numGroups)); } @@ -772,54 +754,51 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( } JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputsAndSplits.size() > 1); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{ - plan, - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; -} - -JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( - const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits) { - return 
PlanWithSplits{ - planWithTableScan.plan, - planWithTableScan.probeScanId, - planWithTableScan.buildScanId, - {{planWithTableScan.probeScanId, groupedProbeScanSplits}, - {planWithTableScan.buildScanId, groupedBuildScanSplits}}, - core::ExecutionStrategy::kGrouped, - numGroups}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .hashJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } std::vector makeSources( @@ -858,25 +837,13 @@ std::string makeJoinFilter( template void addFlippedJoinPlan( - const core::PlanNodePtr& plan, - std::vector& plans, - const core::PlanNodeId& probeScanId = "", - const core::PlanNodeId& buildScanId = "", - const std::unordered_map>& - splits = {}, - core::ExecutionStrategy executionStrategy = - core::ExecutionStrategy::kUngrouped, - int32_t numGroups = 0) { - auto joinNode = std::dynamic_pointer_cast(plan); + const JoinFuzzer::PlanWithSplits& plan, + std::vector& plans) { + auto joinNode = std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); if (auto flippedPlan = JoinFuzzer::tryFlipJoinSides(*joinNode)) { plans.push_back(JoinFuzzer::PlanWithSplits{ - flippedPlan, - probeScanId, - buildScanId, - splits, - executionStrategy, - 
numGroups}); + flippedPlan, plan.splits, plan.executionStrategy, plan.numGroups}); } } @@ -907,31 +874,30 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - const std::string& filter) { + const std::string& joinCondition) { auto planNodeIdGenerator = std::make_shared(); return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) .nestedLoopJoin( PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, + joinCondition, outputColumns, joinType) .planNode()}; } void JoinFuzzer::makeAlternativePlans( - const core::PlanNodePtr& plan, + const PlanWithSplits& plan, const std::vector& probeInput, const std::vector& buildInput, std::vector& plans, const std::string& filter) { - auto joinNode = std::dynamic_pointer_cast(plan); + auto joinNode = + std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); // Flip join sides. @@ -973,7 +939,7 @@ void JoinFuzzer::makeAlternativePlans( filter); plans.push_back(planWithSplits); - addFlippedJoinPlan(planWithSplits.plan, plans); + addFlippedJoinPlan(planWithSplits, plans); } // Use NestedLoopJoin. 
@@ -983,16 +949,10 @@ void JoinFuzzer::makeAlternativePlans( : fmt::format( "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - joinCondition); + joinType, probeInput, buildInput, outputColumns, joinCondition); plans.push_back(planWithSplits); - addFlippedJoinPlan(planWithSplits.plan, plans); + addFlippedJoinPlan(planWithSplits, plans); } } @@ -1020,8 +980,6 @@ void JoinFuzzer::shuffleJoinKeys( RowVectorPtr JoinFuzzer::testCrossProduct( const std::string& tableDir, core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput) { VELOX_CHECK_GT(probeInput.size(), 0); @@ -1035,8 +993,6 @@ RowVectorPtr JoinFuzzer::testCrossProduct( auto plan = makeNestedLoopJoinPlan( joinType, - probeKeys, - buildKeys, probeInput, buildInput, outputColumns, @@ -1061,23 +1017,22 @@ RowVectorPtr JoinFuzzer::testCrossProduct( std::vector altPlans; if (isTableScanSupported(probeInput[0]->type()) && isTableScanSupported(buildInput[0]->type())) { - std::vector probeScanSplits = - makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + InputAndSplit probeInputAndSplit{ + &probeInput, + makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_)}; + InputAndSplit buildInputAndSplit{ + &buildInput, + makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_)}; altPlans.push_back(makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - /*filter=*/"")); + {joinType}, + {probeType}, + {buildType}, + {probeInputAndSplit, buildInputAndSplit}, + {outputColumns}, + /*filter=*/{""})); } - addFlippedJoinPlan(plan.plan, altPlans); + 
addFlippedJoinPlan(plan, altPlans); for (const auto& altPlan : altPlans) { auto actual = execute(altPlan, /*injectSpill=*/false); @@ -1148,19 +1103,9 @@ void JoinFuzzer::verify(core::JoinType joinType) { stats_.numCrossProduct++; auto result = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput); + tableScanDir->getPath(), joinType, probeInput, buildInput); auto flatResult = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput); + tableScanDir->getPath(), joinType, flatProbeInput, flatBuildInput); assertEqualResults({result}, {flatResult}); } } @@ -1225,22 +1170,22 @@ void JoinFuzzer::verify(core::JoinType joinType) { {outputColumns}, {filter})); + makeAlternativePlans(defaultPlan, probeInput, buildInput, altPlans, filter); makeAlternativePlans( - defaultPlan.plan, probeInput, buildInput, altPlans, filter); - makeAlternativePlans( - defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter); + defaultPlan, flatProbeInput, flatBuildInput, altPlans, filter); addPlansWithTableScan( tableScanDir->getPath(), - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - outputColumns, - altPlans, - filter); + {joinType}, + {nullAware}, + {asRowType(flatProbeInput[0]->type())}, + {asRowType(flatBuildInput[0]->type())}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter}, + altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1285,111 +1230,128 @@ void JoinFuzzer::verify(core::JoinType joinType) { } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const 
std::string& filter) { + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputsAndSplits.size() > 1); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .orderBy(probeKeysList[0], false) + .mergeJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .orderBy(buildKeysList[0], false) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.mergeJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - 
const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector joinTypes, + const std::vector& probeTypeList, + const std::vector& buildTypeList, + const std::vector& inputsAndSplits, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + std::unordered_map splits; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(probeTypeList[0]) + .capturePlanNodeId(probeScanId) + .nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[0]) + .capturePlanNodeId(buildScanId) + .planNode(), + joinConditionList[0], + outputColumnsList[0], + joinTypes[0]); + splits.insert( + {{probeScanId, inputsAndSplits[0]}, {buildScanId, inputsAndSplits[1]}}); + for (int i = 1; i < inputsAndSplits.size() - 1; i++) { + plan = plan.nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .tableScan(buildTypeList[i]) + .capturePlanNodeId(buildScanId) + .planNode(), + joinConditionList[i], + outputColumnsList[i], + joinTypes[i]); + splits.insert({buildScanId, inputsAndSplits[i + 1]}); + } + return PlanWithSplits{plan.planNode(), splits}; } void JoinFuzzer::addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const 
std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector& nullAwareList, + const std::vector probeTypeList, + const std::vector buildTypeList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& altPlans) { VELOX_CHECK(!tableDir.empty()); + VELOX_CHECK(inputs.size() > 1); - if (!isTableScanSupported(probeInput[0]->type()) || - !isTableScanSupported(buildInput[0]->type())) { - return; - } - - std::vector probeScanSplits = - makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + std::vector inputsAndSplits; + for (int i = 0; i < inputs.size(); i++) { + if (!isTableScanSupported(inputs[i][0]->type())) { + return; + } - const auto probeType = asRowType(probeInput[0]->type()); - const auto buildType = asRowType(buildInput[0]->type()); + inputsAndSplits.push_back(InputAndSplit{ + &inputs[i], + makeSplits( + inputs[i], fmt::format("{}/table_{}", tableDir, i), writerPool_)}); + } std::vector plansWithTableScan; auto defaultPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + joinTypes, + nullAwareList, + probeTypeList, + buildTypeList, + probeKeysList, + buildKeysList, + inputsAndSplits, + outputColumnsList, + filterList); plansWithTableScan.push_back(defaultPlan); auto joinNode = @@ -1397,94 +1359,86 @@ void JoinFuzzer::addPlansWithTableScan( VELOX_CHECK_NOT_NULL(joinNode); // Flip join sides. 
- addFlippedJoinPlan( - defaultPlan.plan, - plansWithTableScan, - defaultPlan.probeScanId, - defaultPlan.buildScanId, - defaultPlan.splits); - - const int32_t numGroups = randInt(1, probeScanSplits.size()); - const std::vector groupedProbeScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/true, - probeKeys.size(), - probeInput); - const std::vector groupedBuildScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/false, - buildKeys.size(), - buildInput); + addFlippedJoinPlan(defaultPlan, plansWithTableScan); + + const int32_t numGroups = randInt(1, inputsAndSplits[0].split.size()); + std::unordered_map groupedSplits; + for (int i = 0; const auto& [id, inputAndSplit] : defaultPlan.splits) { + groupedSplits.insert( + {id, + InputAndSplit{ + inputAndSplit.input, + generateSplitsWithGroup( + tableDir, + numGroups, + /*name=*/fmt::format("table_{}", i), + /*numKeys=*/i == 0 ? probeKeysList[i].size() + : buildKeysList[i - 1].size(), + *inputAndSplit.input)}}); + i++; + } for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan); - altPlans.push_back(makeGroupedExecutionPlanWithTableScan( - planWithTableScan, - numGroups, - groupedProbeScanSplits, - groupedBuildScanSplits)); + altPlans.push_back(PlanWithSplits{ + planWithTableScan.plan, + groupedSplits, + core::ExecutionStrategy::kGrouped, + numGroups}); } // Add ungrouped MergeJoin with TableScan. 
if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + joinTypes, + probeTypeList, + buildTypeList, + probeKeysList, + buildKeysList, + inputsAndSplits, + outputColumnsList, + filterList); altPlans.push_back(planWithSplits); - addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + addFlippedJoinPlan(planWithSplits, altPlans); } // Add ungrouped NestedLoopJoin with TableScan. - if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + bool NLJSupported = true; + for (const core::JoinType joinType : joinTypes) { + if (!core::NestedLoopJoinNode::isSupported(joinType)) { + NLJSupported = false; + } + } + if (NLJSupported) { + std::vector joinConditionList; + for (int i = 0; i < probeKeysList.size(); i++) { + joinConditionList.push_back( + filterList[i].empty() + ? 
makeJoinFilter(probeKeysList[i], buildKeysList[i]) + : fmt::format( + "{} AND {}", + makeJoinFilter(probeKeysList[i], buildKeysList[i]), + filterList[i])); + } auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - joinCondition); + joinTypes, + probeTypeList, + buildTypeList, + inputsAndSplits, + outputColumnsList, + joinConditionList); altPlans.push_back(planWithSplits); - addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + addFlippedJoinPlan(planWithSplits, altPlans); } } std::vector JoinFuzzer::generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const std::string& name, + const size_t numKeys, const std::vector& input) { const std::vector> inputVectorsByGroup = splitInputByGroup(numGroups, numKeys, input); @@ -1492,12 +1446,8 @@ std::vector JoinFuzzer::generateSplitsWithGroup( std::vector splitsWithGroup; for (int32_t groupId = 0; groupId < numGroups; ++groupId) { for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { - const std::string filePath = fmt::format( - "{}/grouped[{}].{}.{}", - tableDir, - groupId, - isProbe ? 
"probe" : "build", - i); + const std::string filePath = + fmt::format("{}/grouped[{}].{}.{}", tableDir, groupId, name, i); writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); splitsWithGroup.emplace_back(makeConnectorSplit(filePath), groupId); } From 67b4ab4306dc23fd5e17d1d4fadd65da3f0c772b Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 9 Jan 2025 16:13:21 -0800 Subject: [PATCH 5/6] feat(fuzzer): Update make[merge|NLJ]Plan functions to generate multi-join plans (#11941) Summary: Change makeMergeJoinPlan and makeNestedLoopJoinPlan functions to produce cascading multi-joins. Differential Revision: D67607605 --- velox/exec/fuzzer/JoinFuzzer.cpp | 131 +++++++++++++++++-------------- 1 file changed, 73 insertions(+), 58 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 23b28de70682..7eed6d0a3a65 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -155,21 +155,19 @@ class JoinFuzzer { const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); + const std::vector& joinTypes, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. 
JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& joinCondition); + const std::vector& joinTypes, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList); // Makes the default query plan with table scan as inputs for all of the // inputs. @@ -848,46 +846,65 @@ void addFlippedJoinPlan( } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .values(buildInput) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode()}; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .values(inputs[0]) + .orderBy(probeKeysList[0], false) + .mergeJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .values(inputs[1]) + .orderBy(buildKeysList[0], false) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.mergeJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .values(inputs[i + 1]) + .orderBy(buildKeysList[i], false) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i]); + 
} + return PlanWithSplits{plan.planNode()}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& joinCondition) { + const std::vector& joinTypes, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{ + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) - .values(probeInput) + .values(inputs[0]) .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - joinCondition, - outputColumns, - joinType) - .planNode()}; + PlanBuilder(planNodeIdGenerator).values(inputs[1]).planNode(), + joinConditionList[0], + outputColumnsList[0], + joinTypes[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.nestedLoopJoin( + PlanBuilder(planNodeIdGenerator).values(inputs[i + 1]).planNode(), + joinConditionList[i], + outputColumnsList[i], + joinTypes[i]); + } + return PlanWithSplits{plan.planNode()}; } void JoinFuzzer::makeAlternativePlans( @@ -930,13 +947,12 @@ void JoinFuzzer::makeAlternativePlans( // Use OrderBy + MergeJoin if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - filter); + {joinType}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); @@ -949,7 +965,7 @@ void JoinFuzzer::makeAlternativePlans( : fmt::format( "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlan( - joinType, probeInput, buildInput, outputColumns, joinCondition); + {joinType}, {probeInput, buildInput}, {outputColumns}, {joinCondition}); 
plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); @@ -992,11 +1008,10 @@ RowVectorPtr JoinFuzzer::testCrossProduct( ->names(); auto plan = makeNestedLoopJoinPlan( - joinType, - probeInput, - buildInput, - outputColumns, - /*filter=*/""); + {joinType}, + {probeInput, buildInput}, + {outputColumns}, + /*filterList=*/{""}); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference query From 91519e0cf378e93952ad655756753f6035d92610 Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Thu, 9 Jan 2025 16:13:21 -0800 Subject: [PATCH 6/6] feat(fuzzer): Update makeAlternativePlans to generate multi-join plans (#11942) Summary: Change the makeAlternativePlans function to produce cascading multi-joins. Differential Revision: D67607654 --- velox/exec/fuzzer/JoinFuzzer.cpp | 128 +++++++++++++++++++++---------- 1 file changed, 88 insertions(+), 40 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 7eed6d0a3a65..7fae551560c0 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -204,10 +204,14 @@ class JoinFuzzer { void makeAlternativePlans( const PlanWithSplits& plan, - const std::vector& probeInput, - const std::vector& buildInput, - std::vector& plans, - const std::string& filter); + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& plans); // Runs one test iteration from query plans generations, executions and result // verifications. 
@@ -909,10 +913,14 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( void JoinFuzzer::makeAlternativePlans( const PlanWithSplits& plan, - const std::vector& probeInput, - const std::vector& buildInput, - std::vector& plans, - const std::string& filter) { + const std::vector joinTypes, + const std::vector nullAwareList, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList, + std::vector& plans) { auto joinNode = std::dynamic_pointer_cast(plan.plan); VELOX_CHECK_NOT_NULL(joinNode); @@ -920,52 +928,75 @@ void JoinFuzzer::makeAlternativePlans( // Flip join sides. addFlippedJoinPlan(plan, plans); - // Parallelize probe and build sides. - const auto probeKeys = fieldNames(joinNode->leftKeys()); - const auto buildKeys = fieldNames(joinNode->rightKeys()); - const auto outputColumns = joinNode->outputType()->names(); - const auto joinType = joinNode->joinType(); - + // Add plan with local partition round robin for inputs. 
auto planNodeIdGenerator = std::make_shared(); - plans.push_back(JoinFuzzer::PlanWithSplits{ + PlanBuilder partitionPlan = PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(probeInput, planNodeIdGenerator)) + .localPartitionRoundRobin(makeSources(inputs[0], planNodeIdGenerator)) .hashJoin( - probeKeys, - buildKeys, + probeKeysList[0], + buildKeysList[0], PlanBuilder(planNodeIdGenerator) .localPartitionRoundRobin( - makeSources(buildInput, planNodeIdGenerator)) + makeSources(inputs[1], planNodeIdGenerator)) .planNode(), - filter, - outputColumns, - joinType, - joinNode->isNullAware()) - .planNode()}); + filterList[0], + outputColumnsList[0], + joinTypes[0], + nullAwareList[0]); + for (int i = 1; i < inputs.size() - 1; i++) { + partitionPlan.hashJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(inputs[i + 1], planNodeIdGenerator)) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i], + nullAwareList[i]); + } + plans.push_back(PlanWithSplits{partitionPlan.planNode()}); + bool mergeJoinSupported = true; + bool NLJSupported = true; + for (const core::JoinType& joinType : joinTypes) { + if (!core::NestedLoopJoinNode::isSupported(joinType)) { + NLJSupported = false; + } + if (!core::MergeJoinNode::isSupported(joinType)) { + mergeJoinSupported = false; + } + } // Use OrderBy + MergeJoin - if (core::MergeJoinNode::isSupported(joinNode->joinType())) { + if (mergeJoinSupported) { auto planWithSplits = makeMergeJoinPlan( - {joinType}, - {probeKeys}, - {buildKeys}, - {probeInput, buildInput}, - {outputColumns}, - {filter}); + joinTypes, + probeKeysList, + buildKeysList, + inputs, + outputColumnsList, + filterList); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); } // Use NestedLoopJoin. - if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? 
makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + if (NLJSupported) { + std::vector joinConditionList; + for (int i = 0; i < probeKeysList.size(); i++) { + joinConditionList.push_back( + filterList[0].empty() + ? makeJoinFilter(probeKeysList[i], buildKeysList[i]) + : fmt::format( + "{} AND {}", + makeJoinFilter(probeKeysList[i], buildKeysList[i]), + filterList[i])); + } auto planWithSplits = makeNestedLoopJoinPlan( - {joinType}, {probeInput, buildInput}, {outputColumns}, {joinCondition}); + joinTypes, inputs, outputColumnsList, joinConditionList); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); @@ -1185,9 +1216,26 @@ void JoinFuzzer::verify(core::JoinType joinType) { {outputColumns}, {filter})); - makeAlternativePlans(defaultPlan, probeInput, buildInput, altPlans, filter); makeAlternativePlans( - defaultPlan, flatProbeInput, flatBuildInput, altPlans, filter); + defaultPlan, + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}, + altPlans); + makeAlternativePlans( + defaultPlan, + {joinType}, + {nullAware}, + {probeKeys}, + {buildKeys}, + {flatProbeInput, flatBuildInput}, + {outputColumns}, + {filter}, + altPlans); addPlansWithTableScan( tableScanDir->getPath(),