Skip to content

Commit

Permalink
compile ok; still need to handle the non-existent column and the partition column for Hive
Browse files Browse the repository at this point in the history

Signed-off-by: zombee0 <[email protected]>
  • Loading branch information
zombee0 committed Sep 18, 2023
1 parent 7d6ba74 commit 0fc5bcb
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
4 changes: 3 additions & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ StatusOr<ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
begin_pull_chunk(res);
// for query cache mechanism, we should emit EOS chunk when we receive the last chunk.
auto [tablet_id, is_eos] = _should_emit_eos(res);
eval_runtime_bloom_filters(res.get());
if (!_scan_node->support_push_down_runtime_filter_to_reader()) {
eval_runtime_bloom_filters(res.get());
}
res->owner_info().set_owner_id(tablet_id, is_eos);
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/scan/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "exec/scan_node.h"
#include "exec/pipeline/source_operator.h"
#include "exec/query_cache/cache_operator.h"
#include "exec/query_cache/lane_arbiter.h"
Expand Down Expand Up @@ -90,7 +91,7 @@ class ScanOperator : public SourceOperator {

virtual int64_t get_scan_table_id() const { return -1; }

virtual bool support_push_down_runtime_filter_to_reader() const { return _scan_node->is_push_down_runtime_filter_to_reader(); }
virtual bool support_push_down_runtime_filter_to_reader() const { return _scan_node->support_push_down_runtime_filter_to_reader(); }

protected:
static constexpr size_t kIOTaskBatchSize = 64;
Expand Down
8 changes: 4 additions & 4 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Status GroupReader::get_next(ChunkPtr* chunk, size_t* row_count) {
has_filter = true;
}

if (SIMD::contain_nonzero(chunk_filter) && _param.runtime_filter_collector) {
if (chunk_size > 0 && _param.runtime_filter_collector) {
// bloom filter
for (auto &it: _runtime_filter_by_slot) {
const JoinRuntimeFilter *filter = it.second;
Expand Down Expand Up @@ -267,7 +267,7 @@ void GroupReader::_process_columns_and_conjunct_ctxs() {
// cause of runtime filter min/max have been added to conjunct_ctxs_by_slot
// so just check runtime filter exist and add to dict_filter_ctx
if (_runtime_filter_by_slot.find(slot_id) != _runtime_filter_by_slot.end()) {
_dict_filter_ctx.use_as_dict_filter_column(read_col_idx, slot_id, _runtime_filter_by_slot.at(slot_id));
_dict_filter_ctx.use_as_dict_filter_column(slot_id, _runtime_filter_by_slot.at(slot_id));
}
} else {
bool has_conjunct = false;
Expand Down Expand Up @@ -618,9 +618,9 @@ Status GroupReader::DictFilterContext::rewrite_conjunct_ctxs_to_predicates(
ctx.use_merged_selection = false;
ctx.merged_selection.assign(dict_value_column->size(), 1);
join_filter->compute_hash({dict_value_column.get()}, &ctx);
filter->evaluate(dict_value_column.get(), &ctx);
join_filter->evaluate(dict_value_column.get(), &ctx);
bool all_zero = false;
ColumnHelper::merge_two_filters(filter, ctx.merged_selection.data(), &all_zero);
ColumnHelper::merge_two_filters(&filter, ctx.merged_selection.data(), &all_zero);
if (all_zero) {
*is_group_filtered = true;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/formats/parquet/group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class GroupReader {
// conjunct ctxs that eval after chunk is dict decoded
std::vector<ExprContext*> _left_conjunct_ctxs;

std::unordered_map<SlotId, JoinRuntimeFilter*> _runtime_filter_by_slot;
std::unordered_map<SlotId, const JoinRuntimeFilter*> _runtime_filter_by_slot;

// active columns that hold read_col index
std::vector<int> _active_column_indices;
Expand Down

0 comments on commit 0fc5bcb

Please sign in to comment.