diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index 312e7f09ef963..51a6dc77b0398 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -233,7 +233,9 @@ StatusOr ScanOperator::pull_chunk(RuntimeState* state) { begin_pull_chunk(res); // for query cache mechanism, we should emit EOS chunk when we receive the last chunk. auto [tablet_id, is_eos] = _should_emit_eos(res); - eval_runtime_bloom_filters(res.get()); + if (!_scan_node->support_push_down_runtime_filter_to_reader()) { + eval_runtime_bloom_filters(res.get()); + } res->owner_info().set_owner_id(tablet_id, is_eos); } diff --git a/be/src/exec/pipeline/scan/scan_operator.h b/be/src/exec/pipeline/scan/scan_operator.h index 24405ced9ec15..4c6e833a34f50 100644 --- a/be/src/exec/pipeline/scan/scan_operator.h +++ b/be/src/exec/pipeline/scan/scan_operator.h @@ -14,6 +14,7 @@ #pragma once +#include "exec/scan_node.h" #include "exec/pipeline/source_operator.h" #include "exec/query_cache/cache_operator.h" #include "exec/query_cache/lane_arbiter.h" @@ -90,7 +91,7 @@ class ScanOperator : public SourceOperator { virtual int64_t get_scan_table_id() const { return -1; } - virtual bool support_push_down_runtime_filter_to_reader() const { return _scan_node->is_push_down_runtime_filter_to_reader(); } + virtual bool support_push_down_runtime_filter_to_reader() const { return _scan_node->support_push_down_runtime_filter_to_reader(); } protected: static constexpr size_t kIOTaskBatchSize = 64; diff --git a/be/src/formats/parquet/group_reader.cpp b/be/src/formats/parquet/group_reader.cpp index ca7cded573f11..0cd392970c4e4 100644 --- a/be/src/formats/parquet/group_reader.cpp +++ b/be/src/formats/parquet/group_reader.cpp @@ -117,7 +117,7 @@ Status GroupReader::get_next(ChunkPtr* chunk, size_t* row_count) { has_filter = true; } - if (SIMD::contain_nonzero(chunk_filter) && _param.runtime_filter_collector) { + if (chunk_size > 0 && _param.runtime_filter_collector) { // bloom filter for (auto &it: _runtime_filter_by_slot) { const JoinRuntimeFilter *filter = it.second; @@ -267,7 +267,7 @@ void GroupReader::_process_columns_and_conjunct_ctxs() { // cause of runtime filter min/max have been added to conjunct_ctxs_by_slot // so just check runtime filter exist and add to dict_filter_ctx if (_runtime_filter_by_slot.find(slot_id) != _runtime_filter_by_slot.end()) { - _dict_filter_ctx.use_as_dict_filter_column(read_col_idx, slot_id, _runtime_filter_by_slot.at(slot_id)); + _dict_filter_ctx.use_as_dict_filter_column(slot_id, _runtime_filter_by_slot.at(slot_id)); } } else { bool has_conjunct = false; @@ -618,9 +618,9 @@ Status GroupReader::DictFilterContext::rewrite_conjunct_ctxs_to_predicates( ctx.use_merged_selection = false; ctx.merged_selection.assign(dict_value_column->size(), 1); join_filter->compute_hash({dict_value_column.get()}, &ctx); - filter->evaluate(dict_value_column.get(), &ctx); + join_filter->evaluate(dict_value_column.get(), &ctx); bool all_zero = false; - ColumnHelper::merge_two_filters(filter, ctx.merged_selection.data(), &all_zero); + ColumnHelper::merge_two_filters(&filter, ctx.merged_selection.data(), &all_zero); if (all_zero) { *is_group_filtered = true; return Status::OK(); diff --git a/be/src/formats/parquet/group_reader.h b/be/src/formats/parquet/group_reader.h index 2eb5e0673377f..63201e0f26add 100644 --- a/be/src/formats/parquet/group_reader.h +++ b/be/src/formats/parquet/group_reader.h @@ -165,7 +165,7 @@ class GroupReader { // conjunct ctxs that eval after chunk is dict decoded std::vector _left_conjunct_ctxs; - std::unordered_map _runtime_filter_by_slot; + std::unordered_map _runtime_filter_by_slot; // active columns that hold read_col index std::vector _active_column_indices;