diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp new file mode 100644 index 00000000000..59f4587da79 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -0,0 +1,356 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace FailPoints +{ +extern const char minimum_block_size_for_cross_join[]; +} // namespace FailPoints + +namespace +{ +void recordJoinExecuteInfo( + DAGContext & dag_context, + const String & executor_id, + const String & build_side_executor_id, + const JoinPtr & join_ptr) +{ + JoinExecuteInfo join_execute_info; + join_execute_info.build_side_root_executor_id = build_side_executor_id; + join_execute_info.join_profile_info = join_ptr->profile_info; + RUNTIME_CHECK(join_execute_info.join_profile_info); + dag_context.getJoinExecuteInfoMap()[executor_id] = std::move(join_execute_info); +} +} // namespace + +PhysicalPlanNodePtr PhysicalJoin::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Join & join, + const FineGrainedShuffle & fine_grained_shuffle, + const PhysicalPlanNodePtr & left, + const PhysicalPlanNodePtr & right) +{ + RUNTIME_CHECK(left); + 
RUNTIME_CHECK(right); + + const Block & left_input_header = left->getSampleBlock(); + const Block & right_input_header = right->getSampleBlock(); + + JoinInterpreterHelper::TiFlashJoin tiflash_join(join, context.isTest()); + + const auto & probe_plan = tiflash_join.build_side_index == 0 ? right : left; + const auto & build_plan = tiflash_join.build_side_index == 0 ? left : right; + const auto probe_source_columns = tiflash_join.build_side_index == 0 + ? JoinInterpreterHelper::genDAGExpressionAnalyzerSourceColumns(right_input_header, right->getSchema()) + : JoinInterpreterHelper::genDAGExpressionAnalyzerSourceColumns(left_input_header, left->getSchema()); + const auto & build_source_columns = tiflash_join.build_side_index == 0 + ? JoinInterpreterHelper::genDAGExpressionAnalyzerSourceColumns(left_input_header, left->getSchema()) + : JoinInterpreterHelper::genDAGExpressionAnalyzerSourceColumns(right_input_header, right->getSchema()); + + String match_helper_name = tiflash_join.genMatchHelperName(left_input_header, right_input_header); + NamesAndTypes join_output_schema + = tiflash_join.genJoinOutputColumns(left->getSchema(), right->getSchema(), match_helper_name); + auto & dag_context = *context.getDAGContext(); + + /// add necessary transformation if the join key is an expression + + JoinNonEqualConditions join_non_equal_conditions; + // prepare probe side + auto [probe_side_prepare_actions, probe_key_names, original_probe_key_names, probe_filter_column_name] + = JoinInterpreterHelper::prepareJoin( + context, + probe_source_columns, + tiflash_join.getProbeJoinKeys(), + tiflash_join.join_key_types, + tiflash_join.getProbeConditions()); + RUNTIME_ASSERT(probe_side_prepare_actions, log, "probe_side_prepare_actions cannot be nullptr"); + /// in TiFlash, left side is always the probe side + join_non_equal_conditions.left_filter_column = std::move(probe_filter_column_name); + + // prepare build side + auto [build_side_prepare_actions, build_key_names, 
original_build_key_names, build_filter_column_name] + = JoinInterpreterHelper::prepareJoin( + context, + build_source_columns, + tiflash_join.getBuildJoinKeys(), + tiflash_join.join_key_types, + tiflash_join.getBuildConditions()); + RUNTIME_ASSERT(build_side_prepare_actions, log, "build_side_prepare_actions cannot be nullptr"); + /// in TiFlash, right side is always the build side + join_non_equal_conditions.right_filter_column = std::move(build_filter_column_name); + + tiflash_join.fillJoinOtherConditionsAction( + context, + left->getSchema(), + right->getSchema(), + probe_side_prepare_actions, + original_probe_key_names, + original_build_key_names, + join_non_equal_conditions); + + const Settings & settings = context.getSettingsRef(); + size_t max_bytes_before_external_join = settings.max_bytes_before_external_join; + auto join_req_id = fmt::format("{}_{}", log->identifier(), executor_id); + SpillConfig build_spill_config( + context.getTemporaryPath(), + fmt::format("{}_0_build", join_req_id), + settings.max_cached_data_bytes_in_spiller, + settings.max_spilled_rows_per_file, + settings.max_spilled_bytes_per_file, + context.getFileProvider(), + settings.max_threads, + settings.max_block_size); + SpillConfig probe_spill_config( + context.getTemporaryPath(), + fmt::format("{}_0_probe", join_req_id), + settings.max_cached_data_bytes_in_spiller, + settings.max_spilled_rows_per_file, + settings.max_spilled_bytes_per_file, + context.getFileProvider(), + settings.max_threads, + settings.max_block_size); + size_t max_block_size = settings.max_block_size; + fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size = 1; }); + + String flag_mapped_entry_helper_name = tiflash_join.genFlagMappedEntryHelperName( + left_input_header, + right_input_header, + join_non_equal_conditions.other_cond_expr != nullptr); + + assert(build_key_names.size() == original_build_key_names.size()); + std::unordered_map build_key_names_map; + for (size_t i = 0; i < 
original_build_key_names.size(); ++i) + { + build_key_names_map[original_build_key_names[i]] = build_key_names[i]; + } + auto runtime_filter_list + = tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log); + LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtime_filter_list.size()); + context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtime_filter_list); + + JoinPtr join_ptr = std::make_shared( + probe_key_names, + build_key_names, + tiflash_join.kind, + join_req_id, + fine_grained_shuffle.stream_count, + max_bytes_before_external_join, + build_spill_config, + probe_spill_config, + RestoreConfig{settings.join_restore_concurrency, 0, 0}, + join_output_schema, + [&](const OperatorSpillContextPtr & operator_spill_context) { + if (context.getDAGContext() != nullptr) + { + context.getDAGContext()->registerOperatorSpillContext(operator_spill_context); + } + }, + context.getDAGContext() != nullptr ? context.getDAGContext()->getAutoSpillTrigger() : nullptr, + tiflash_join.join_key_collators, + join_non_equal_conditions, + max_block_size, + settings.shallow_copy_cross_probe_threshold, + match_helper_name, + flag_mapped_entry_helper_name, + settings.join_probe_cache_columns_threshold, + context.isTest(), + runtime_filter_list); + + recordJoinExecuteInfo(dag_context, executor_id, build_plan->execId(), join_ptr); + + auto physical_join = std::make_shared( + executor_id, + join_output_schema, + fine_grained_shuffle, + log->identifier(), + probe_plan, + build_plan, + join_ptr, + probe_side_prepare_actions, + build_side_prepare_actions); + return physical_join; +} + +void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & context) +{ + const auto & settings = context.getSettingsRef(); + /// probe side streams + executeExpression( + probe_pipeline, + probe_side_prepare_actions, + log, + "append join key and join filters for probe side"); + /// add join input stream + String 
join_probe_extra_info = fmt::format( + "join probe, join_executor_id = {}, scan_hash_map_after_probe = {}", + execId(), + needScanHashMapAfterProbe(join_ptr->getKind())); + join_ptr->initProbe(probe_pipeline.firstStream()->getHeader(), probe_pipeline.streams.size()); + size_t probe_index = 0; + for (auto & stream : probe_pipeline.streams) + { + stream = std::make_shared( + stream, + join_ptr, + probe_index++, + log->identifier(), + settings.max_block_size); + stream->setExtraInfo(join_probe_extra_info); + } + join_ptr->setCancellationHook([&] { return context.isCancelled(); }); +} + +void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams) +{ + auto & dag_context = *context.getDAGContext(); + size_t join_build_concurrency = build_pipeline.streams.size(); + + /// build side streams + executeExpression( + build_pipeline, + build_side_prepare_actions, + log, + "append join key and join filters for build side"); + // add a HashJoinBuildBlockInputStream to build a shared hash table + String join_build_extra_info = fmt::format("join build, build_side_root_executor_id = {}", build()->execId()); + if (fine_grained_shuffle.enabled()) + join_build_extra_info = fmt::format("{} {}", join_build_extra_info, String(enableFineGrainedShuffleExtraInfo)); + auto & join_execute_info = dag_context.getJoinExecuteInfoMap()[execId()]; + auto build_streams = [&](BlockInputStreams & streams) { + size_t build_index = 0; + for (auto & stream : streams) + { + stream + = std::make_shared(stream, join_ptr, build_index++, log->identifier()); + stream->setExtraInfo(join_build_extra_info); + join_execute_info.join_build_streams.push_back(stream); + } + }; + build_streams(build_pipeline.streams); + // for test, join executor need the return blocks to output. 
+ executeUnion( + build_pipeline, + max_streams, + context.getSettingsRef().max_buffered_bytes_in_executor, + log, + /*ignore_block=*/!context.isTest(), + "for join"); + + SubqueryForSet build_query; + build_query.source = build_pipeline.firstStream(); + build_query.join = join_ptr; + join_ptr->initBuild(build_query.source->getHeader(), join_build_concurrency); + dag_context.addSubquery(execId(), std::move(build_query)); +} + +void PhysicalJoin::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + /// The build side needs to be transformed first. + { + DAGPipeline build_pipeline; + build()->buildBlockInputStream(build_pipeline, context, max_streams); + buildSideTransform(build_pipeline, context, max_streams); + } + + { + DAGPipeline & probe_pipeline = pipeline; + probe()->buildBlockInputStream(probe_pipeline, context, max_streams); + probeSideTransform(probe_pipeline, context); + } +} + +void PhysicalJoin::buildPipeline(PipelineBuilder & builder, Context & context, PipelineExecutorContext & exec_context) +{ + // Break the pipeline for join build. + auto join_build = std::make_shared( + executor_id, + build()->getSchema(), + fine_grained_shuffle, + req_id, + build(), + join_ptr, + build_side_prepare_actions); + auto join_build_builder = builder.breakPipeline(join_build); + // Join build pipeline. + build()->buildPipeline(join_build_builder, context, exec_context); + join_build_builder.build(); + + // Join probe pipeline. 
+ probe()->buildPipeline(builder, context, exec_context); + auto join_probe = std::make_shared( + executor_id, + schema, + req_id, + probe(), + join_ptr, + probe_side_prepare_actions); + builder.addPlanNode(join_probe); +} + +void PhysicalJoin::finalizeImpl(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); + join_ptr->finalize(parent_require); + auto required_input_columns = join_ptr->getRequiredColumns(); + + Names build_required; + Names probe_required; + const auto & build_sample_block = build_side_prepare_actions->getSampleBlock(); + for (const auto & name : required_input_columns) + { + if (build_sample_block.has(name)) + build_required.push_back(name); + else + /// if `name` does not exist on the probe side either, the call to `probe_side_prepare_actions->finalize(probe_required)` below will throw an error + probe_required.push_back(name); + } + + build_side_prepare_actions->finalize(build_required); + build()->finalize(build_side_prepare_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(build_side_prepare_actions, build()->getSampleBlock().columns()); + + probe_side_prepare_actions->finalize(probe_required); + probe()->finalize(probe_side_prepare_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(probe_side_prepare_actions, probe()->getSampleBlock().columns()); +} + +const Block & PhysicalJoin::getSampleBlock() const +{ + return join_ptr->getOutputBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp new file mode 100644 index 00000000000..a62e1f7cdc8 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp @@ -0,0 +1,45 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +void PhysicalJoinBuild::buildPipelineExecGroupImpl( + PipelineExecutorContext & exec_context, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t /*concurrency*/) +{ + executeExpression(exec_context, group_builder, prepare_actions, log); + + size_t build_index = 0; + assert(join_ptr); + group_builder.transform([&](auto & builder) { + builder.setSinkOp( + std::make_unique(exec_context, log->identifier(), join_ptr, build_index++)); + }); + auto & join_execute_info = context.getDAGContext()->getJoinExecuteInfoMap()[execId()]; + join_execute_info.join_build_profile_infos = group_builder.getCurProfileInfos(); + join_ptr->initBuild(group_builder.getCurrentHeader(), group_builder.concurrency()); + join_ptr->setInitActiveBuildThreads(); + join_ptr.reset(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp new file mode 100644 index 00000000000..2c952fecfe6 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp @@ -0,0 +1,67 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +void PhysicalJoinProbe::buildPipelineExecGroupImpl( + PipelineExecutorContext & exec_context, + PipelineExecGroupBuilder & group_builder, + Context & context, + size_t concurrency) +{ + // Currently join probe does not support fine grained shuffle. + RUNTIME_CHECK(!fine_grained_shuffle.enabled()); + if (join_ptr->isSpilled() && group_builder.concurrency() == 1) + { + // When the join build operator spilled, the probe operator requires at least two or more threads to restore spilled hash partitions. + auto restore_concurrency = std::max(2, concurrency); + restoreConcurrency( + exec_context, + group_builder, + restore_concurrency, + context.getSettingsRef().max_buffered_bytes_in_executor, + log); + } + + executeExpression(exec_context, group_builder, prepare_actions, log); + + auto input_header = group_builder.getCurrentHeader(); + assert(join_ptr); + join_ptr->initProbe(input_header, group_builder.concurrency()); + size_t probe_index = 0; + const auto & max_block_size = context.getSettingsRef().max_block_size; + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique( + exec_context, + log->identifier(), + join_ptr, + probe_index++, + max_block_size, + input_header)); + }); + // The `join_ptr->wait_build_finished_future` does not need to be added to exec_context here; + // it is only necessary to add it during the "restore build stage." + // The order of build/probe here is ensured by the event. 
+ exec_context.addOneTimeFuture(join_ptr->wait_probe_finished_future); + join_ptr->setCancellationHook([&]() { return exec_context.isCancelled(); }); + join_ptr.reset(); +} +} // namespace DB diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index f0d30fefedf..0a4a6061d4c 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -1634,9 +1634,164 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const /// handle other conditions if (!other_filter_column.empty() || !other_eq_filter_from_in_column.empty()) { +<<<<<<< HEAD if (!offsets_to_replicate) throw Exception("Should not reach here, the strictness of join with other condition must be ALL"); handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); +======= + assert(offsets_to_replicate != nullptr); + handleOtherConditions(block, nullptr, offsets_to_replicate.get()); + + if (useRowFlaggedHashMap(kind, has_other_condition)) + { + // set hash table used flag using SemiMapped column + auto & mapped_column = block.getByName(flag_mapped_entry_helper_name).column; + const auto & ptr_col = static_cast(*mapped_column); + const auto & container = static_cast(ptr_col.getData()); + for (size_t i = 0; i < block.rows(); ++i) + { + auto ptr_value = container[i]; + auto * current = reinterpret_cast(ptr_value); + current->setUsed(); + } + + if (isRightSemiFamily(kind)) + { + // Return build table header for right semi/anti join + block = right_sample_block; + } + else if (kind == ASTTableJoin::Kind::RightOuter) + { + block.erase(flag_mapped_entry_helper_name); + } + } + } + + return block; +} + +Block Join::removeUselessColumn(Block & block) const +{ + // cancelled + if (!block) + return block; + + Block projected_block; + for (const auto & name_and_type : output_columns_after_finalize) + { + auto & column = block.getByName(name_and_type.name); + projected_block.insert(std::move(column)); + } + return projected_block; +} + +Block 
Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const +{ + std::vector result_blocks; + size_t result_rows = 0; + JoinBuildInfo join_build_info{ + enable_fine_grained_shuffle, + fine_grained_shuffle_count, + isEnableSpill(), + hash_join_spill_context->isSpilled(), + build_concurrency, + restore_config.restore_round}; + probe_process_info.prepareForHashProbe( + key_names_left, + non_equal_conditions.left_filter_column, + kind, + strictness, + join_build_info.needVirtualDispatchForProbeBlock(), + collators, + restore_config.restore_round); + while (true) + { + if (is_cancelled()) + return {}; + auto block = doJoinBlockHash(probe_process_info, join_build_info); + assert(block); + block = removeUselessColumn(block); + result_rows += block.rows(); + result_blocks.push_back(std::move(block)); + /// exit the while loop if + /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed + /// 2. the block may be expanded after join and result_rows exceeds the min_result_block_size + if (probe_process_info.all_rows_joined_finish + || (may_probe_side_expanded_after_join && result_rows >= probe_process_info.min_result_block_size)) + break; + } + assert(!result_blocks.empty()); + return vstackBlocks(std::move(result_blocks)); +} + +Block Join::doJoinBlockCross(ProbeProcessInfo & probe_process_info) const +{ + assert(probe_process_info.prepare_for_probe_done); + if (cross_probe_mode == CrossProbeMode::DEEP_COPY_RIGHT_BLOCK) + { + probe_process_info.updateStartRow(); + auto block = crossProbeBlockDeepCopyRightBlock(kind, strictness, probe_process_info, original_blocks); + if (non_equal_conditions.other_cond_expr != nullptr) + { + assert(probe_process_info.offsets_to_replicate != nullptr); + if (probe_process_info.end_row - probe_process_info.start_row != probe_process_info.block.rows()) + { + probe_process_info.cutFilterAndOffsetVector(probe_process_info.start_row, probe_process_info.end_row); + } + 
handleOtherConditions( + block, + probe_process_info.filter.get(), + probe_process_info.offsets_to_replicate.get()); + } + return block; + } + else if (cross_probe_mode == CrossProbeMode::SHALLOW_COPY_RIGHT_BLOCK) + { + probe_process_info.updateStartRow(); + auto [block, is_matched_rows] + = crossProbeBlockShallowCopyRightBlock(kind, strictness, probe_process_info, original_blocks); + if (is_matched_rows) + { + if (non_equal_conditions.other_cond_expr != nullptr) + { + probe_process_info.cutFilterAndOffsetVector(0, 1); + /// for matched rows, each call to `doJoinBlockCross` only handle part of the probed data for one left row, the internal + /// state is saved in `probe_process_info` + handleOtherConditionsForOneProbeRow(block, probe_process_info); + } + for (size_t i = 0; i < probe_process_info.cross_join_data->left_column_index_in_left_block.size(); ++i) + { + auto & name = probe_process_info.block + .getByPosition(probe_process_info.cross_join_data->left_column_index_in_left_block[i]) + .name; + if (block.has(name)) + { + auto & column_and_name = block.getByName(name); + if (column_and_name.column->isColumnConst()) + column_and_name.column = column_and_name.column->convertToFullColumnIfConst(); + } + } + if (isLeftOuterSemiFamily(kind)) + { + auto & help_column = block.getByName(match_helper_name); + if (help_column.column->isColumnConst()) + help_column.column = help_column.column->convertToFullColumnIfConst(); + } + } + else if (non_equal_conditions.other_cond_expr != nullptr) + { + probe_process_info.cutFilterAndOffsetVector(0, block.rows()); + handleOtherConditions( + block, + probe_process_info.filter.get(), + probe_process_info.offsets_to_replicate.get()); + } + return block; + } + else + { + throw Exception(fmt::format("Unsupported cross probe mode: {}", magic_enum::enum_name(cross_probe_mode))); +>>>>>>> 78bd3f04dc (fix tiflash assert failure (#9456)) } } @@ -1983,6 +2138,9 @@ void Join::joinBlock(Block & block) const else throw Exception("Logical error: 
unknown combination of JOIN", ErrorCodes::LOGICAL_ERROR); + // if cancelled, the block is empty; joinBlock() returns void, so leave the block as is and return early + if (!block) + return; /// for (cartesian)antiLeftSemi join, the meaning of "match-helper" is `non-matched` instead of `matched`. if (kind == ASTTableJoin::Kind::LeftAnti || kind == ASTTableJoin::Kind::Cross_LeftAnti) {