From 699c0caaa290bec006de4571a1c7ea6faec98d38 Mon Sep 17 00:00:00 2001 From: Alexander Date: Sun, 18 Aug 2024 18:25:47 +0200 Subject: [PATCH] Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presence caused error (#357) * Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presence caused error * Preserved is_deleted column in engine spec, added the same fix for ReplicatedReplacingMergeTree --- .../clickhouse/spark/parse/AstVisitor.scala | 6 ++-- .../spark/spec/TableEngineSpec.scala | 2 ++ .../spark/parse/SQLParserSuite.scala | 34 +++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala index f4f4f8fc..dc1a7331 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/parse/AstVisitor.scala @@ -102,7 +102,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging { case eg: String if "ReplacingMergeTree" equalsIgnoreCase eg => ReplacingMergeTreeEngineSpec( engine_clause = engineExpr, - version_column = seqToOption(engineArgs).map(_.asInstanceOf[FieldRef]), + version_column = engineArgs.lift(0).map(_.asInstanceOf[FieldRef]), + is_deleted_column = engineArgs.lift(1).map(_.asInstanceOf[FieldRef]), _sorting_key = tupleIfNeeded(orderByOpt.toList), _primary_key = tupleIfNeeded(pkOpt.toList), _partition_key = tupleIfNeeded(partOpt.toList), @@ -127,7 +128,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging { engine_clause = engineExpr, zk_path = engineArgs.head.asInstanceOf[StringLiteral].value, replica_name = engineArgs(1).asInstanceOf[StringLiteral].value, - version_column = seqToOption(engineArgs.drop(2)).map(_.asInstanceOf[FieldRef]), + version_column = engineArgs.lift(2).map(_.asInstanceOf[FieldRef]), + is_deleted_column = engineArgs.lift(3).map(_.asInstanceOf[FieldRef]), _sorting_key = tupleIfNeeded(orderByOpt.toList), _primary_key = tupleIfNeeded(pkOpt.toList), _partition_key = tupleIfNeeded(partOpt.toList), diff --git a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala index 13534d76..48f56c3b 100644 --- a/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala +++ b/clickhouse-core/src/main/scala/com/clickhouse/spark/spec/TableEngineSpec.scala @@ -88,6 +88,7 @@ case class ReplicatedMergeTreeEngineSpec( case class ReplacingMergeTreeEngineSpec( engine_clause: String, version_column: Option[FieldRef] = None, + is_deleted_column: Option[FieldRef] = None, var _sorting_key: TupleExpr = TupleExpr(List.empty), var _primary_key: TupleExpr = TupleExpr(List.empty), var _partition_key: TupleExpr = TupleExpr(List.empty), @@ -109,6 +110,7 @@ case class ReplicatedReplacingMergeTreeEngineSpec( zk_path: String, replica_name: String, version_column: Option[FieldRef] = None, + is_deleted_column: Option[FieldRef] = None, var _sorting_key: TupleExpr = TupleExpr(List.empty), var _primary_key: TupleExpr = TupleExpr(List.empty), var _partition_key: TupleExpr = TupleExpr(List.empty), diff --git a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala index 181e7f88..bbcdf245 100644 --- a/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala +++ b/clickhouse-core/src/test/scala/com/clickhouse/spark/parse/SQLParserSuite.scala @@ -83,6 +83,21 @@ class SQLParserSuite extends AnyFunSuite { assert(actual === expected) } + test("parse ReplacingMergeTree - 3") { + val ddl = "ReplacingMergeTree(ts, is_deleted) " + + "PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" + val actual = parser.parseEngineClause(ddl) + val expected = ReplacingMergeTreeEngineSpec( + engine_clause = "ReplacingMergeTree(ts, is_deleted)", + version_column = Some(FieldRef("ts")), + is_deleted_column = Some(FieldRef("is_deleted")), + _sorting_key = TupleExpr(FieldRef("id") :: Nil), + _partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))), + _settings = Map("index_granularity" -> "8192") + ) + assert(actual === expected) + } + test("parse ReplicatedReplacingMergeTree - 1") { val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}') " + "PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" @@ -115,6 +130,25 @@ class SQLParserSuite extends AnyFunSuite { assert(actual === expected) } + test("parse ReplicatedReplacingMergeTree - 3") { + val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " + + "ts, is_deleted) PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192" + val actual = parser.parseEngineClause(ddl) + val expected = ReplicatedReplacingMergeTreeEngineSpec( + engine_clause = + "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " + + "ts, is_deleted)", + zk_path = "/clickhouse/tables/{shard}/wj_report/wj_respondent", + replica_name = "{replica}", + version_column = Some(FieldRef("ts")), + is_deleted_column = Some(FieldRef("is_deleted")), + _sorting_key = TupleExpr(FieldRef("id") :: Nil), + _partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))), + _settings = Map("index_granularity" -> "8192") + ) + assert(actual === expected) + } + test("parse Distributed - 1") { val ddl = "Distributed('default', 'wj_report', 'wj_respondent_local')" val actual = parser.parseEngineClause(ddl)