fix: use flag in preflush to indicate whether reorder is required (#1298)

## Rationale
Currently we suggest new primary keys in preflush, and do a reorder in the
flush task.

There is no easy way for the flush task to know whether a reorder is required.

## Detailed Changes
- Introduce a flag to indicate whether a reorder is required.
- Change `col` to `ident` when building the logical plan; `ident` does not
normalize the column name (see the sketch after this list).
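
A minimal sketch (not part of this commit) of why the switch matters, using
DataFusion's `col` and `ident` helpers on a mixed-case column name such as
`myVALUE`; the exact normalization rules depend on the DataFusion version in
use:

```rust
use datafusion::prelude::{col, ident};

fn main() {
    // `col` parses its argument as a SQL identifier, so an unquoted
    // mixed-case name is normalized (lower-cased) to `myvalue`.
    let normalized = col("myVALUE");

    // `ident` takes the string verbatim, so a sort expression built from it
    // refers to the column exactly as it is spelled in the schema: `myVALUE`.
    let verbatim = ident("myVALUE");

    // Expected output (subject to DataFusion version):
    //   col:   myvalue
    //   ident: myVALUE
    println!("col:   {normalized}");
    println!("ident: {verbatim}");
}
```

The integration test below deliberately renames the `value` column to
`myVALUE`, which is what surfaces this difference during the reorder.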

## Test Plan
Integration test.
jiacai2050 authored Nov 8, 2023
1 parent 617b166 commit c33ab01
Showing 5 changed files with 51 additions and 13 deletions.
24 changes: 20 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -196,6 +196,10 @@ pub struct TableFlushRequest {
pub table_data: TableDataRef,
/// Max sequence number to flush (inclusive).
pub max_sequence: SequenceNumber,

/// We may suggest new primary keys in preflush. If a suggestion happened, we
/// need to ensure data is in the new order.
need_reorder: bool,
}

#[derive(Clone)]
@@ -287,7 +291,7 @@ impl FlushTask {
// Start flush duration timer.
let local_metrics = self.table_data.metrics.local_flush_metrics();
let _timer = local_metrics.start_flush_timer();
self.dump_memtables(request_id, &mems_to_flush)
self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder)
.await
.box_err()
.context(FlushJobWithCause {
@@ -316,6 +320,7 @@ impl FlushTask {
let mut last_sequence = table_data.last_sequence();
// Switch (freeze) all mutable memtables. And update segment duration if
// suggestion is returned.
let mut need_reorder = false;
if let Some(suggest_segment_duration) = current_version.suggest_duration() {
info!(
"Update segment duration, table:{}, table_id:{}, segment_duration:{:?}",
@@ -324,6 +329,7 @@
assert!(!suggest_segment_duration.is_zero());

if let Some(pk_idx) = current_version.suggest_primary_key() {
need_reorder = true;
let mut schema = table_data.schema();
info!(
"Update primary key, table:{}, table_id:{}, old:{:?}, new:{:?}",
@@ -388,6 +394,7 @@ impl FlushTask {
Ok(TableFlushRequest {
table_data: table_data.clone(),
max_sequence: last_sequence,
need_reorder,
})
}

@@ -401,6 +408,7 @@ impl FlushTask {
&self,
request_id: RequestId,
mems_to_flush: &FlushableMemTables,
need_reorder: bool,
) -> Result<()> {
let local_metrics = self.table_data.metrics.local_flush_metrics();
let mut files_to_level0 = Vec::with_capacity(mems_to_flush.memtables.len());
@@ -410,7 +418,12 @@ impl FlushTask {
// process sampling memtable and frozen memtable
if let Some(sampling_mem) = &mems_to_flush.sampling_mem {
if let Some(seq) = self
.dump_sampling_memtable(request_id, sampling_mem, &mut files_to_level0)
.dump_sampling_memtable(
request_id,
sampling_mem,
&mut files_to_level0,
need_reorder,
)
.await?
{
flushed_sequence = seq;
@@ -500,6 +513,7 @@ impl FlushTask {
request_id: RequestId,
sampling_mem: &SamplingMemTable,
files_to_level0: &mut Vec<AddFile>,
need_reorder: bool,
) -> Result<Option<SequenceNumber>> {
let (min_key, max_key) = match (sampling_mem.mem.min_key(), sampling_mem.mem.max_key()) {
(Some(min_key), Some(max_key)) => (min_key, max_key),
@@ -589,11 +603,13 @@ impl FlushTask {

let iter = build_mem_table_iter(sampling_mem.mem.clone(), &self.table_data)?;
let timestamp_idx = self.table_data.schema().timestamp_index();
if let Some(pk_idx) = self.table_data.current_version().suggest_primary_key() {
if need_reorder {
let schema = self.table_data.schema();
let primary_key_indexes = schema.primary_key_indexes();
let reorder = Reorder {
iter,
schema: self.table_data.schema(),
order_by_col_indexes: pk_idx,
order_by_col_indexes: primary_key_indexes.to_vec(),
};
let mut stream = reorder.into_stream().await.context(ReorderMemIter)?;
while let Some(data) = stream.next().await {
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/reorder_memtable.rs
@@ -39,7 +39,7 @@ use datafusion::{
execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream as DfRecordBatchStream, SendableRecordBatchStream, Statistics,
},
prelude::{col, Expr, SessionConfig, SessionContext},
prelude::{ident, Expr, SessionConfig, SessionContext},
sql::TableReference,
};
use futures::{Stream, StreamExt};
@@ -241,7 +241,7 @@ impl Reorder {
let columns = schema.columns();
let sort_exprs = sort_by_col_idx
.iter()
.map(|i| col(&columns[*i].name).sort(true, true))
.map(|i| ident(&columns[*i].name).sort(true, true))
.collect::<Vec<_>>();
let df_plan = LogicalPlanBuilder::scan(DUMMY_TABLE_NAME, source, None)?
.sort(sort_exprs)?
2 changes: 1 addition & 1 deletion analytic_engine/src/sampler.rs
@@ -292,7 +292,7 @@ impl PrimaryKeySampler {
.iter()
.enumerate()
.map(|(idx, col)| {
if idx == timestamp_index {
if col.data_type.is_timestamp() {
return None;
}

26 changes: 22 additions & 4 deletions integration_tests/cases/env/local/ddl/sampling-primary-key.result
@@ -8,7 +8,7 @@ CREATE TABLE `sampling_primary_key_table` (
v3 double,
v5 double,
name string TAG,
value int64 NOT NULL,
myVALUE int64 NOT NULL,
t timestamp NOT NULL,
timestamp KEY (t)) ENGINE = Analytic WITH (
update_mode='append',
@@ -20,10 +20,10 @@ affected_rows: 0
show create table `sampling_primary_key_table`;

Table,Create Table,
String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),
String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),


INSERT INTO `sampling_primary_key_table` (t, name, value)
INSERT INTO `sampling_primary_key_table` (t, name, myVALUE)
VALUES
(1695348000000, "ceresdb2", 200),
(1695348000005, "ceresdb2", 100),
@@ -32,12 +32,30 @@ INSERT INTO `sampling_primary_key_table` (t, name, value)

affected_rows: 4

select * from `sampling_primary_key_table`;

tsid,t,v1,v2,v3,v5,name,myVALUE,
UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200),
UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100),
UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100),
UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200),


-- After flush, its primary key should be changed.
-- SQLNESS ARG pre_cmd=flush
show create table `sampling_primary_key_table`;

Table,Create Table,
String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(value,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),
String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(myVALUE,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),


select * from `sampling_primary_key_table`;

tsid,t,v1,v2,v3,v5,name,myVALUE,
UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100),
UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100),
UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200),
UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200),


DROP TABLE IF EXISTS `sampling_primary_key_table`;
@@ -7,7 +7,7 @@ CREATE TABLE `sampling_primary_key_table` (
v3 double,
v5 double,
name string TAG,
value int64 NOT NULL,
myVALUE int64 NOT NULL,
t timestamp NOT NULL,
timestamp KEY (t)) ENGINE = Analytic WITH (
update_mode='append',
@@ -16,15 +16,19 @@ CREATE TABLE `sampling_primary_key_table` (

show create table `sampling_primary_key_table`;

INSERT INTO `sampling_primary_key_table` (t, name, value)
INSERT INTO `sampling_primary_key_table` (t, name, myVALUE)
VALUES
(1695348000000, "ceresdb2", 200),
(1695348000005, "ceresdb2", 100),
(1695348000001, "ceresdb1", 100),
(1695348000003, "ceresdb3", 200);

select * from `sampling_primary_key_table`;

-- After flush, its primary key should be changed.
-- SQLNESS ARG pre_cmd=flush
show create table `sampling_primary_key_table`;

select * from `sampling_primary_key_table`;

DROP TABLE IF EXISTS `sampling_primary_key_table`;
