From 63c5c1823efedb1ac6d61ee0e0969e09f035db01 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Wed, 3 Jul 2024 17:13:43 +0800 Subject: [PATCH 1/4] fix column mismatch --- .../org/apache/spark/sql/IssueTestSuite.scala | 8 ++++++++ .../com/pingcap/tikv/meta/TiDAGRequest.java | 19 ++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala index 2109adb532..51820b7431 100644 --- a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala @@ -22,6 +22,14 @@ import org.apache.spark.sql.functions.{col, sum} class IssueTestSuite extends BaseTiSparkTest { + test("test column mismatch, issue 2750") { + val dbTable = "tispark_test.column_mismatch" + tidbStmt.execute(s"drop table if exists $dbTable") + tidbStmt.execute( + s"CREATE TABLE $dbTable (`CI_NO` varchar(64) NOT NULL, `AC_DT` bigint(20) NOT NULL, `SRC_KEY` varchar(100) NOT NULL, PRIMARY KEY (`SRC_KEY`,`CI_NO`,`AC_DT`) /*T![clustered_index] CLUSTERED */, KEY `IDX_FLOW_01` (`CI_NO`,`AC_DT`))") + spark.sql(s"select ci_no,ac_dt from $dbTable ").show() + } + test("test like escape") { val dbTable = "tispark_test.like_escape" tidbStmt.execute(s"drop table if exists $dbTable") diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java index be32501615..4baf65bd44 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java @@ -227,7 +227,14 @@ public DAGRequest buildDAGGetIndexData() { indexScanBuilder.setTableId(getPhysicalId()).setIndexId(indexInfo.getId()); if (tableInfo.isCommonHandle()) { for (TiIndexColumn col : tableInfo.getPrimaryKey().getIndexColumns()) { - indexScanBuilder.addPrimaryColumnIds(tableInfo.getColumn(col.getName()).getId()); + long primaryColumnId = tableInfo.getColumn(col.getName()).getId(); + indexScanBuilder.addPrimaryColumnIds(primaryColumnId); + // add primary columns to columns: + // https://github.com/pingcap/tidb/blob/ca7ba14da87abd060922b32c893b121c4c3d6373/pkg/executor/builder.go#L569 + if (!isPrimaryInColumns(indexScanBuilder.getColumnsList(), primaryColumnId)) { + TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); + indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); + } } } @@ -238,6 +245,16 @@ public DAGRequest buildDAGGetIndexData() { return buildRequest(dagRequestBuilder, outputOffsets); } + private boolean isPrimaryInColumns( + java.util.List columnsList, long primaryColumnId) { + for (com.pingcap.tidb.tipb.ColumnInfo column : columnsList) { + if (column.getColumnId() == primaryColumnId) { + return true; + } + } + return false; + } + private void addIndexLookUpIndexRangeScanExecutorCols( IndexScan.Builder indexScanBuilder, List outputOffsets, From c48aaaac9a8c022b7a795d47196d57ab3dfc6e50 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Fri, 19 Jul 2024 13:26:07 +0800 Subject: [PATCH 2/4] fix --- .../org/apache/spark/sql/IssueTestSuite.scala | 5 +++-- .../com/pingcap/tikv/meta/TiDAGRequest.java | 21 ++++--------------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala index 51820b7431..fb8824faa7 100644 --- a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala @@ -26,8 +26,9 @@ class IssueTestSuite extends BaseTiSparkTest { val dbTable = "tispark_test.column_mismatch" tidbStmt.execute(s"drop table if exists $dbTable") tidbStmt.execute( - s"CREATE TABLE $dbTable (`CI_NO` varchar(64) NOT NULL, `AC_DT` bigint(20) NOT NULL, `SRC_KEY` varchar(100) NOT NULL, PRIMARY KEY (`SRC_KEY`,`CI_NO`,`AC_DT`) /*T![clustered_index] CLUSTERED */, KEY `IDX_FLOW_01` (`CI_NO`,`AC_DT`))") - spark.sql(s"select ci_no,ac_dt from $dbTable ").show() + s"CREATE TABLE $dbTable (`CI_NO` varchar(64) NOT NULL, `AC_DT` bigint(20) NOT NULL, `YM_DT` bigint(20) NOT NULL,`SYS_TYPE` varchar(20) NOT NULL, PRIMARY KEY (`CI_NO`,`AC_DT`,`YM_DT`,`SYS_TYPE`) /*T![clustered_index] CLUSTERED */, KEY `IDX_FLOW_01` (`CI_NO`,`AC_DT`))") + tidbStmt.execute(s"insert into $dbTable values('1',1,1,'1')") + spark.sql(s"select ci_no,ac_dt from $dbTable").show() } test("test like escape") { diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java index 4baf65bd44..99f94b60e6 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java @@ -227,14 +227,11 @@ public DAGRequest buildDAGGetIndexData() { indexScanBuilder.setTableId(getPhysicalId()).setIndexId(indexInfo.getId()); if (tableInfo.isCommonHandle()) { for (TiIndexColumn col : tableInfo.getPrimaryKey().getIndexColumns()) { - long primaryColumnId = tableInfo.getColumn(col.getName()).getId(); - indexScanBuilder.addPrimaryColumnIds(primaryColumnId); + TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); + indexScanBuilder.addPrimaryColumnIds(columnInfo.getId()); // add primary columns to columns: - // https://github.com/pingcap/tidb/blob/ca7ba14da87abd060922b32c893b121c4c3d6373/pkg/executor/builder.go#L569 - if (!isPrimaryInColumns(indexScanBuilder.getColumnsList(), primaryColumnId)) { - TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); - indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); - } + // https://github.com/pingcap/tidb/blob/ddcaadbb856f0890e91e4c77991f0d2aa5aa93d0/pkg/planner/core/planbuilder.go#L1515 + indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); } } @@ -245,16 +242,6 @@ public DAGRequest buildDAGGetIndexData() { return buildRequest(dagRequestBuilder, outputOffsets); } - private boolean isPrimaryInColumns( - java.util.List columnsList, long primaryColumnId) { - for (com.pingcap.tidb.tipb.ColumnInfo column : columnsList) { - if (column.getColumnId() == primaryColumnId) { - return true; - } - } - return false; - } - private void addIndexLookUpIndexRangeScanExecutorCols( IndexScan.Builder indexScanBuilder, List outputOffsets, From 1da850af9a5268f1046f8e811c93b2611da4a3db Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Tue, 23 Jul 2024 13:56:54 +0800 Subject: [PATCH 3/4] fix index look up fail --- .../scala/org/apache/spark/sql/IssueTestSuite.scala | 5 ++++- .../java/com/pingcap/tikv/meta/TiDAGRequest.java | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala index fb8824faa7..750bd046f8 100644 --- a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala @@ -28,7 +28,10 @@ class IssueTestSuite extends BaseTiSparkTest { tidbStmt.execute( s"CREATE TABLE $dbTable (`CI_NO` varchar(64) NOT NULL, `AC_DT` bigint(20) NOT NULL, `YM_DT` bigint(20) NOT NULL,`SYS_TYPE` varchar(20) NOT NULL, PRIMARY KEY (`CI_NO`,`AC_DT`,`YM_DT`,`SYS_TYPE`) /*T![clustered_index] CLUSTERED */, KEY `IDX_FLOW_01` (`CI_NO`,`AC_DT`))") tidbStmt.execute(s"insert into $dbTable values('1',1,1,'1')") - spark.sql(s"select ci_no,ac_dt from $dbTable").show() + spark.sql(s"select ci_no,ac_dt from $dbTable").explain() + spark.sqlContext.setConf(TiConfigConst.USE_INDEX_SCAN_FIRST, "true") + spark.sql(s"select * from $dbTable").show() + spark.sqlContext.setConf(TiConfigConst.USE_INDEX_SCAN_FIRST, "false") } test("test like escape") { diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java index 99f94b60e6..223a0602cd 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java @@ -229,9 +229,6 @@ public DAGRequest buildDAGGetIndexData() { for (TiIndexColumn col : tableInfo.getPrimaryKey().getIndexColumns()) { TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); indexScanBuilder.addPrimaryColumnIds(columnInfo.getId()); - // add primary columns to columns: - // https://github.com/pingcap/tidb/blob/ddcaadbb856f0890e91e4c77991f0d2aa5aa93d0/pkg/planner/core/planbuilder.go#L1515 - indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); } } @@ -304,6 +301,16 @@ private void addIndexReaderIndexRangeScanExecutorCols( + columnInfo.getName()); } } + if (tableInfo.isCommonHandle()) { + for (TiIndexColumn col : tableInfo.getPrimaryKey().getIndexColumns()) { + TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); + // add primary columns to columns: + // https://github.com/pingcap/tidb/blob/ddcaadbb856f0890e91e4c77991f0d2aa5aa93d0/pkg/planner/core/planbuilder.go#L1515 + if (!isDoubleRead()) { + indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); + } + } + } } private void addIndexColsToScanBuilder( From 217f80810d55c09039f84a969133a8c152f6d730 Mon Sep 17 00:00:00 2001 From: shiyuhang <1136742008@qq.com> Date: Tue, 23 Jul 2024 18:49:52 +0800 Subject: [PATCH 4/4] opt --- .../src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java index 223a0602cd..e08bcf151f 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java @@ -227,8 +227,7 @@ public DAGRequest buildDAGGetIndexData() { indexScanBuilder.setTableId(getPhysicalId()).setIndexId(indexInfo.getId()); if (tableInfo.isCommonHandle()) { for (TiIndexColumn col : tableInfo.getPrimaryKey().getIndexColumns()) { - TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); - indexScanBuilder.addPrimaryColumnIds(columnInfo.getId()); + indexScanBuilder.addPrimaryColumnIds(tableInfo.getColumn(col.getName()).getId()); } } @@ -306,9 +305,7 @@ private void addIndexReaderIndexRangeScanExecutorCols( TiColumnInfo columnInfo = tableInfo.getColumn(col.getName()); // add primary columns to columns: // https://github.com/pingcap/tidb/blob/ddcaadbb856f0890e91e4c77991f0d2aa5aa93d0/pkg/planner/core/planbuilder.go#L1515 - if (!isDoubleRead()) { - indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); - } + indexScanBuilder.addColumns(ColumnInfo.newBuilder(columnInfo.toProto(tableInfo))); } } }