[Enhancement] dict_merge can process data from plain reader
Signed-off-by: zombee0 <[email protected]>
zombee0 committed Jan 8, 2025
1 parent 67cdf03 commit deeea1e
Showing 6 changed files with 113 additions and 6 deletions.
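
Context for the diff below: previously DictMergeAggregateFunction assumed an array<string> input column (update_batch_single_state() unconditionally cast columns[0] to ArrayColumn, and update() was only a DCHECK(false) stub), so dict_merge could not consume values produced by the plain, non-dictionary reader. This commit makes both overrides dispatch on the input type: array columns are walked element by element as before, and a plain string column is now consumed directly. A minimal usage sketch, reusing only the table and queries from the tests added in this commit:

select dict_merge(city) from test_dict_merge;        -- plain VARCHAR column (newly supported path)
select dict_merge(city_array) from test_dict_merge;  -- array<string> column (existing path)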
42 changes: 36 additions & 6 deletions be/src/exprs/agg/distinct.h
@@ -467,23 +467,53 @@ class DictMergeAggregateFunction final
 
     void update(FunctionContext* ctx, const Column** columns, AggDataPtr __restrict state,
                 size_t row_num) const override {
-        DCHECK(false) << "this method shouldn't be called";
+        auto& agg_state = this->data(state);
+        MemPool* mem_pool = ctx->mem_pool();
+
+        // if dict size greater than DICT_DECODE_MAX_SIZE. we return a FAKE dictionary
+        if (agg_state.over_limit) {
+            return;
+        }
+
+        if (columns[0]->is_array()) {
+            const auto* array_column = down_cast<const ArrayColumn*>(columns[0]);
+            const auto* column = array_column->elements_column().get();
+            const auto& off = array_column->offsets().get_data();
+            const auto* binary_column = down_cast<const BinaryColumn*>(ColumnHelper::get_data_column(column));
+            for (auto i = off[row_num]; i < off[row_num + 1]; i++) {
+                if (!column->is_null(i)) {
+                    agg_state.update(mem_pool, binary_column->get_slice(i));
+                }
+            }
+        } else {
+            const auto& binary_column = down_cast<const BinaryColumn&>(*columns[0]);
+            agg_state.update(mem_pool, binary_column.get_slice(row_num));
+        }
+
+        agg_state.over_limit = agg_state.set.size() > DICT_DECODE_MAX_SIZE;
     }
 
     void update_batch_single_state(FunctionContext* ctx, size_t chunk_size, const Column** columns,
                                    AggDataPtr __restrict state) const override {
         auto& agg_state = this->data(state);
-        const auto* column = down_cast<const ArrayColumn*>(columns[0]);
         MemPool* mem_pool = ctx->mem_pool();
 
         // if dict size greater than DICT_DECODE_MAX_SIZE. we return a FAKE dictionary
         if (agg_state.over_limit) {
             return;
         }
 
-        const auto& elements_column = column->elements();
-        if (column->elements().is_nullable()) {
-            const auto& null_column = down_cast<const NullableColumn&>(elements_column);
+        const Column* column = nullptr;
+
+        if (columns[0]->is_array()) {
+            const auto* array_column = down_cast<const ArrayColumn*>(columns[0]);
+            column = array_column->elements_column().get();
+        } else {
+            column = columns[0];
+        }
+
+        if (column->is_nullable()) {
+            const auto& null_column = down_cast<const NullableColumn&>(*column);
             const auto& null_data = null_column.immutable_null_column_data();
             const auto& binary_column = down_cast<const BinaryColumn&>(null_column.data_column_ref());
 
@@ -494,7 +524,7 @@ class DictMergeAggregateFunction final
             }
             agg_state.over_limit = agg_state.set.size() > DICT_DECODE_MAX_SIZE;
         } else {
-            const auto& binary_column = down_cast<const BinaryColumn&>(elements_column);
+            const auto& binary_column = down_cast<const BinaryColumn&>(*column);
             for (size_t i = 0; i < binary_column.size(); ++i) {
                 agg_state.update(mem_pool, binary_column.get_slice(i));
             }
3 changes: 3 additions & 0 deletions be/src/exprs/agg/factory/aggregate_resolver_others.cpp
@@ -93,6 +93,9 @@ void AggregateFuncResolver::register_others() {
     add_general_mapping<AnyValueSemiState>("any_value", false, AggregateFactory::MakeAnyValueSemiAggregateFunction());
     add_general_mapping_notnull("array_agg2", false, AggregateFactory::MakeArrayAggAggregateFunctionV2());
     add_general_mapping_notnull("group_concat2", false, AggregateFactory::MakeGroupConcatAggregateFunctionV2());
+
+    add_aggregate_mapping<TYPE_VARCHAR, TYPE_VARCHAR, DictMergeState>(
+            "dict_merge", false, AggregateFactory::MakeDictMergeAggregateFunction());
 }
 
 } // namespace starrocks
41 changes: 41 additions & 0 deletions test/sql/test_agg_function/R/test_dict_merge
@@ -0,0 +1,41 @@
-- name: testDictMerge
CREATE TABLE `test_dict_merge` (
`id` int NULL COMMENT "",
`city` string NOT NULL COMMENT "",
`city_null` string NULL COMMENT "",
`city_array` array<string> NOT NULL COMMENT "",
`city_array_null` array<string> NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 4
PROPERTIES (
"replication_num" = "1",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);
-- result:
-- !result
insert into test_dict_merge values
(1, "beijing", "beijing", ["beijing", "shanghai"], NULL),
(1, "beijing", NULL, ["shenzhen", "shanghai"], ["shenzhen", "shanghai"]),
(1, "shanghai", "shanghai", ["shenzhen", NULL], ["shenzhen", NULL]),
(1, "shanghai", NULL, ["beijing", NULL, "shanghai"], NULL);
-- result:
-- !result
select dict_merge(city) from test_dict_merge;
-- result:
{"2":{"lst":["str",2,"YmVpamluZw","c2hhbmdoYWk"]},"3":{"lst":["i32",2,1,2]}}
-- !result
select dict_merge(city_null) from test_dict_merge;
-- result:
{"2":{"lst":["str",2,"YmVpamluZw","c2hhbmdoYWk"]},"3":{"lst":["i32",2,1,2]}}
-- !result
select dict_merge(city_array) from test_dict_merge;
-- result:
{"2":{"lst":["str",3,"YmVpamluZw","c2hhbmdoYWk","c2hlbnpoZW4"]},"3":{"lst":["i32",3,1,2,3]}}
-- !result
select dict_merge(city_array_null) from test_dict_merge;
-- result:
{"2":{"lst":["str",2,"c2hhbmdoYWk","c2hlbnpoZW4"]},"3":{"lst":["i32",2,1,2]}}
-- !result
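
For readers of the expected results above: dict_merge returns a serialized global dictionary in which the "str" list carries the distinct values base64-encoded without padding and the "i32" list carries the integer IDs assigned to them; the outer "2"/"3" keys look like Thrift field IDs. A quick way to spot-check an entry, assuming the from_base64 builtin is available (padding restored manually):

select from_base64('YmVpamluZw==');   -- beijing
select from_base64('c2hhbmdoYWk=');   -- shanghai
select from_base64('c2hlbnpoZW4=');   -- shenzhen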
27 changes: 27 additions & 0 deletions test/sql/test_agg_function/T/test_dict_merge
@@ -0,0 +1,27 @@
-- name: testDictMerge
CREATE TABLE `test_dict_merge` (
`id` int NULL COMMENT "",
`city` string NOT NULL COMMENT "",
`city_null` string NULL COMMENT "",
`city_array` array<string> NOT NULL COMMENT "",
`city_array_null` array<string> NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 4
PROPERTIES (
"replication_num" = "1",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);

insert into test_dict_merge values
(1, "beijing", "beijing", ["beijing", "shanghai"], NULL),
(1, "beijing", NULL, ["shenzhen", "shanghai"], ["shenzhen", "shanghai"]),
(1, "shanghai", "shanghai", ["shenzhen", NULL], ["shenzhen", NULL]),
(1, "shanghai", NULL, ["beijing", NULL, "shanghai"], NULL);

select dict_merge(city) from test_dict_merge;
select dict_merge(city_null) from test_dict_merge;
select dict_merge(city_array) from test_dict_merge;
select dict_merge(city_array_null) from test_dict_merge;
@@ -49,6 +49,10 @@ select count(*) from tpch_customer_null where c_mktsegment is not null;
 -- result:
 24
 -- !result
+select dict_merge(c_mktsegment) from tpch_customer_null;
+-- result:
+{"2":{"lst":["str",4,"QVVUT01PQklMRQ","QlVJTERJTkc","RlVSTklUVVJF","TUFDSElORVJZ"]},"3":{"lst":["i32",4,1,2,3,4]}}
+-- !result
 shell: ossutil64 rm -rf oss://${oss_bucket}/test_parquet_dict_with_null_value/${uuid0}/ >/dev/null || echo "exit 0" >/dev/null
 -- result:
 0
@@ -69,4 +69,6 @@ select count(*) from tpch_customer_null where c_mktsegment is null;
 
 select count(*) from tpch_customer_null where c_mktsegment is not null;
 
+select dict_merge(c_mktsegment) from tpch_customer_null;
+
 shell: ossutil64 rm -rf oss://${oss_bucket}/test_parquet_dict_with_null_value/${uuid0}/ >/dev/null || echo "exit 0" >/dev/null
