
Commit

checkpoint
Signed-off-by: zombee0 <[email protected]>
zombee0 committed Dec 19, 2024
1 parent bf6562e commit b478a7d
Showing 22 changed files with 462 additions and 46 deletions.
2 changes: 2 additions & 0 deletions be/src/common/status.cpp
@@ -247,6 +247,8 @@ std::string Status::code_as_string() const {
return "Duplicate RPC invocation";
case TStatusCode::GLOBAL_DICT_ERROR:
return "Global dictionary error";
case TStatusCode::GLOBAL_DICT_NOT_MATCH:
return "Global dictionary not match";
case TStatusCode::UNKNOWN:
return "Unknown";
case TStatusCode::TXN_NOT_EXISTS:
1 change: 1 addition & 0 deletions be/src/common/status.h
@@ -134,6 +134,7 @@ class STATUS_ATTRIBUTE Status {
}

static Status GlobalDictError(std::string_view msg) { return Status(TStatusCode::GLOBAL_DICT_ERROR, msg); }
static Status GlobalDictNotMatch(std::string_view msg) { return Status(TStatusCode::GLOBAL_DICT_NOT_MATCH, msg); }

static Status TransactionInProcessing(std::string_view msg) { return Status(TStatusCode::TXN_IN_PROCESSING, msg); }
static Status TransactionNotExists(std::string_view msg) { return Status(TStatusCode::TXN_NOT_EXISTS, msg); }
22 changes: 22 additions & 0 deletions be/src/connector/hive_connector.cpp
@@ -19,6 +19,7 @@
#include "connector/hive_chunk_sink.h"
#include "exec/cache_select_scanner.h"
#include "exec/exec_node.h"
#include "exec/hdfs_scanner.h"
#include "exec/hdfs_scanner_orc.h"
#include "exec/hdfs_scanner_parquet.h"
#include "exec/hdfs_scanner_partition.h"
@@ -580,6 +581,26 @@ void HiveDataSource::_init_rf_counters() {
}
}

Status HiveDataSource::_init_global_dicts(HdfsScannerParams* params) {
const THdfsScanNode& hdfs_scan_node = _provider->_hdfs_scan_node;
const auto& global_dict_map = _runtime_state->get_query_global_dict_map();
auto global_dict = _pool.add(new ColumnIdToGlobalDictMap());
// mapping column id to storage column ids
const TupleDescriptor* tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(hdfs_scan_node.tuple_id);
for (auto slot : tuple_desc->slots()) {
if (!slot->is_materialized() || !slot->is_output_column()) {
continue;
}
auto iter = global_dict_map.find(slot->id());
if (iter != global_dict_map.end()) {
auto& dict_map = iter->second.first;
global_dict->emplace(slot->id(), const_cast<GlobalDictMap*>(&dict_map));
}
}
params->global_dictmaps = global_dict;
return Status::OK();
}

Status HiveDataSource::_init_scanner(RuntimeState* state) {
SCOPED_TIMER(_profile.open_file_timer);

@@ -607,6 +628,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateUniqueFromString(native_file_path, fsOptions));

HdfsScannerParams scanner_params;
RETURN_IF_ERROR(_init_global_dicts(&scanner_params));
scanner_params.runtime_filter_collector = _runtime_filters;
scanner_params.scan_range = &scan_range;
scanner_params.fs = _pool.add(fs.release());
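
The new _init_global_dicts hook above copies only the query-level dictionaries that belong to materialized output slots into a per-scan map keyed by slot id. A minimal sketch of that shape, using simplified stand-in types rather than the real ColumnIdToGlobalDictMap/GlobalDictMap from runtime/global_dict:

#include <map>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-ins for illustration only (the real StarRocks types differ):
// a dictionary maps a string value to its small integer code, and the
// per-scan map goes from slot id to the dictionary for that column.
using DictSketch = std::map<std::string, int>;
using SlotToDictSketch = std::unordered_map<int, const DictSketch*>;

// Mirrors the loop in _init_global_dicts: only slots that appear in the
// query-level map get an entry; everything else is read normally.
SlotToDictSketch build_scan_dicts(const std::unordered_map<int, DictSketch>& query_dicts,
                                  const std::vector<int>& output_slot_ids) {
    SlotToDictSketch result;
    for (int slot_id : output_slot_ids) {
        if (auto it = query_dicts.find(slot_id); it != query_dicts.end()) {
            result.emplace(slot_id, &it->second);
        }
    }
    return result;
}
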
2 changes: 2 additions & 0 deletions be/src/connector/hive_connector.h
@@ -108,6 +108,8 @@ class HiveDataSource final : public DataSource {
HdfsScanner* _create_kudu_jni_scanner(const FSOptions& options);
Status _check_all_slots_nullable();

Status _init_global_dicts(HdfsScannerParams* params);

// =====================================
ObjectPool _pool;
RuntimeState* _runtime_state = nullptr;
1 change: 1 addition & 0 deletions be/src/exec/hdfs_scanner.cpp
@@ -170,6 +170,7 @@ Status HdfsScanner::_build_scanner_context() {
ctx.split_context = _scanner_params.split_context;
ctx.enable_split_tasks = _scanner_params.enable_split_tasks;
ctx.connector_max_split_size = _scanner_params.connector_max_split_size;
ctx.global_dictmaps = _scanner_params.global_dictmaps;

if (config::parquet_advance_zonemap_filter) {
ScanConjunctsManagerOptions opts;
4 changes: 4 additions & 0 deletions be/src/exec/hdfs_scanner.h
@@ -228,6 +228,8 @@ struct HdfsScannerParams {
bool orc_use_column_names = false;

int64_t connector_max_split_size = 0;

ColumnIdToGlobalDictMap* global_dictmaps = &EMPTY_GLOBAL_DICTMAPS;
};

struct HdfsScannerContext {
@@ -347,6 +349,8 @@
std::unique_ptr<ScanConjunctsManager> conjuncts_manager = nullptr;
std::vector<std::unique_ptr<ColumnPredicate>> predicate_free_pool;
PredicateTree predicate_tree;

ColumnIdToGlobalDictMap* global_dictmaps = &EMPTY_GLOBAL_DICTMAPS;
};

struct OpenFileOptions {
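
The &EMPTY_GLOBAL_DICTMAPS default on both structs looks like the usual empty-sentinel pattern; a small sketch of the idea with a simplified map type and hypothetical names:

#include <unordered_map>

// Simplified stand-in for ColumnIdToGlobalDictMap, for illustration only.
using SlotToDictSketch = std::unordered_map<int, const void*>;

// One shared, never-mutated empty map lets consumers dereference the pointer
// and probe it directly instead of checking for nullptr first.
inline SlotToDictSketch kEmptyDicts;

struct ParamsSketch {
    SlotToDictSketch* global_dictmaps = &kEmptyDicts;
};

inline bool uses_global_dict(const ParamsSketch& p, int slot_id) {
    return p.global_dictmaps->count(slot_id) > 0;  // safe even when nothing was set
}
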
17 changes: 17 additions & 0 deletions be/src/formats/parquet/column_reader_factory.cpp
@@ -18,6 +18,7 @@
#include "formats/parquet/scalar_column_reader.h"
#include "formats/parquet/schema.h"
#include "formats/utils.h"
#include "runtime/global_dict/types_fwd_decl.h"

namespace starrocks::parquet {

@@ -156,6 +157,22 @@ StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(const ColumnReaderOptions&
}
}

StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(ColumnReaderPtr inner_reader, const GlobalDictMap* dict) {
if (inner_reader->get_column_parquet_field()->type == ColumnType::ARRAY) {
ASSIGN_OR_RETURN(
ColumnReaderPtr child_reader,
ColumnReaderFactory::create(
std::move((down_cast<ListColumnReader*>(inner_reader.get()))->get_element_reader()), dict));
return std::make_unique<ListColumnReader>(inner_reader->get_column_parquet_field(), std::move(child_reader));
} else {
NakedColumnReader* nake_reader = dynamic_cast<NakedColumnReader *>(inner_reader.get());
if (nake_reader == nullptr) {
return Status::InternalError("Error on reader transform for low cardinality reader");
}
return std::make_unique<LowCardColumnReader>(*nake_reader, dict);
}
}

void ColumnReaderFactory::get_subfield_pos_with_pruned_type(const ParquetField& field, const TypeDescriptor& col_type,
bool case_sensitive, std::vector<int32_t>& pos) {
DCHECK(field.type == ColumnType::STRUCT);
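
The new create(inner_reader, dict) overload above behaves like a decorator: for ARRAY columns it recurses so the dictionary is applied to the element reader, and any other reader must be a NakedColumnReader, which gets wrapped into a LowCardColumnReader that emits dictionary codes. A stripped-down sketch of that control flow with placeholder classes (not the real parquet readers), assuming the same unique_ptr ownership transfer:

#include <memory>
#include <stdexcept>
#include <utility>

struct ReaderSketch { virtual ~ReaderSketch() = default; };
struct LeafReaderSketch : ReaderSketch {};       // plays the role of NakedColumnReader
struct ListReaderSketch : ReaderSketch {         // plays the role of ListColumnReader
    std::unique_ptr<ReaderSketch> element;
};
struct DictCodeReaderSketch : ReaderSketch {     // plays the role of LowCardColumnReader
    explicit DictCodeReaderSketch(std::unique_ptr<ReaderSketch> inner) : inner(std::move(inner)) {}
    std::unique_ptr<ReaderSketch> inner;
};

// Lists keep their own reader and only re-wrap the leaf underneath; anything
// that is not a plain leaf cannot be switched to dictionary-code decoding.
std::unique_ptr<ReaderSketch> wrap_for_dict(std::unique_ptr<ReaderSketch> r) {
    if (auto* list = dynamic_cast<ListReaderSketch*>(r.get())) {
        list->element = wrap_for_dict(std::move(list->element));
        return r;
    }
    if (dynamic_cast<LeafReaderSketch*>(r.get()) == nullptr) {
        throw std::runtime_error("reader cannot be used for low-cardinality decoding");
    }
    return std::make_unique<DictCodeReaderSketch>(std::move(r));
}
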
2 changes: 2 additions & 0 deletions be/src/formats/parquet/column_reader_factory.h
@@ -28,6 +28,8 @@ class ColumnReaderFactory {
const TypeDescriptor& col_type,
const TIcebergSchemaField* iceberg_schema_field);

static StatusOr<ColumnReaderPtr> create(const ColumnReaderPtr inner_reader, const GlobalDictMap* dict);

private:
// for struct type without schema change
static void get_subfield_pos_with_pruned_type(const ParquetField& field, const TypeDescriptor& col_type,
2 changes: 2 additions & 0 deletions be/src/formats/parquet/complex_column_reader.h
@@ -45,6 +45,8 @@ class ListColumnReader final : public ColumnReader {
_element_reader->select_offset_index(range, rg_first_row);
}

ColumnReaderPtr& get_element_reader() { return _element_reader; }

private:
std::unique_ptr<ColumnReader> _element_reader;
};
1 change: 1 addition & 0 deletions be/src/formats/parquet/file_reader.cpp
@@ -508,6 +508,7 @@ Status FileReader::_init_group_readers() {
_group_reader_param.not_existed_slots = &fd_scanner_ctx.not_existed_slots;
// for pageIndex
_group_reader_param.min_max_conjunct_ctxs = fd_scanner_ctx.min_max_conjunct_ctxs;
_group_reader_param.global_dictmaps = fd_scanner_ctx.global_dictmaps;

int64_t row_group_first_row = 0;
// select and create row group readers.
4 changes: 4 additions & 0 deletions be/src/formats/parquet/group_reader.cpp
@@ -358,6 +358,10 @@ StatusOr<ColumnReaderPtr> GroupReader::_create_column_reader(const GroupReaderPa
ColumnReaderFactory::create(_column_reader_opts, schema_node, column.slot_type(),
column.t_iceberg_schema_field));
}
if (_param.global_dictmaps->contains(column.slot_id())) {
ASSIGN_OR_RETURN(column_reader, ColumnReaderFactory::create(std::move(column_reader),
_param.global_dictmaps->at(column.slot_id())));
}
if (column_reader == nullptr) {
// this shouldn't happen but guard
return Status::InternalError("No valid column reader.");
2 changes: 2 additions & 0 deletions be/src/formats/parquet/group_reader.h
@@ -101,6 +101,8 @@ struct GroupReaderParam {
const std::vector<ColumnPtr>* partition_values = nullptr;
// not existed column
const std::vector<SlotDescriptor*>* not_existed_slots = nullptr;
// used for global low cardinality optimization
ColumnIdToGlobalDictMap* global_dictmaps = &EMPTY_GLOBAL_DICTMAPS;
};

class PageIndexReader;
83 changes: 74 additions & 9 deletions be/src/formats/parquet/scalar_column_reader.cpp
@@ -17,7 +17,10 @@
#include "formats/parquet/stored_column_reader_with_index.h"
#include "gutil/casts.h"
#include "io/shared_buffered_input_stream.h"
#include "runtime/global_dict/dict_column.h"
#include "runtime/types.h"
#include "simd/simd.h"
#include "simd/gather.h"
#include "statistics_helper.h"
#include "utils.h"

Expand Down Expand Up @@ -126,7 +129,7 @@ Status ScalarColumnReader::fill_dst_column(ColumnPtr& dst, ColumnPtr& src) {
return Status::OK();
}

bool ScalarColumnReader::_column_all_pages_dict_encoded() {
bool NakedColumnReader::_column_all_pages_dict_encoded() {
// The Parquet spec allows for column chunks to have mixed encodings
// where some data pages are dictionary-encoded and others are plain
// encoded. For example, a Parquet file writer might start writing
@@ -185,7 +188,7 @@ bool ScalarColumnReader::_column_all_pages_dict_encoded() {
return true;
}

void ScalarColumnReader::collect_column_io_range(std::vector<io::SharedBufferedInputStream::IORange>* ranges,
void NakedColumnReader::collect_column_io_range(std::vector<io::SharedBufferedInputStream::IORange>* ranges,
int64_t* end_offset, ColumnIOType type, bool active) {
const auto& column = *get_chunk_metadata();
if (type == ColumnIOType::PAGES) {
@@ -226,7 +229,7 @@ void ScalarColumnReader::collect_column_io_range(std::vector<io::SharedBufferedI
}
}

void ScalarColumnReader::select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) {
void NakedColumnReader::select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) {
if (_offset_index_ctx == nullptr) {
if (!get_chunk_metadata()->__isset.offset_index_offset) {
return;
@@ -272,8 +275,9 @@ void ScalarColumnReader::select_offset_index(const SparseRange<uint64_t>& range,
_reader = std::make_unique<StoredColumnReaderWithIndex>(std::move(_reader), _offset_index_ctx.get(), has_dict_page);
}

StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
StatusOr<bool> NakedColumnReader::_row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
CompoundNodeType pred_relation,
const TypeDescriptor& col_type,
const uint64_t rg_first_row,
const uint64_t rg_num_rows) const {
if (!get_chunk_metadata()->meta_data.__isset.statistics || get_column_parquet_field() == nullptr) {
@@ -292,8 +296,8 @@ StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<c
std::optional<ZoneMapDetail> zone_map_detail = std::nullopt;

// used to hold min/max slice values
const ColumnPtr min_column = ColumnHelper::create_column(*_col_type, true);
const ColumnPtr max_column = ColumnHelper::create_column(*_col_type, true);
const ColumnPtr min_column = ColumnHelper::create_column(col_type, true);
const ColumnPtr max_column = ColumnHelper::create_column(col_type, true);
if (is_all_null) {
// if the entire column's value is null, the min/max value not existed
zone_map_detail = ZoneMapDetail{Datum{}, Datum{}, true};
@@ -302,12 +306,12 @@ StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<c
std::vector<string> min_values;
std::vector<string> max_values;
Status st =
StatisticsHelper::get_min_max_value(_opts.file_meta_data, *_col_type, &get_chunk_metadata()->meta_data,
StatisticsHelper::get_min_max_value(_opts.file_meta_data, col_type, &get_chunk_metadata()->meta_data,
get_column_parquet_field(), min_values, max_values);
if (st.ok()) {
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(min_column, min_values, *_col_type,
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(min_column, min_values, col_type,
get_column_parquet_field(), _opts.timezone));
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(max_column, max_values, *_col_type,
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(max_column, max_values, col_type,
get_column_parquet_field(), _opts.timezone));

zone_map_detail = ZoneMapDetail{min_column->get(0), max_column->get(0), has_null};
@@ -332,4 +336,65 @@ StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<c
return is_satisfy(zone_map_detail.value());
}

Status LowCardColumnReader::read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) {
DCHECK(get_column_parquet_field()->is_nullable ? dst->is_nullable() : true);
ColumnContentType content_type = ColumnContentType::DICT_CODE;

if (_dict_code == nullptr) {
_dict_code = ColumnHelper::create_column(
TypeDescriptor::from_logical_type(ColumnDictFilterContext::kDictCodePrimitiveType), true);
}
_ori_column = dst;
dst = _dict_code;
dst->reserve(range.span_size());

{
SCOPED_RAW_TIMER(&_opts.stats->column_read_ns);
return _reader->read_range(range, filter, content_type, dst.get());
}
}

bool LowCardColumnReader::try_to_use_dict_filter(ExprContext* ctx, bool is_decode_needed, const SlotId slotId,
const std::vector<std::string>& sub_field_path, const size_t& layer) {
if (sub_field_path.size() != layer) {
return false;
}

if (_dict_filter_ctx == nullptr) {
_dict_filter_ctx = std::make_unique<ColumnDictFilterContext>();
_dict_filter_ctx->is_decode_needed = is_decode_needed;
_dict_filter_ctx->sub_field_path = sub_field_path;
_dict_filter_ctx->slot_id = slotId;
}
_dict_filter_ctx->conjunct_ctxs.push_back(ctx);
return true;
}

Status LowCardColumnReader::fill_dst_column(ColumnPtr& dst, ColumnPtr& src) {
if (!_code_convert_map.has_value()) {
RETURN_IF_ERROR(_check_current_dict());
}

dst->resize(src->size());

const ColumnPtr& dict_codes = src;
auto* codes_nullable_column = ColumnHelper::as_raw_column<NullableColumn>(dict_codes);
auto* codes_column = ColumnHelper::as_raw_column<FixedLengthColumn<int32_t>>(codes_nullable_column->data_column());
auto* dst_data_column = down_cast<LowCardDictColumn*>(ColumnHelper::get_data_column(dst.get()));
SIMDGather::gather(dst_data_column->get_data().data(), _code_convert_map->data(), codes_column->get_data().data(), DICT_DECODE_MAX_SIZE, src->size());

if (dst->is_nullable()) {
auto* nullable_codes = down_cast<NullableColumn*>(dict_codes.get());
auto* nullable_dst = down_cast<NullableColumn*>(dst.get());
nullable_dst->null_column_data().swap(nullable_codes->null_column_data());
nullable_dst->set_has_null(nullable_codes->has_null());
}

src->reset_column();
src = _ori_column;

return Status::OK();
}


} // namespace starrocks::parquet
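
Reading the SIMDGather::gather call in LowCardColumnReader::fill_dst_column above, the step appears to be a vectorized table lookup: every file-local dictionary code read from the page is translated into the query's global code through _code_convert_map, with DICT_DECODE_MAX_SIZE bounding the table. A scalar sketch of that translation (hypothetical names, not the real SIMD routine):

#include <cstddef>
#include <cstdint>

// Assumed semantics: convert_map[local_code] holds the matching global code;
// a code outside [0, table_size) would mean the file and global dictionaries
// do not match (the GLOBAL_DICT_NOT_MATCH case added in status.h).
void translate_dict_codes(int32_t* dst, const int32_t* convert_map, const int32_t* codes,
                          size_t table_size, size_t num_rows) {
    for (size_t i = 0; i < num_rows; ++i) {
        int32_t code = codes[i];
        dst[i] = (code >= 0 && static_cast<size_t>(code) < table_size) ? convert_map[code] : 0;
    }
}
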
