Skip to content

Commit

Permalink
async trigger
Browse files Browse the repository at this point in the history
Signed-off-by: zombee0 <[email protected]>
  • Loading branch information
zombee0 committed Dec 19, 2024
1 parent b478a7d commit ad04c34
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 139 deletions.
9 changes: 6 additions & 3 deletions be/src/formats/parquet/column_reader_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "formats/parquet/column_reader_factory.h"

#include "common/global_types.h"
#include "formats/parquet/complex_column_reader.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/parquet/schema.h"
Expand Down Expand Up @@ -157,19 +158,21 @@ StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(const ColumnReaderOptions&
}
}

// Builds a reader on top of `inner_reader` that uses the global dictionary `dict`.
//
// - When the parquet field of `inner_reader` is an ARRAY, the element reader is taken out of the
//   ListColumnReader, converted recursively, and wrapped in a new ListColumnReader.
// - Otherwise `inner_reader` must be a NakedColumnReader; it is copied into a LowCardColumnReader.
//   If it is not, an InternalError status is returned.
//
// `slot_id` is forwarded unchanged to the LowCardColumnReader that is eventually created.
StatusOr<ColumnReaderPtr> ColumnReaderFactory::create(ColumnReaderPtr inner_reader, const GlobalDictMap* dict,
                                                      SlotId slot_id) {
    if (inner_reader->get_column_parquet_field()->type == ColumnType::ARRAY) {
        ASSIGN_OR_RETURN(
                ColumnReaderPtr child_reader,
                ColumnReaderFactory::create(
                        std::move((down_cast<ListColumnReader*>(inner_reader.get()))->get_element_reader()), dict,
                        slot_id));
        return std::make_unique<ListColumnReader>(inner_reader->get_column_parquet_field(), std::move(child_reader));
    } else {
        // Checked cast: anything that is not a NakedColumnReader cannot be converted.
        auto* nake_reader = dynamic_cast<NakedColumnReader*>(inner_reader.get());
        if (nake_reader == nullptr) {
            return Status::InternalError("Error on reader transform for low cardinality reader");
        }
        return std::make_unique<LowCardColumnReader>(*nake_reader, dict, slot_id);
    }
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/formats/parquet/column_reader_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class ColumnReaderFactory {
const TypeDescriptor& col_type,
const TIcebergSchemaField* iceberg_schema_field);

static StatusOr<ColumnReaderPtr> create(const ColumnReaderPtr inner_reader, const GlobalDictMap* dict);
static StatusOr<ColumnReaderPtr> create(const ColumnReaderPtr inner_reader, const GlobalDictMap* dict,
const SlotId slot_id);

private:
// for struct type without schema change
Expand Down
3 changes: 2 additions & 1 deletion be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ StatusOr<ColumnReaderPtr> GroupReader::_create_column_reader(const GroupReaderPa
}
if (_param.global_dictmaps->contains(column.slot_id())) {
ASSIGN_OR_RETURN(column_reader, ColumnReaderFactory::create(std::move(column_reader),
_param.global_dictmaps->at(column.slot_id())));
_param.global_dictmaps->at(column.slot_id()),
column.slot_id()));
}
if (column_reader == nullptr) {
// this shouldn't happen but guard
Expand Down
33 changes: 33 additions & 0 deletions be/src/formats/parquet/scalar_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,5 +396,38 @@ Status LowCardColumnReader::fill_dst_column(ColumnPtr& dst, ColumnPtr& src) {
return Status::OK();
}

// Builds the table that converts this reader's local dictionary codes into global dictionary codes.
//
// The dictionary values are fetched through `_reader`, each value is looked up in `_dict`, and the
// resulting table is stored in `_code_convert_map`.
//
// Returns GlobalDictNotMatch when a non-empty dictionary value is absent from `_dict`; the message
// carries the slot id first and the file name last. Returns any error from `get_dict_values`.
Status LowCardColumnReader::_check_current_dict() {
    // create dict value chunk for evaluation.
    ColumnPtr dict_value_column = ColumnHelper::create_column(TypeDescriptor(TYPE_VARCHAR), true);
    RETURN_IF_ERROR(_reader->get_dict_values(dict_value_column.get()));

    const size_t dict_size = dict_value_column->size();

    // Two slots more than the dictionary size, all initialised to 0.
    std::vector<int16_t> code_convert_map(dict_size + 2, int16_t{0});

    auto viewer = ColumnViewer<TYPE_VARCHAR>(dict_value_column);

    for (size_t i = 0; i < dict_size; ++i) {
        auto slice = viewer.value(i);
        auto res = _dict->find(slice);
        if (res != _dict->end()) {
            // Value is known globally: record its global code.
            code_convert_map[i] = res->second;
        } else if (slice.size > 0) {
            // error message format used to extract info, carefully
            return Status::GlobalDictNotMatch(fmt::format("SlotId: {}, file doesn't match global dict, {}",
                                                          _slot_id, _opts.file->filename()));
        }
        // A missing *empty* value is tolerated and keeps code 0; `res` must not be dereferenced here.
    }

    _code_convert_map = std::move(code_convert_map);

    return Status::OK();
}

} // namespace starrocks::parquet
121 changes: 7 additions & 114 deletions be/src/formats/parquet/scalar_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,14 @@ class ScalarColumnReader final: public NakedColumnReader {

class LowCardColumnReader final : public NakedColumnReader {
public:
// Copy-constructs the NakedColumnReader base from `reader` and stores the global dictionary pointer
// and the slot id. `_slot_id` is a const member, so it has to be set in the initializer list.
explicit LowCardColumnReader(const NakedColumnReader& reader, const GlobalDictMap* dict, SlotId slot_id)
        : NakedColumnReader(reader), _dict(dict), _slot_id(slot_id) {}

// Returns GlobalDictNotMatch (message prefixed with the slot id) when
// `_column_all_pages_dict_encoded()` is false; otherwise delegates to NakedColumnReader::prepare().
Status prepare() override {
    if (!_column_all_pages_dict_encoded()) {
        // error message format used to extract info, carefully
        return Status::GlobalDictNotMatch(
                fmt::format("SlotId: {}, Not dict encoded on global dict column", _slot_id));
    }
    return NakedColumnReader::prepare();
}
Expand Down Expand Up @@ -213,126 +215,17 @@ class LowCardColumnReader final : public NakedColumnReader {
}

private:
// Inline form of the local-to-global dictionary code table builder.
// NOTE(review): this body appears in the diff view next to a bare declaration of the same method
// and an out-of-class definition; presumably this is the removed side of the change — confirm.
//
// Reads the dictionary values through `_reader`, looks each one up in `_dict`, and stores the
// resulting table in `_code_convert_map`. Returns GlobalDictNotMatch when a non-empty value is
// not present in `_dict`.
Status _check_current_dict() {
std::vector<int16_t> code_convert_map;

// create dict value chunk for evaluation.
ColumnPtr dict_value_column = ColumnHelper::create_column(TypeDescriptor(TYPE_VARCHAR), true);
RETURN_IF_ERROR(_reader->get_dict_values(dict_value_column.get()));

size_t dict_size = dict_value_column->size();

// Table has two more slots than the dictionary and starts out all zeros.
code_convert_map.resize(dict_size + 2);
std::fill(code_convert_map.begin(), code_convert_map.end(), 0);
auto* local_to_global = code_convert_map.data();

auto viewer = ColumnViewer<TYPE_VARCHAR>(dict_value_column);

for (int i = 0; i < dict_size; ++i) {
auto slice = viewer.value(i);
auto res = _dict->find(slice);
if (res == _dict->end()) {
// Only a non-empty missing value is an error; a missing empty value keeps code 0.
if (slice.size > 0) {
return Status::GlobalDictNotMatch(fmt::format("not found slice:{} in global dict", slice.to_string()));
}
} else {
// Value found: record its global code at the local index.
local_to_global[i] = res->second;
}
}

_code_convert_map = std::move(code_convert_map);

return Status::OK();
}
Status _check_current_dict();

std::unique_ptr<ColumnDictFilterContext> _dict_filter_ctx;

const GlobalDictMap* _dict = nullptr;
const SlotId _slot_id;

std::optional<std::vector<int16_t>> _code_convert_map;

ColumnPtr _dict_code = nullptr;
ColumnPtr _ori_column = nullptr;
};

//class LowCardColumnReader : public ScalarColumnReader {
//public:
// explicit LowCardColumnReader(ScalarColumnReader)
//
//
//};

//class WrappedReader : public ColumnReader {
//public:
// explicit WrappedReader(ColumnReaderPtr inner_reader)
// : ColumnReader(inner_reader->get_column_parquet_field()), _inner_reader(std::move(inner_reader)) {
// DCHECK(dynamic_cast<ScalarColumnReader*>(_inner_reader.get()));
// }
// ~WrappedReader() override = default;
//
// Status prepare() override { return _inner_reader->prepare(); }
//
// Status read_range(const Range<uint64_t>& range, const Filter* filter, ColumnPtr& dst) override {
// //
// // return _inner_reader->read_range(range, filter, dst);
// }
//
// void get_levels(level_t** def_levels, level_t** rep_levels, size_t* num_levels) override {
// _inner_reader->get_levels(def_levels, rep_levels, num_levels);
// };
//
// void set_need_parse_levels(bool need_parse_levels) override {
// _inner_reader->set_need_parse_levels(need_parse_levels);
// }
//
// bool try_to_use_dict_filter(ExprContext* ctx, bool is_decode_needed, const SlotId slotId,
// const std::vector<std::string>& sub_field_path, const size_t& layer) override {
// return _inner_reader->try_to_use_dict_filter(ctx, is_decode_needed, slotId, sub_field_path, layer);
// }
//
// Status rewrite_conjunct_ctxs_to_predicate(bool* is_group_filtered, const std::vector<std::string>& sub_field_path,
// const size_t& layer) override {
// return _inner_reader->rewrite_conjunct_ctxs_to_predicate(is_group_filtered, sub_field_path, layer);
// }
//
// Status filter_dict_column(const ColumnPtr& column, Filter* filter, const std::vector<std::string>& sub_field_path,
// const size_t& layer) override {
// return _inner_reader->filter_dict_column(column, filter, sub_field_path, layer);
// }
//
// void collect_column_io_range(std::vector<io::SharedBufferedInputStream::IORange>* ranges, int64_t* end_offset,
// ColumnIOType type, bool active) override {
// _inner_reader->collect_column_io_range(ranges, end_offset, type, active);
// }
//
// const tparquet::ColumnChunk* get_chunk_metadata() const override { return _inner_reader->get_chunk_metadata(); }
//
// StatusOr<tparquet::OffsetIndex*> get_offset_index(const uint64_t rg_first_row) override {
// return _inner_reader->get_offset_index(rg_first_row);
// }
//
// void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) override {
// _inner_reader->select_offset_index(range, rg_first_row);
// }
//
// // Return true means selected, return false means not selected
// StatusOr<bool> row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
// CompoundNodeType pred_relation, const uint64_t rg_first_row,
// const uint64_t rg_num_rows) const override {
// return _inner_reader->row_group_zone_map_filter(predicates, pred_relation, rg_first_row, rg_num_rows);
// }
//
//private:
// ColumnReaderPtr _inner_reader;
//};

//class LowCardColumnReader : public WrappedReader {
//public:
// LowCardColumnReader(ColumnReaderPtr inner_reader, const GlobalDictMap* dict)
// : WrappedReader(std::move(inner_reader)), _dict(dict) {}
//
//private:
// const GlobalDictMap* _dict = nullptr;
//};

} // namespace starrocks::parquet
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector;

