From f8ee8f6b78432d8fa0884105ec04fdd58c3e37b3 Mon Sep 17 00:00:00 2001 From: ILuffZhe <37180946+ILuffZhe@users.noreply.github.com> Date: Wed, 2 Aug 2023 19:54:32 +0800 Subject: [PATCH] HBASE-27488 [hbase-connectors] Duplicate result when searching HBase by Spark (#106) Signed-off-by: Reid Chan --- .../spark/datasources/HBaseTableScanRDD.scala | 18 +++++++++++------- .../hbase/spark/DefaultSourceSuite.scala | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala index fe325f7d..c334076f 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala @@ -114,16 +114,20 @@ class HBaseTableScanRDD(relation: HBaseRelation, hbaseContext: HBaseContext): Iterator[Result] = { g.grouped(relation.bulkGetSize).flatMap{ x => val gets = new ArrayList[Get](x.size) + val rowkeySet = new mutable.HashSet[String]() x.foreach{ y => - val g = new Get(y) - handleTimeSemantics(g) - columns.foreach { d => - if (!d.isRowKey) { - g.addColumn(d.cfBytes, d.colBytes) + if (!rowkeySet.contains(y.mkString("Array(", ", ", ")"))) { + val g = new Get(y) + handleTimeSemantics(g) + columns.foreach { d => + if (!d.isRowKey) { + g.addColumn(d.cfBytes, d.colBytes) + } } + filter.foreach(g.setFilter(_)) + gets.add(g) + rowkeySet.add(y.mkString("Array(", ", ", ")")) } - filter.foreach(g.setFilter(_)) - gets.add(g) } hbaseContext.applyCreds() val tmp = tbr.get(gets) diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala index 366c9baa..47145d35 100644 --- 
a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -297,6 +297,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(executionRules.rowKeyFilter.ranges.size == 0) } + /** + * An example of querying three fields and also only using rowkey points for the filter, + * where some rowkey points are duplicates. + */ + test("Test rowKey point only rowKey query, which contains duplicate rowkey") { + val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " + + "WHERE " + + "(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get1')").take(10) + val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll() + assert(results.length == 2) + assert(executionRules.dynamicLogicExpression.toExpressionString. + equals("( KEY_FIELD == 0 OR KEY_FIELD == 1 )")) + assert(executionRules.rowKeyFilter.points.size == 2) + assert(executionRules.rowKeyFilter.ranges.size == 0) + } + /** * A example of query three fields and also only using cell points for the filter */