HBASE-22711 Spark connector doesn't use the given mapping when inserting data

* insert always uses the given mapping (a short sketch of this idea follows below, before the diff)

* supports the following types:
  * binary
  * boolean
  * byte, short, int, long
  * float, double
  * date, timestamp
  * string

Signed-off-by: Peter Somogyi <[email protected]>
meszibalu authored Jul 22, 2019
1 parent 91231ca commit d631afb
Showing 4 changed files with 119 additions and 130 deletions.
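
For orientation before the diff: the fix makes the write path convert each value according to the type declared for that column in the catalog mapping (field.dt), instead of dispatching on the value's runtime class. The sketch below is a simplified, stand-alone illustration of that idea and mirrors the new Utils.toBytes further down; the object and method names (MappingDrivenEncoder, toHBaseBytes) are invented for illustration, and it assumes hbase-common and the Spark SQL types are on the classpath.

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.types._

// Sketch only: encode a Spark SQL value using the catalog-declared type,
// mirroring the behaviour of the new Utils.toBytes in the diff below.
object MappingDrivenEncoder {
  def toHBaseBytes(declaredType: DataType, value: Any): Array[Byte] = declaredType match {
    case BooleanType              => Bytes.toBytes(value.asInstanceOf[Boolean])
    case ByteType                 => Array(value.asInstanceOf[Number].byteValue)
    case ShortType                => Bytes.toBytes(value.asInstanceOf[Number].shortValue)
    case IntegerType              => Bytes.toBytes(value.asInstanceOf[Number].intValue)
    case LongType                 => Bytes.toBytes(value.asInstanceOf[Number].longValue)
    case FloatType                => Bytes.toBytes(value.asInstanceOf[Number].floatValue)
    case DoubleType               => Bytes.toBytes(value.asInstanceOf[Number].doubleValue)
    case DateType | TimestampType => Bytes.toBytes(value.asInstanceOf[java.util.Date].getTime)
    case StringType               => Bytes.toBytes(value.toString)
    case BinaryType               => value.asInstanceOf[Array[Byte]]
    case other                    => throw new IllegalArgumentException(s"unsupported data type $other")
  }
}

Both java.sql.Date and java.sql.Timestamp extend java.util.Date, so date and timestamp columns end up as the 8-byte epoch-millis long, which is what the read path decodes with Bytes.toLong; going through Number for the numeric types tolerates whatever boxed width Spark hands over.
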
@@ -440,11 +440,9 @@ case class HBaseRelation (
if (field != null) {
if (field.isRowKey) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
DefaultSourceStaticUtils.getByteValue(field,
value.toString), null))
Utils.toBytes(value, field), null))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(field, value.toString)
val byteValue = Utils.toBytes(value, field)
valueArray += byteValue
}
new EqualLogicExpression(attr, valueArray.length - 1, false)
@@ -929,12 +927,6 @@ class ColumnFilterCollection {
@InterfaceAudience.Private
object DefaultSourceStaticUtils {

val rawInteger = new RawInteger
val rawLong = new RawLong
val rawFloat = new RawFloat
val rawDouble = new RawDouble
val rawString = RawString.ASCENDING

val byteRange = new ThreadLocal[PositionedByteRange] {
override def initialValue(): PositionedByteRange = {
val range = new SimplePositionedMutableByteRange()
@@ -973,93 +965,6 @@ object DefaultSourceStaticUtils {
lastFiveExecutionRules.poll()
}
}

/**
* This method will convert the result content from HBase into the
* SQL value type that is requested by the Spark SQL schema definition
*
* @param field The structure of the SparkSQL Column
* @param r The result object from HBase
* @return The converted object type
*/
def getValue(field: Field,
r: Result): Any = {
if (field.isRowKey) {
val row = r.getRow

field.dt match {
case IntegerType => rawInteger.decode(getFreshByteRange(row))
case LongType => rawLong.decode(getFreshByteRange(row))
case FloatType => rawFloat.decode(getFreshByteRange(row))
case DoubleType => rawDouble.decode(getFreshByteRange(row))
case StringType => rawString.decode(getFreshByteRange(row))
case TimestampType => rawLong.decode(getFreshByteRange(row))
case _ => Bytes.toString(row)
}
} else {
val cellByteValue =
r.getColumnLatestCell(field.cfBytes, field.colBytes)
if (cellByteValue == null) null
else field.dt match {
case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case StringType => Bytes.toString(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength)
case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case _ => Bytes.toString(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength)
}
}
}

/**
* This will convert the value from SparkSQL to be stored into HBase using the
* right byte Type
*
* @param value String value from SparkSQL
* @return Returns the byte array to go into HBase
*/
def getByteValue(field: Field,
value: String): Array[Byte] = {
field.dt match {
case IntegerType =>
val result = new Array[Byte](Bytes.SIZEOF_INT)
val localDataRange = getFreshByteRange(result)
rawInteger.encode(localDataRange, value.toInt)
localDataRange.getBytes
case LongType =>
val result = new Array[Byte](Bytes.SIZEOF_LONG)
val localDataRange = getFreshByteRange(result)
rawLong.encode(localDataRange, value.toLong)
localDataRange.getBytes
case FloatType =>
val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
val localDataRange = getFreshByteRange(result)
rawFloat.encode(localDataRange, value.toFloat)
localDataRange.getBytes
case DoubleType =>
val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
val localDataRange = getFreshByteRange(result)
rawDouble.encode(localDataRange, value.toDouble)
localDataRange.getBytes
case StringType =>
Bytes.toBytes(value)
case TimestampType =>
val result = new Array[Byte](Bytes.SIZEOF_LONG)
val localDataRange = getFreshByteRange(result)
rawLong.encode(localDataRange, value.toLong)
localDataRange.getBytes

case _ => Bytes.toBytes(value)
}
}
}

/**
@@ -18,18 +18,17 @@

package org.apache.hadoop.hbase.spark.datasources

import java.sql.{Date, Timestamp}

import org.apache.hadoop.hbase.spark.AvroSerdes
import org.apache.hadoop.hbase.util.Bytes
//import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
object Utils {


/**
* Parses the hbase field to it's corresponding
* scala type which can then be put into a Spark GenericRow
@@ -48,14 +47,16 @@ object Utils {
} else {
// Fall back to atomic type
f.dt match {
case BooleanType => toBoolean(src, offset)
case BooleanType => src(offset) != 0
case ByteType => src(offset)
case DoubleType => Bytes.toDouble(src, offset)
case FloatType => Bytes.toFloat(src, offset)
case IntegerType => Bytes.toInt(src, offset)
case LongType|TimestampType => Bytes.toLong(src, offset)
case ShortType => Bytes.toShort(src, offset)
case StringType => toUTF8String(src, offset, length)
case IntegerType => Bytes.toInt(src, offset)
case LongType => Bytes.toLong(src, offset)
case FloatType => Bytes.toFloat(src, offset)
case DoubleType => Bytes.toDouble(src, offset)
case DateType => new Date(Bytes.toLong(src, offset))
case TimestampType => new Timestamp(Bytes.toLong(src, offset))
case StringType => UTF8String.fromBytes(src, offset, length)
case BinaryType =>
val newArray = new Array[Byte](length)
System.arraycopy(src, offset, newArray, 0, length)
@@ -73,28 +74,19 @@
val record = field.catalystToAvro(input)
AvroSerdes.serialize(record, field.schema.get)
} else {
input match {
case data: Boolean => Bytes.toBytes(data)
case data: Byte => Array(data)
case data: Array[Byte] => data
case data: Double => Bytes.toBytes(data)
case data: Float => Bytes.toBytes(data)
case data: Int => Bytes.toBytes(data)
case data: Long => Bytes.toBytes(data)
case data: Short => Bytes.toBytes(data)
case data: UTF8String => data.getBytes
case data: String => Bytes.toBytes(data)
// TODO: add more data type support
field.dt match {
case BooleanType => Bytes.toBytes(input.asInstanceOf[Boolean])
case ByteType => Array(input.asInstanceOf[Number].byteValue)
case ShortType => Bytes.toBytes(input.asInstanceOf[Number].shortValue)
case IntegerType => Bytes.toBytes(input.asInstanceOf[Number].intValue)
case LongType => Bytes.toBytes(input.asInstanceOf[Number].longValue)
case FloatType => Bytes.toBytes(input.asInstanceOf[Number].floatValue)
case DoubleType => Bytes.toBytes(input.asInstanceOf[Number].doubleValue)
case DateType | TimestampType => Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime)
case StringType => Bytes.toBytes(input.toString)
case BinaryType => input.asInstanceOf[Array[Byte]]
case _ => throw new Exception(s"unsupported data type ${field.dt}")
}
}
}

def toBoolean(input: Array[Byte], offset: Int): Boolean = {
input(offset) != 0
}

def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = {
UTF8String.fromBytes(input.slice(offset, offset + length))
}
}
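
Why the declared type has to drive the conversion: Bytes.toBytes is overloaded, and the byte layout it produces depends on the static type of its argument, so encoding by the value's runtime class (as the removed match on input did) or by stringifying the value first cannot be decoded reliably by the read path above. A minimal illustration, assuming only hbase-common on the classpath; the object name is made up:

import org.apache.hadoop.hbase.util.Bytes

// Illustration only (not part of the connector): the same logical value has
// different byte layouts depending on which Bytes.toBytes overload is chosen.
object EncodingWidths {
  def main(args: Array[String]): Unit = {
    println(Bytes.toBytes(0.5f).length)   // 4 bytes -> what Bytes.toFloat in hbaseFieldToScalaType expects
    println(Bytes.toBytes(0.5d).length)   // 8 bytes -> what Bytes.toDouble expects
    println(Bytes.toBytes("0.5").length)  // 3 bytes -> UTF-8 of the string "0.5"
  }
}

With the catalog mapping in charge, a column declared as float is always written as 4 bytes and read back with Bytes.toFloat, regardless of how the value happened to be boxed in the DataFrame.
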
@@ -17,6 +17,8 @@

package org.apache.hadoop.hbase.spark

import java.sql.{Date, Timestamp}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
@@ -89,8 +91,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {

val t1TableName = "t1"
val t2TableName = "t2"
val t3TableName = "t3"
val columnFamily = "c"

val timestamp = 1234567890000L

var sqlContext:SQLContext = null
var df:DataFrame = null

@@ -109,12 +114,22 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
catch {
case e: Exception => logInfo(" - no table " + t2TableName + " found")
}
try
TEST_UTIL.deleteTable(TableName.valueOf(t3TableName))
catch {
case e: Exception => logInfo(" - no table " + t3TableName + " found")
}

logInfo(" - creating table " + t1TableName)
TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t2TableName)
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t3TableName)
TEST_UTIL.createTable(TableName.valueOf(t3TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")

val sparkConf = new SparkConf
sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
@@ -124,7 +139,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {

val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
try {
val t1Table = connection.getTable(TableName.valueOf("t1"))
val t1Table = connection.getTable(TableName.valueOf(t1TableName))

try {
var put = new Put(Bytes.toBytes("get1"))
@@ -158,7 +173,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
t1Table.close()
}

val t2Table = connection.getTable(TableName.valueOf("t2"))
val t2Table = connection.getTable(TableName.valueOf(t2TableName))

try {
var put = new Put(Bytes.toBytes(1))
@@ -191,6 +206,26 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
} finally {
t2Table.close()
}

val t3Table = connection.getTable(TableName.valueOf(t3TableName))

try {
val put = new Put(Bytes.toBytes("row"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("binary"), Array(1.toByte, 2.toByte, 3.toByte))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("boolean"), Bytes.toBytes(true))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("byte"), Array(127.toByte))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("short"), Bytes.toBytes(32767.toShort))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("int"), Bytes.toBytes(1000000))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("long"), Bytes.toBytes(10000000000L))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("float"), Bytes.toBytes(0.5f))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("double"), Bytes.toBytes(0.125))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("date"), Bytes.toBytes(timestamp))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("timestamp"), Bytes.toBytes(timestamp))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("string"), Bytes.toBytes("string"))
t3Table.put(put)
} finally {
t3Table.close()
}
} finally {
connection.close()
}
@@ -807,6 +842,59 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
assert(executionRules.dynamicLogicExpression == null)
}

test("Test mapping") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t3"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"BINARY_FIELD":{"cf":"c", "col":"binary", "type":"binary"},
|"BOOLEAN_FIELD":{"cf":"c", "col":"boolean", "type":"boolean"},
|"BYTE_FIELD":{"cf":"c", "col":"byte", "type":"byte"},
|"SHORT_FIELD":{"cf":"c", "col":"short", "type":"short"},
|"INT_FIELD":{"cf":"c", "col":"int", "type":"int"},
|"LONG_FIELD":{"cf":"c", "col":"long", "type":"long"},
|"FLOAT_FIELD":{"cf":"c", "col":"float", "type":"float"},
|"DOUBLE_FIELD":{"cf":"c", "col":"double", "type":"double"},
|"DATE_FIELD":{"cf":"c", "col":"date", "type":"date"},
|"TIMESTAMP_FIELD":{"cf":"c", "col":"timestamp", "type":"timestamp"},
|"STRING_FIELD":{"cf":"c", "col":"string", "type":"string"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))

df.registerTempTable("hbaseTestMapping")

val results = sqlContext.sql("SELECT binary_field, boolean_field, " +
"byte_field, short_field, int_field, long_field, " +
"float_field, double_field, date_field, timestamp_field, " +
"string_field FROM hbaseTestMapping").collect()

assert(results.length == 1)

val result = results(0)

System.out.println("row: " + result)
System.out.println("0: " + result.get(0))
System.out.println("1: " + result.get(1))
System.out.println("2: " + result.get(2))
System.out.println("3: " + result.get(3))

assert(result.get(0).asInstanceOf[Array[Byte]].sameElements(Array(1.toByte, 2.toByte, 3.toByte)))
assert(result.get(1) == true)
assert(result.get(2) == 127)
assert(result.get(3) == 32767)
assert(result.get(4) == 1000000)
assert(result.get(5) == 10000000000L)
assert(result.get(6) == 0.5)
assert(result.get(7) == 0.125)
// sql date stores only year, month and day, so checking it is within a day
assert(Math.abs(result.get(8).asInstanceOf[Date].getTime - timestamp) <= 86400000)
assert(result.get(9).asInstanceOf[Timestamp].getTime == timestamp)
assert(result.get(10) == "string")
}
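
The test above exercises the read direction of the mapping. A write through the same catalog, which is what drives the new mapping-aware insert path, would look roughly like the sketch below; this is not part of the commit, it assumes the catalog string and the timestamp value defined in this suite are in scope, and the exact write options should be checked against the connector version in use.

  // Sketch only: write one row through the same catalog so every column is
  // encoded via Utils.toBytes according to its declared type.
  val writeDf = sqlContext.createDataFrame(Seq(
    ("row2", Array[Byte](4, 5, 6), false, 1.toByte, 2.toShort, 3, 4L, 0.25f, 0.5,
      new java.sql.Date(timestamp), new java.sql.Timestamp(timestamp), "another string")))
    .toDF("KEY_FIELD", "BINARY_FIELD", "BOOLEAN_FIELD", "BYTE_FIELD", "SHORT_FIELD",
      "INT_FIELD", "LONG_FIELD", "FLOAT_FIELD", "DOUBLE_FIELD",
      "DATE_FIELD", "TIMESTAMP_FIELD", "STRING_FIELD")

  writeDf.write
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.hadoop.hbase.spark")
    .save()
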

def writeCatalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
@@ -38,7 +38,9 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA
|"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"},
|"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
|"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
|"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
|"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"},
|"col9":{"cf":"cf1", "col":"col8", "type":"date"},
|"col10":{"cf":"cf1", "col":"col9", "type":"timestamp"}
|}
|}""".stripMargin
val parameters = Map(HBaseTableCatalog.tableCatalog->catalog)
@@ -63,6 +65,8 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA
assert(t.getField("col2").length == Bytes.SIZEOF_DOUBLE)
assert(t.getField("col1").length == -1)
assert(t.getField("col8").length == -1)
assert(t.getField("col9").dt == DateType)
assert(t.getField("col10").dt == TimestampType)
}

checkDataType(
@@ -95,7 +99,7 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA
assert(DataTypeParserWrapper.parse("BINARY") === t.getField("C_FIELD").dt)
}

test("compatiblity") {
test("compatibility") {
val m = Map("hbase.columns.mapping" ->
"KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD DOUBLE c:b, C_FIELD BINARY c:c,",
"hbase.table" -> "t1")