diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java index 55ded5ce..a0d3dc24 100644 --- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java +++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java @@ -40,10 +40,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * a alternative implementation of a connection object that forwards the mutations to a kafka queue @@ -51,11 +48,7 @@ * */ @InterfaceAudience.Private public class KafkaBridgeConnection implements Connection { - private static final Logger LOG = LoggerFactory.getLogger(KafkaBridgeConnection.class); - private final Configuration conf; - private final User user; - private final ExecutorService pool; private volatile boolean closed = false; private TopicRoutingRules routingRules; private Producer producer; @@ -74,32 +67,23 @@ public KafkaBridgeConnection(Configuration conf, ExecutorService pool, User user) throws IOException { this.conf = conf; - this.user = user; - this.pool = pool; setupRules(); startKafkaConnection(); } /** * for testing. - * @param conf hbase configuration - * @param pool executor service - * @param user user with connection + * @param conf hbase configuration * @param routingRules a set of routing rules * @param producer a kafka producer * @throws IOException on error */ - public KafkaBridgeConnection(Configuration conf, - ExecutorService pool, - User user, - TopicRoutingRules routingRules, - Producer producer) - throws IOException { + @VisibleForTesting + public KafkaBridgeConnection(Configuration conf, TopicRoutingRules routingRules, + Producer producer) { this.conf = conf; - this.user = user; - this.pool = pool; - this.producer=producer; - this.routingRules=routingRules; + this.producer = producer; + this.routingRules = routingRules; } private void setupRules() throws IOException { @@ -161,6 +145,11 @@ public RegionLocator getRegionLocator(TableName tableName) throws IOException { return null; } + /* Without @Override, we can also compile it against HBase 2.1. */ + /* @Override */ + public void clearRegionLocationCache() { + } + @Override public Admin getAdmin() throws IOException { return null; diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java index a474cdce..5ef19163 100644 --- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java +++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java @@ -23,7 +23,6 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; @@ -89,8 +88,7 @@ public void testSendMessage() { rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8"))); Configuration conf = new Configuration(); KafkaBridgeConnection connection = - new KafkaBridgeConnection( - conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer); + new KafkaBridgeConnection(conf,rules,myTestingProducer); long zeTimestamp = System.currentTimeMillis(); Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp); put.addColumn("FAMILY".getBytes("UTF-8"), diff --git a/pom.xml b/pom.xml index 79fed675..7d095ea4 100755 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 0.5.0 4.12 2.1.0 - 2.7.7 + 2.8.5 3.0.3 ${hadoop-two.version} 1.7.25 diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index d9d5a66f..70f5cab2 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -440,11 +440,9 @@ case class HBaseRelation ( if (field != null) { if (field.isRowKey) { parentRowKeyFilter.mergeIntersect(new RowKeyFilter( - DefaultSourceStaticUtils.getByteValue(field, - value.toString), null)) + Utils.toBytes(value, field), null)) } - val byteValue = - DefaultSourceStaticUtils.getByteValue(field, value.toString) + val byteValue = Utils.toBytes(value, field) valueArray += byteValue } new EqualLogicExpression(attr, valueArray.length - 1, false) @@ -929,12 +927,6 @@ class ColumnFilterCollection { @InterfaceAudience.Private object DefaultSourceStaticUtils { - val rawInteger = new RawInteger - val rawLong = new RawLong - val rawFloat = new RawFloat - val rawDouble = new RawDouble - val rawString = RawString.ASCENDING - val byteRange = new ThreadLocal[PositionedByteRange] { override def initialValue(): PositionedByteRange = { val range = new SimplePositionedMutableByteRange() @@ -973,93 +965,6 @@ object DefaultSourceStaticUtils { lastFiveExecutionRules.poll() } } - - /** - * This method will convert the result content from HBase into the - * SQL value type that is requested by the Spark SQL schema definition - * - * @param field The structure of the SparkSQL Column - * @param r The result object from HBase - * @return The converted object type - */ - def getValue(field: Field, - r: Result): Any = { - if (field.isRowKey) { - val row = r.getRow - - field.dt match { - case IntegerType => rawInteger.decode(getFreshByteRange(row)) - case LongType => rawLong.decode(getFreshByteRange(row)) - case FloatType => rawFloat.decode(getFreshByteRange(row)) - case DoubleType => rawDouble.decode(getFreshByteRange(row)) - case StringType => rawString.decode(getFreshByteRange(row)) - case TimestampType => rawLong.decode(getFreshByteRange(row)) - case _ => Bytes.toString(row) - } - } else { - val cellByteValue = - r.getColumnLatestCell(field.cfBytes, field.colBytes) - if (cellByteValue == null) null - else field.dt match { - case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength)) - case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength)) - case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength)) - case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength)) - case StringType => Bytes.toString(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength) - case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength)) - case _ => Bytes.toString(cellByteValue.getValueArray, - cellByteValue.getValueOffset, cellByteValue.getValueLength) - } - } - } - - /** - * This will convert the value from SparkSQL to be stored into HBase using the - * right byte Type - * - * @param value String value from SparkSQL - * @return Returns the byte array to go into HBase - */ - def getByteValue(field: Field, - value: String): Array[Byte] = { - field.dt match { - case IntegerType => - val result = new Array[Byte](Bytes.SIZEOF_INT) - val localDataRange = getFreshByteRange(result) - rawInteger.encode(localDataRange, value.toInt) - localDataRange.getBytes - case LongType => - val result = new Array[Byte](Bytes.SIZEOF_LONG) - val localDataRange = getFreshByteRange(result) - rawLong.encode(localDataRange, value.toLong) - localDataRange.getBytes - case FloatType => - val result = new Array[Byte](Bytes.SIZEOF_FLOAT) - val localDataRange = getFreshByteRange(result) - rawFloat.encode(localDataRange, value.toFloat) - localDataRange.getBytes - case DoubleType => - val result = new Array[Byte](Bytes.SIZEOF_DOUBLE) - val localDataRange = getFreshByteRange(result) - rawDouble.encode(localDataRange, value.toDouble) - localDataRange.getBytes - case StringType => - Bytes.toBytes(value) - case TimestampType => - val result = new Array[Byte](Bytes.SIZEOF_LONG) - val localDataRange = getFreshByteRange(result) - rawLong.encode(localDataRange, value.toLong) - localDataRange.getBytes - - case _ => Bytes.toBytes(value) - } - } } /** diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala index 093c6ac0..1e505854 100644 --- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala +++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Utils.scala @@ -18,18 +18,17 @@ package org.apache.hadoop.hbase.spark.datasources +import java.sql.{Date, Timestamp} + import org.apache.hadoop.hbase.spark.AvroSerdes import org.apache.hadoop.hbase.util.Bytes -//import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String - import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private object Utils { - /** * Parses the hbase field to it's corresponding * scala type which can then be put into a Spark GenericRow @@ -48,14 +47,16 @@ object Utils { } else { // Fall back to atomic type f.dt match { - case BooleanType => toBoolean(src, offset) + case BooleanType => src(offset) != 0 case ByteType => src(offset) - case DoubleType => Bytes.toDouble(src, offset) - case FloatType => Bytes.toFloat(src, offset) - case IntegerType => Bytes.toInt(src, offset) - case LongType|TimestampType => Bytes.toLong(src, offset) case ShortType => Bytes.toShort(src, offset) - case StringType => toUTF8String(src, offset, length) + case IntegerType => Bytes.toInt(src, offset) + case LongType => Bytes.toLong(src, offset) + case FloatType => Bytes.toFloat(src, offset) + case DoubleType => Bytes.toDouble(src, offset) + case DateType => new Date(Bytes.toLong(src, offset)) + case TimestampType => new Timestamp(Bytes.toLong(src, offset)) + case StringType => UTF8String.fromBytes(src, offset, length) case BinaryType => val newArray = new Array[Byte](length) System.arraycopy(src, offset, newArray, 0, length) @@ -73,28 +74,19 @@ object Utils { val record = field.catalystToAvro(input) AvroSerdes.serialize(record, field.schema.get) } else { - input match { - case data: Boolean => Bytes.toBytes(data) - case data: Byte => Array(data) - case data: Array[Byte] => data - case data: Double => Bytes.toBytes(data) - case data: Float => Bytes.toBytes(data) - case data: Int => Bytes.toBytes(data) - case data: Long => Bytes.toBytes(data) - case data: Short => Bytes.toBytes(data) - case data: UTF8String => data.getBytes - case data: String => Bytes.toBytes(data) - // TODO: add more data type support + field.dt match { + case BooleanType => Bytes.toBytes(input.asInstanceOf[Boolean]) + case ByteType => Array(input.asInstanceOf[Number].byteValue) + case ShortType => Bytes.toBytes(input.asInstanceOf[Number].shortValue) + case IntegerType => Bytes.toBytes(input.asInstanceOf[Number].intValue) + case LongType => Bytes.toBytes(input.asInstanceOf[Number].longValue) + case FloatType => Bytes.toBytes(input.asInstanceOf[Number].floatValue) + case DoubleType => Bytes.toBytes(input.asInstanceOf[Number].doubleValue) + case DateType | TimestampType => Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime) + case StringType => Bytes.toBytes(input.toString) + case BinaryType => input.asInstanceOf[Array[Byte]] case _ => throw new Exception(s"unsupported data type ${field.dt}") } } } - - def toBoolean(input: Array[Byte], offset: Int): Boolean = { - input(offset) != 0 - } - - def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = { - UTF8String.fromBytes(input.slice(offset, offset + length)) - } } diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala index afe515bc..72a84cf1 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -17,6 +17,8 @@ package org.apache.hadoop.hbase.spark +import java.sql.{Date, Timestamp} + import org.apache.avro.Schema import org.apache.avro.generic.GenericData import org.apache.hadoop.hbase.client.{ConnectionFactory, Put} @@ -89,8 +91,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { val t1TableName = "t1" val t2TableName = "t2" + val t3TableName = "t3" val columnFamily = "c" + val timestamp = 1234567890000L + var sqlContext:SQLContext = null var df:DataFrame = null @@ -109,12 +114,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { catch { case e: Exception => logInfo(" - no table " + t2TableName + " found") } + try + TEST_UTIL.deleteTable(TableName.valueOf(t3TableName)) + catch { + case e: Exception => logInfo(" - no table " + t3TableName + " found") + } + logInfo(" - creating table " + t1TableName) TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily)) logInfo(" - created table") logInfo(" - creating table " + t2TableName) TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily)) logInfo(" - created table") + logInfo(" - creating table " + t3TableName) + TEST_UTIL.createTable(TableName.valueOf(t3TableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + val sparkConf = new SparkConf sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true") sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100") @@ -124,7 +139,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration) try { - val t1Table = connection.getTable(TableName.valueOf("t1")) + val t1Table = connection.getTable(TableName.valueOf(t1TableName)) try { var put = new Put(Bytes.toBytes("get1")) @@ -158,7 +173,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { t1Table.close() } - val t2Table = connection.getTable(TableName.valueOf("t2")) + val t2Table = connection.getTable(TableName.valueOf(t2TableName)) try { var put = new Put(Bytes.toBytes(1)) @@ -191,6 +206,26 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { } finally { t2Table.close() } + + val t3Table = connection.getTable(TableName.valueOf(t3TableName)) + + try { + val put = new Put(Bytes.toBytes("row")) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("binary"), Array(1.toByte, 2.toByte, 3.toByte)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("boolean"), Bytes.toBytes(true)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("byte"), Array(127.toByte)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("short"), Bytes.toBytes(32767.toShort)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("int"), Bytes.toBytes(1000000)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("long"), Bytes.toBytes(10000000000L)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("float"), Bytes.toBytes(0.5f)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("double"), Bytes.toBytes(0.125)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("date"), Bytes.toBytes(timestamp)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("timestamp"), Bytes.toBytes(timestamp)) + put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("string"), Bytes.toBytes("string")) + t3Table.put(put) + } finally { + t3Table.close() + } } finally { connection.close() } @@ -807,6 +842,59 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(executionRules.dynamicLogicExpression == null) } + test("Test mapping") { + val catalog = s"""{ + |"table":{"namespace":"default", "name":"t3"}, + |"rowkey":"key", + |"columns":{ + |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"}, + |"BINARY_FIELD":{"cf":"c", "col":"binary", "type":"binary"}, + |"BOOLEAN_FIELD":{"cf":"c", "col":"boolean", "type":"boolean"}, + |"BYTE_FIELD":{"cf":"c", "col":"byte", "type":"byte"}, + |"SHORT_FIELD":{"cf":"c", "col":"short", "type":"short"}, + |"INT_FIELD":{"cf":"c", "col":"int", "type":"int"}, + |"LONG_FIELD":{"cf":"c", "col":"long", "type":"long"}, + |"FLOAT_FIELD":{"cf":"c", "col":"float", "type":"float"}, + |"DOUBLE_FIELD":{"cf":"c", "col":"double", "type":"double"}, + |"DATE_FIELD":{"cf":"c", "col":"date", "type":"date"}, + |"TIMESTAMP_FIELD":{"cf":"c", "col":"timestamp", "type":"timestamp"}, + |"STRING_FIELD":{"cf":"c", "col":"string", "type":"string"} + |} + |}""".stripMargin + df = sqlContext.load("org.apache.hadoop.hbase.spark", + Map(HBaseTableCatalog.tableCatalog->catalog)) + + df.registerTempTable("hbaseTestMapping") + + val results = sqlContext.sql("SELECT binary_field, boolean_field, " + + "byte_field, short_field, int_field, long_field, " + + "float_field, double_field, date_field, timestamp_field, " + + "string_field FROM hbaseTestMapping").collect() + + assert(results.length == 1) + + val result = results(0) + + System.out.println("row: " + result) + System.out.println("0: " + result.get(0)) + System.out.println("1: " + result.get(1)) + System.out.println("2: " + result.get(2)) + System.out.println("3: " + result.get(3)) + + assert(result.get(0).asInstanceOf[Array[Byte]].sameElements(Array(1.toByte, 2.toByte, 3.toByte))) + assert(result.get(1) == true) + assert(result.get(2) == 127) + assert(result.get(3) == 32767) + assert(result.get(4) == 1000000) + assert(result.get(5) == 10000000000L) + assert(result.get(6) == 0.5) + assert(result.get(7) == 0.125) + // sql date stores only year, month and day, so checking it is within a day + assert(Math.abs(result.get(8).asInstanceOf[Date].getTime - timestamp) <= 86400000) + assert(result.get(9).asInstanceOf[Timestamp].getTime == timestamp) + assert(result.get(10) == "string") + } + def writeCatalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala index 74bf912c..d6245a64 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala @@ -38,7 +38,9 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"}, |"col6":{"cf":"cf1", "col":"col5", "type":"$map"}, |"col7":{"cf":"cf1", "col":"col6", "type":"$array"}, - |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"} + |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}, + |"col9":{"cf":"cf1", "col":"col8", "type":"date"}, + |"col10":{"cf":"cf1", "col":"col9", "type":"timestamp"} |} |}""".stripMargin val parameters = Map(HBaseTableCatalog.tableCatalog->catalog) @@ -63,6 +65,8 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA assert(t.getField("col2").length == Bytes.SIZEOF_DOUBLE) assert(t.getField("col1").length == -1) assert(t.getField("col8").length == -1) + assert(t.getField("col9").dt == DateType) + assert(t.getField("col10").dt == TimestampType) } checkDataType( @@ -95,7 +99,7 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt) } - test("compatiblity") { + test("compatibility") { val m = Map("hbase.columns.mapping" -> "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD BINARY c:c,", "hbase.table" -> "t1") diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index 1b71eb40..f07b1d6e 100644 --- a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -59,6 +59,9 @@ class ConnectionMocker extends Connection { def isAborted: Boolean = true def abort(why: String, e: Throwable) = {} + + /* Without override, we can also compile it against HBase 2.1. */ + /* override */ def clearRegionLocationCache(): Unit = {} } class HBaseConnectionCacheSuite extends FunSuite with Logging {