From c33ab014f182a2462b17ece94ffc95e99b4e9a59 Mon Sep 17 00:00:00 2001
From: Jiacai Liu
Date: Wed, 8 Nov 2023 14:02:43 +0800
Subject: [PATCH] fix: use flag in preflush to indicate whether reorder is required (#1298)

## Rationale
Currently we suggest new primary keys in preflush and do a reorder in the flush
task, but there is no easy way for the flush task to know whether a reorder is
required.

## Detailed Changes
- Introduce a flag to represent whether a reorder is required.
- Change `col` to `ident` when building the logical plan; `ident` will not
  normalize the column name.

## Test Plan
Integration test.

---
 .../src/instance/flush_compaction.rs          | 24 ++++++++++++++---
 .../src/instance/reorder_memtable.rs          |  4 +--
 analytic_engine/src/sampler.rs                |  2 +-
 .../env/local/ddl/sampling-primary-key.result | 26 ++++++++++++++++---
 .../env/local/ddl/sampling-primary-key.sql    |  8 ++++--
 5 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index f8911aaecf..fa66d78953 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -196,6 +196,10 @@ pub struct TableFlushRequest {
     pub table_data: TableDataRef,
     /// Max sequence number to flush (inclusive).
     pub max_sequence: SequenceNumber,
+
+    /// We may suggest new primary keys in preflush. If a suggestion happened,
+    /// we need to ensure data is in the new order.
+    need_reorder: bool,
 }
 
 #[derive(Clone)]
@@ -287,7 +291,7 @@ impl FlushTask {
         // Start flush duration timer.
         let local_metrics = self.table_data.metrics.local_flush_metrics();
         let _timer = local_metrics.start_flush_timer();
-        self.dump_memtables(request_id, &mems_to_flush)
+        self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder)
             .await
             .box_err()
             .context(FlushJobWithCause {
@@ -316,6 +320,7 @@ impl FlushTask {
         let mut last_sequence = table_data.last_sequence();
         // Switch (freeze) all mutable memtables. And update segment duration if
         // suggestion is returned.
+        let mut need_reorder = false;
         if let Some(suggest_segment_duration) = current_version.suggest_duration() {
             info!(
                 "Update segment duration, table:{}, table_id:{}, segment_duration:{:?}",
@@ -324,6 +329,7 @@ impl FlushTask {
             assert!(!suggest_segment_duration.is_zero());
 
             if let Some(pk_idx) = current_version.suggest_primary_key() {
+                need_reorder = true;
                 let mut schema = table_data.schema();
                 info!(
                     "Update primary key, table:{}, table_id:{}, old:{:?}, new:{:?}",
@@ -388,6 +394,7 @@ impl FlushTask {
         Ok(TableFlushRequest {
             table_data: table_data.clone(),
             max_sequence: last_sequence,
+            need_reorder,
         })
     }
 
@@ -401,6 +408,7 @@ impl FlushTask {
         &self,
         request_id: RequestId,
         mems_to_flush: &FlushableMemTables,
+        need_reorder: bool,
     ) -> Result<()> {
         let local_metrics = self.table_data.metrics.local_flush_metrics();
         let mut files_to_level0 = Vec::with_capacity(mems_to_flush.memtables.len());
@@ -410,7 +418,12 @@ impl FlushTask {
         // process sampling memtable and frozen memtable
         if let Some(sampling_mem) = &mems_to_flush.sampling_mem {
             if let Some(seq) = self
-                .dump_sampling_memtable(request_id, sampling_mem, &mut files_to_level0)
+                .dump_sampling_memtable(
+                    request_id,
+                    sampling_mem,
+                    &mut files_to_level0,
+                    need_reorder,
+                )
                 .await?
             {
                 flushed_sequence = seq;
@@ -500,6 +513,7 @@ impl FlushTask {
         request_id: RequestId,
         sampling_mem: &SamplingMemTable,
         files_to_level0: &mut Vec<AddFile>,
+        need_reorder: bool,
     ) -> Result<Option<SequenceNumber>> {
         let (min_key, max_key) = match (sampling_mem.mem.min_key(), sampling_mem.mem.max_key()) {
             (Some(min_key), Some(max_key)) => (min_key, max_key),
@@ -589,11 +603,13 @@ impl FlushTask {
         let iter = build_mem_table_iter(sampling_mem.mem.clone(), &self.table_data)?;
         let timestamp_idx = self.table_data.schema().timestamp_index();
 
-        if let Some(pk_idx) = self.table_data.current_version().suggest_primary_key() {
+        if need_reorder {
+            let schema = self.table_data.schema();
+            let primary_key_indexes = schema.primary_key_indexes();
             let reorder = Reorder {
                 iter,
                 schema: self.table_data.schema(),
-                order_by_col_indexes: pk_idx,
+                order_by_col_indexes: primary_key_indexes.to_vec(),
             };
             let mut stream = reorder.into_stream().await.context(ReorderMemIter)?;
             while let Some(data) = stream.next().await {
diff --git a/analytic_engine/src/instance/reorder_memtable.rs b/analytic_engine/src/instance/reorder_memtable.rs
index b29ecac02b..2e0901bbaa 100644
--- a/analytic_engine/src/instance/reorder_memtable.rs
+++ b/analytic_engine/src/instance/reorder_memtable.rs
@@ -39,7 +39,7 @@ use datafusion::{
         execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
         RecordBatchStream as DfRecordBatchStream, SendableRecordBatchStream, Statistics,
     },
-    prelude::{col, Expr, SessionConfig, SessionContext},
+    prelude::{ident, Expr, SessionConfig, SessionContext},
     sql::TableReference,
 };
 use futures::{Stream, StreamExt};
@@ -241,7 +241,7 @@ impl Reorder {
         let columns = schema.columns();
         let sort_exprs = sort_by_col_idx
             .iter()
-            .map(|i| col(&columns[*i].name).sort(true, true))
+            .map(|i| ident(&columns[*i].name).sort(true, true))
             .collect::<Vec<_>>();
         let df_plan = LogicalPlanBuilder::scan(DUMMY_TABLE_NAME, source, None)?
             .sort(sort_exprs)?
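
Note on the `col` → `ident` change above: `col` parses its argument as a SQL identifier and normalizes an unquoted name to lowercase, while `ident` takes the string verbatim, which is why the reorder plan built here must use `ident` for mixed-case column names. A minimal standalone sketch of the difference (not part of the patch; the printed output assumes the DataFusion version this crate depends on):

```rust
use datafusion::prelude::{col, ident};

fn main() {
    // `col` treats the argument as a (possibly qualified) SQL identifier,
    // so an unquoted mixed-case name is normalized to lowercase and would
    // no longer match the column stored in the schema.
    println!("{}", col("myVALUE")); // prints something like: myvalue

    // `ident` keeps the name exactly as given, matching the schema column.
    println!("{}", ident("myVALUE")); // prints: myVALUE
}
```
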
diff --git a/analytic_engine/src/sampler.rs b/analytic_engine/src/sampler.rs
index 0e5445169c..86d85e56fc 100644
--- a/analytic_engine/src/sampler.rs
+++ b/analytic_engine/src/sampler.rs
@@ -292,7 +292,7 @@ impl PrimaryKeySampler {
             .iter()
             .enumerate()
             .map(|(idx, col)| {
-                if idx == timestamp_index {
+                if col.data_type.is_timestamp() {
                     return None;
                 }
 
diff --git a/integration_tests/cases/env/local/ddl/sampling-primary-key.result b/integration_tests/cases/env/local/ddl/sampling-primary-key.result
index 27e8da08c7..0324b1fc1b 100644
--- a/integration_tests/cases/env/local/ddl/sampling-primary-key.result
+++ b/integration_tests/cases/env/local/ddl/sampling-primary-key.result
@@ -8,7 +8,7 @@ CREATE TABLE `sampling_primary_key_table` (
     v3 double,
     v5 double,
     name string TAG,
-    value int64 NOT NULL,
+    myVALUE int64 NOT NULL,
     t timestamp NOT NULL,
     timestamp KEY (t)) ENGINE = Analytic WITH (
     update_mode='append',
@@ -20,10 +20,10 @@ affected_rows: 0
 show create table `sampling_primary_key_table`;
 
 Table,Create Table,
-String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),
+String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),
 
 
-INSERT INTO `sampling_primary_key_table` (t, name, value)
+INSERT INTO `sampling_primary_key_table` (t, name, myVALUE)
 VALUES
     (1695348000000, "ceresdb2", 200),
     (1695348000005, "ceresdb2", 100),
@@ -32,12 +32,30 @@ INSERT INTO `sampling_primary_key_table` (t, name, myVALUE)
 
 affected_rows: 4
 
+select * from `sampling_primary_key_table`;
+
+tsid,t,v1,v2,v3,v5,name,myVALUE,
+UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200),
+UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100),
+UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100),
+UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200),
+
+
 -- After flush, its primary key should changed.
 -- SQLNESS ARG pre_cmd=flush
 show create table `sampling_primary_key_table`;
 
 Table,Create Table,
-String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(value,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),
+String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(myVALUE,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"),
+
+
+select * from `sampling_primary_key_table`;
+
+tsid,t,v1,v2,v3,v5,name,myVALUE,
+UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100),
+UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100),
+UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200),
+UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200),
 
 
 DROP TABLE IF EXISTS `sampling_primary_key_table`;
diff --git a/integration_tests/cases/env/local/ddl/sampling-primary-key.sql b/integration_tests/cases/env/local/ddl/sampling-primary-key.sql
index 2536e721b1..4c1612ee67 100644
--- a/integration_tests/cases/env/local/ddl/sampling-primary-key.sql
+++ b/integration_tests/cases/env/local/ddl/sampling-primary-key.sql
@@ -7,7 +7,7 @@ CREATE TABLE `sampling_primary_key_table` (
     v3 double,
     v5 double,
     name string TAG,
-    value int64 NOT NULL,
+    myVALUE int64 NOT NULL,
     t timestamp NOT NULL,
     timestamp KEY (t)) ENGINE = Analytic WITH (
     update_mode='append',
@@ -16,15 +16,19 @@ CREATE TABLE `sampling_primary_key_table` (
 
 show create table `sampling_primary_key_table`;
 
-INSERT INTO `sampling_primary_key_table` (t, name, value)
+INSERT INTO `sampling_primary_key_table` (t, name, myVALUE)
 VALUES
     (1695348000000, "ceresdb2", 200),
     (1695348000005, "ceresdb2", 100),
     (1695348000001, "ceresdb1", 100),
     (1695348000003, "ceresdb3", 200);
 
+select * from `sampling_primary_key_table`;
+
 -- After flush, its primary key should changed.
 -- SQLNESS ARG pre_cmd=flush
 show create table `sampling_primary_key_table`;
 
+select * from `sampling_primary_key_table`;
+
 DROP TABLE IF EXISTS `sampling_primary_key_table`;
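
For context on the flag introduced in `flush_compaction.rs`: the primary-key suggestion is applied to the schema during preflush, so the flush request now carries a boolean telling the flush task whether the sampled memtable must be re-sorted, instead of the flush task querying `suggest_primary_key()` again. A rough sketch of that flow with simplified, hypothetical stand-in types (not the real `analytic_engine` structs):

```rust
/// Simplified stand-in for the real TableFlushRequest.
struct TableFlushRequest {
    max_sequence: u64,
    /// Set in preflush when a new primary key was suggested; the flush task
    /// then knows the sampled rows must be re-sorted before dumping SSTs.
    need_reorder: bool,
}

fn preflush(suggested_pk: Option<Vec<usize>>, last_sequence: u64) -> TableFlushRequest {
    // The suggestion itself is applied to the table schema here; only the
    // boolean outcome has to travel with the flush request.
    TableFlushRequest {
        max_sequence: last_sequence,
        need_reorder: suggested_pk.is_some(),
    }
}

fn dump_sampling_memtable(req: &TableFlushRequest, pk_indexes: &[usize]) {
    if req.need_reorder {
        // Reorder by the (already updated) primary key indexes of the schema,
        // mirroring `Reorder { order_by_col_indexes, .. }` in the patch.
        println!("reorder rows up to seq {} by columns {pk_indexes:?}", req.max_sequence);
    }
}

fn main() {
    let req = preflush(Some(vec![7, 6, 0, 1]), 42);
    dump_sampling_memtable(&req, &[7, 6, 0, 1]);
}
```

The integration test renames the `value` column to `myVALUE` presumably to exercise exactly this mixed-case path: with `col`, the sort expression would resolve to a lowercased `myvalue` and fail to match the schema, while `ident` keeps the original casing.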