import com.starrocks.thrift.THdfsScanRange;
import com.starrocks.thrift.TScanRangeLocations;

import java.util.List;
Expand All @@ -22,16 +23,24 @@ public class RemoteFilesSampleStrategy {
public enum Type {
ALL,
RANDOM,
SPECIFIC,
}
private int limit = 5;
private Type type = Type.ALL;
private int sampled = 0;
private String fileName;

public RemoteFilesSampleStrategy() {}

/**
 * Creates a strategy of type {@link Type#RANDOM} that hands out at most {@code limit} scan ranges.
 *
 * @param limit upper bound on the total number of sampled scan ranges
 */
public RemoteFilesSampleStrategy(int limit) {
    this.limit = limit;
    this.type = Type.RANDOM;
}

/**
 * Creates a strategy of type {@link Type#SPECIFIC} with a limit of one.
 *
 * @param fileName file name stored for later matching in {@code sample}
 */
public RemoteFilesSampleStrategy(String fileName) {
    this.fileName = fileName;
    this.type = Type.SPECIFIC;
    this.limit = 1;
}

public int getLimit() {
Expand All @@ -51,12 +60,29 @@ public boolean enough() {
}

public List<TScanRangeLocations> sample(List<TScanRangeLocations> input) {
if (type == Type.ALL) {
return input;
} else {
int s = Math.max(0, Math.min(input.size(), limit - sampled));
sampled += s;
return input.subList(0, s);
switch (type) {
case ALL:
return input;
case RANDOM:
int s = Math.max(0, Math.min(input.size(), limit - sampled));
sampled += s;
return input.subList(0, s);
case SPECIFIC:
for (TScanRangeLocations scan : input) {
if (scan.isSetScan_range() && scan.getScan_range().isSetHdfs_scan_range()) {
THdfsScanRange scanRange = scan.getScan_range().getHdfs_scan_range();
if (scanRange.isSetRelative_path() && fileName.contains(scanRange.relative_path)) {
sampled += 1;
return List.of(scan);
}
if (scanRange.isSetFull_path() && fileName.equals(scanRange.full_path)) {
sampled += 1;
return List.of(scan);
}
}
}
return List.of();
}
return List.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@

package com.starrocks.connector.exception;

import com.starrocks.common.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Optional;

import static java.lang.String.format;

public class GlobalDictNotMatchException extends RuntimeException {
private static final Logger LOG = LogManager.getLogger(GlobalDictNotMatchException.class);
public GlobalDictNotMatchException(String message) {
super(message);
}
Expand All @@ -37,5 +44,20 @@ public String getErrorMessage() {
public String getMessage() {
return getErrorMessage();
}

/**
 * Extracts the slot id and, when present, the file name from {@link #getMessage()}.
 *
 * <p>Handled message shapes:
 * <ul>
 *   <li>{@code "SlotId: <id>, <reason>"} gives the slot id and no file name</li>
 *   <li>{@code "SlotId: <id>, <reason>, <file name>"} gives the slot id and the file name</li>
 * </ul>
 * NOTE(review): assumes {@code getMessage()} returns the text in exactly this shape, with no
 * extra prefix — confirm against {@code getErrorMessage()}.
 *
 * @return pair of (slot id, file name); both empty when the message cannot be parsed
 */
public Pair<Optional<Integer>, Optional<String>> extract() {
    try {
        // At most three parts, so commas inside the file name stay in the last part.
        String[] splits = getMessage().split(",", 3);
        // splits[0] is "SlotId: <id>"; the number is the part after the colon.
        int slotId = Integer.parseInt(splits[0].split(":")[1].trim());
        if (splits.length == 3) {
            return new Pair<Optional<Integer>, Optional<String>>(Optional.of(slotId),
                    Optional.of(splits[2].trim()));
        }
        return new Pair<Optional<Integer>, Optional<String>>(Optional.of(slotId), Optional.empty());
    } catch (Exception e) {
        // Parsing is best-effort: log the unparseable message and report nothing extracted.
        LOG.warn(getMessage());
        return new Pair<Optional<Integer>, Optional<String>>(Optional.empty(), Optional.empty());
    }
}
}

4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public String getTableName() {
return desc.getTable().getName();
}

/**
 * @return the tuple descriptor held in {@code desc}
 */
public TupleDescriptor getDesc() {
    return this.desc;
}

public boolean isLocalNativeTable() {
return false;
}
Expand Down
Loading

0 comments on commit ad04c34

Please sign in to comment.