Merge branch 'master' into add-snapshot-read
mini666 authored Jul 29, 2019
2 parents b2e8d85 + d631afb commit 55fcefe
Showing 8 changed files with 136 additions and 157 deletions.
@@ -40,22 +40,15 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* An alternative implementation of a Connection object that forwards mutations to a Kafka queue
* depending on the routing rules (see kafka-route-rules.xml).
*/
@InterfaceAudience.Private
public class KafkaBridgeConnection implements Connection {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBridgeConnection.class);

private final Configuration conf;
private final User user;
private final ExecutorService pool;
private volatile boolean closed = false;
private TopicRoutingRules routingRules;
private Producer<byte[],byte[]> producer;
@@ -74,32 +67,23 @@ public KafkaBridgeConnection(Configuration conf,
ExecutorService pool,
User user) throws IOException {
this.conf = conf;
this.user = user;
this.pool = pool;
setupRules();
startKafkaConnection();
}

/**
* for testing.
* @param conf hbase configuration
* @param pool executor service
* @param user user with connection
* @param conf hbase configuration
* @param routingRules a set of routing rules
* @param producer a kafka producer
* @throws IOException on error
*/
public KafkaBridgeConnection(Configuration conf,
ExecutorService pool,
User user,
TopicRoutingRules routingRules,
Producer<byte[],byte[]> producer)
throws IOException {
@VisibleForTesting
public KafkaBridgeConnection(Configuration conf, TopicRoutingRules routingRules,
Producer<byte[],byte[]> producer) {
this.conf = conf;
this.user = user;
this.pool = pool;
this.producer=producer;
this.routingRules=routingRules;
this.producer = producer;
this.routingRules = routingRules;
}

private void setupRules() throws IOException {
@@ -161,6 +145,11 @@ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return null;
}

/* Without @Override, we can also compile it against HBase 2.1. */
/* @Override */
public void clearRegionLocationCache() {
}

@Override
public Admin getAdmin() throws IOException {
return null;
@@ -23,7 +23,6 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -89,8 +88,7 @@ public void testSendMessage() {
rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
Configuration conf = new Configuration();
KafkaBridgeConnection connection =
new KafkaBridgeConnection(
conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer);
new KafkaBridgeConnection(conf,rules,myTestingProducer);
long zeTimestamp = System.currentTimeMillis();
Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp);
put.addColumn("FAMILY".getBytes("UTF-8"),
2 changes: 1 addition & 1 deletion pom.xml
@@ -122,7 +122,7 @@
<audience-annotations.version>0.5.0</audience-annotations.version>
<junit.version>4.12</junit.version>
<hbase-thirdparty.version>2.1.0</hbase-thirdparty.version>
<hadoop-two.version>2.7.7</hadoop-two.version>
<hadoop-two.version>2.8.5</hadoop-two.version>
<hadoop-three.version>3.0.3</hadoop-three.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<slf4j.version>1.7.25</slf4j.version>
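(Note: the Hadoop 2 line now builds against 2.8.5 by default. If a downstream build still needs a different Hadoop 2 release, the property can presumably be overridden at build time, e.g. `mvn -Dhadoop-two.version=2.7.7 ...`, since `hadoop.version` resolves to `${hadoop-two.version}`.)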
@@ -440,11 +440,9 @@ case class HBaseRelation (
if (field != null) {
if (field.isRowKey) {
parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
DefaultSourceStaticUtils.getByteValue(field,
value.toString), null))
Utils.toBytes(value, field), null))
}
val byteValue =
DefaultSourceStaticUtils.getByteValue(field, value.toString)
val byteValue = Utils.toBytes(value, field)
valueArray += byteValue
}
new EqualLogicExpression(attr, valueArray.length - 1, false)
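For context on this change: `Utils.toBytes(value, field)` encodes the typed Spark filter value directly, whereas the old `DefaultSourceStaticUtils.getByteValue(field, value.toString)` first rendered it as a String and re-parsed it. A minimal standalone sketch of the timestamp case (the object and method names below are illustrative, not part of this patch), mirroring the `DateType | TimestampType` branch of the new `Utils.toBytes`:

```scala
import java.sql.Timestamp
import org.apache.hadoop.hbase.util.Bytes

object TimestampEncodingSketch {
  // Mirrors the new Utils.toBytes branch: a Date/Timestamp is stored as its epoch millis.
  def encodeTimestamp(input: Any): Array[Byte] =
    Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime)

  def main(args: Array[String]): Unit = {
    val ts = new Timestamp(1564358400000L)
    // New path: works on the typed value pushed down by Spark.
    println(Bytes.toLong(encodeTimestamp(ts)))   // 1564358400000
    // Old path parsed value.toString as a Long; a Timestamp's string form
    // ("2019-07-29 ...") would not parse that way.
  }
}
```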
@@ -929,12 +927,6 @@ class ColumnFilterCollection {
@InterfaceAudience.Private
object DefaultSourceStaticUtils {

val rawInteger = new RawInteger
val rawLong = new RawLong
val rawFloat = new RawFloat
val rawDouble = new RawDouble
val rawString = RawString.ASCENDING

val byteRange = new ThreadLocal[PositionedByteRange] {
override def initialValue(): PositionedByteRange = {
val range = new SimplePositionedMutableByteRange()
@@ -973,93 +965,6 @@ object DefaultSourceStaticUtils {
lastFiveExecutionRules.poll()
}
}

/**
* This method will convert the result content from HBase into the
* SQL value type that is requested by the Spark SQL schema definition
*
* @param field The structure of the SparkSQL Column
* @param r The result object from HBase
* @return The converted object type
*/
def getValue(field: Field,
r: Result): Any = {
if (field.isRowKey) {
val row = r.getRow

field.dt match {
case IntegerType => rawInteger.decode(getFreshByteRange(row))
case LongType => rawLong.decode(getFreshByteRange(row))
case FloatType => rawFloat.decode(getFreshByteRange(row))
case DoubleType => rawDouble.decode(getFreshByteRange(row))
case StringType => rawString.decode(getFreshByteRange(row))
case TimestampType => rawLong.decode(getFreshByteRange(row))
case _ => Bytes.toString(row)
}
} else {
val cellByteValue =
r.getColumnLatestCell(field.cfBytes, field.colBytes)
if (cellByteValue == null) null
else field.dt match {
case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case StringType => Bytes.toString(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength)
case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength))
case _ => Bytes.toString(cellByteValue.getValueArray,
cellByteValue.getValueOffset, cellByteValue.getValueLength)
}
}
}

/**
* This will convert the value from SparkSQL to be stored into HBase using the
* right byte Type
*
* @param value String value from SparkSQL
* @return Returns the byte array to go into HBase
*/
def getByteValue(field: Field,
value: String): Array[Byte] = {
field.dt match {
case IntegerType =>
val result = new Array[Byte](Bytes.SIZEOF_INT)
val localDataRange = getFreshByteRange(result)
rawInteger.encode(localDataRange, value.toInt)
localDataRange.getBytes
case LongType =>
val result = new Array[Byte](Bytes.SIZEOF_LONG)
val localDataRange = getFreshByteRange(result)
rawLong.encode(localDataRange, value.toLong)
localDataRange.getBytes
case FloatType =>
val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
val localDataRange = getFreshByteRange(result)
rawFloat.encode(localDataRange, value.toFloat)
localDataRange.getBytes
case DoubleType =>
val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
val localDataRange = getFreshByteRange(result)
rawDouble.encode(localDataRange, value.toDouble)
localDataRange.getBytes
case StringType =>
Bytes.toBytes(value)
case TimestampType =>
val result = new Array[Byte](Bytes.SIZEOF_LONG)
val localDataRange = getFreshByteRange(result)
rawLong.encode(localDataRange, value.toLong)
localDataRange.getBytes

case _ => Bytes.toBytes(value)
}
}
}

/**
@@ -18,18 +18,17 @@

package org.apache.hadoop.hbase.spark.datasources

import java.sql.{Date, Timestamp}

import org.apache.hadoop.hbase.spark.AvroSerdes
import org.apache.hadoop.hbase.util.Bytes
//import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
object Utils {


/**
* Parses the HBase field into its corresponding
* Scala type, which can then be put into a Spark GenericRow
@@ -48,14 +47,16 @@ object Utils {
} else {
// Fall back to atomic type
f.dt match {
case BooleanType => toBoolean(src, offset)
case BooleanType => src(offset) != 0
case ByteType => src(offset)
case DoubleType => Bytes.toDouble(src, offset)
case FloatType => Bytes.toFloat(src, offset)
case IntegerType => Bytes.toInt(src, offset)
case LongType|TimestampType => Bytes.toLong(src, offset)
case ShortType => Bytes.toShort(src, offset)
case StringType => toUTF8String(src, offset, length)
case IntegerType => Bytes.toInt(src, offset)
case LongType => Bytes.toLong(src, offset)
case FloatType => Bytes.toFloat(src, offset)
case DoubleType => Bytes.toDouble(src, offset)
case DateType => new Date(Bytes.toLong(src, offset))
case TimestampType => new Timestamp(Bytes.toLong(src, offset))
case StringType => UTF8String.fromBytes(src, offset, length)
case BinaryType =>
val newArray = new Array[Byte](length)
System.arraycopy(src, offset, newArray, 0, length)
@@ -73,28 +74,19 @@
val record = field.catalystToAvro(input)
AvroSerdes.serialize(record, field.schema.get)
} else {
input match {
case data: Boolean => Bytes.toBytes(data)
case data: Byte => Array(data)
case data: Array[Byte] => data
case data: Double => Bytes.toBytes(data)
case data: Float => Bytes.toBytes(data)
case data: Int => Bytes.toBytes(data)
case data: Long => Bytes.toBytes(data)
case data: Short => Bytes.toBytes(data)
case data: UTF8String => data.getBytes
case data: String => Bytes.toBytes(data)
// TODO: add more data type support
field.dt match {
case BooleanType => Bytes.toBytes(input.asInstanceOf[Boolean])
case ByteType => Array(input.asInstanceOf[Number].byteValue)
case ShortType => Bytes.toBytes(input.asInstanceOf[Number].shortValue)
case IntegerType => Bytes.toBytes(input.asInstanceOf[Number].intValue)
case LongType => Bytes.toBytes(input.asInstanceOf[Number].longValue)
case FloatType => Bytes.toBytes(input.asInstanceOf[Number].floatValue)
case DoubleType => Bytes.toBytes(input.asInstanceOf[Number].doubleValue)
case DateType | TimestampType => Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime)
case StringType => Bytes.toBytes(input.toString)
case BinaryType => input.asInstanceOf[Array[Byte]]
case _ => throw new Exception(s"unsupported data type ${field.dt}")
}
}
}

def toBoolean(input: Array[Byte], offset: Int): Boolean = {
input(offset) != 0
}

def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = {
UTF8String.fromBytes(input.slice(offset, offset + length))
}
}
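To make the symmetry between the two rewritten helpers concrete, here is a small self-contained sketch (not part of the patch; `encode` and `decode` are illustrative stand-ins for `Utils.toBytes` and `Utils.hbaseFieldToScalaType`) showing the round trip for a few of the types the new `field.dt` dispatch covers:

```scala
import java.sql.Timestamp
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.types._

object UtilsRoundTripSketch {
  // Stand-in for Utils.toBytes: encode a value according to its Spark SQL DataType.
  def encode(input: Any, dt: DataType): Array[Byte] = dt match {
    case BooleanType              => Bytes.toBytes(input.asInstanceOf[Boolean])
    case IntegerType              => Bytes.toBytes(input.asInstanceOf[Number].intValue)
    case LongType                 => Bytes.toBytes(input.asInstanceOf[Number].longValue)
    case DateType | TimestampType => Bytes.toBytes(input.asInstanceOf[java.util.Date].getTime)
    case _                        => throw new Exception(s"unsupported data type $dt")
  }

  // Stand-in for Utils.hbaseFieldToScalaType: decode the bytes back by DataType.
  // (The real method works on an (array, offset, length) slice; a whole array is used here.)
  def decode(src: Array[Byte], dt: DataType): Any = dt match {
    case BooleanType   => src(0) != 0
    case IntegerType   => Bytes.toInt(src)
    case LongType      => Bytes.toLong(src)
    case TimestampType => new Timestamp(Bytes.toLong(src))
    case _             => throw new Exception(s"unsupported data type $dt")
  }

  def main(args: Array[String]): Unit = {
    println(decode(encode(42, IntegerType), IntegerType))           // 42
    println(decode(encode(true, BooleanType), BooleanType))         // true
    println(decode(encode(new Timestamp(1564358400000L), TimestampType), TimestampType))
  }
}
```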