Skip to content

Commit

Permalink
HBASE-22599 Let hbase-connectors compile against HBase 2.2.0
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
meszibalu authored Jul 22, 2019
1 parent 8f88efd commit 91231ca
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,15 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* a alternative implementation of a connection object that forwards the mutations to a kafka queue
* depending on the routing rules (see kafka-route-rules.xml).
* */
@InterfaceAudience.Private
public class KafkaBridgeConnection implements Connection {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBridgeConnection.class);

private final Configuration conf;
private final User user;
private final ExecutorService pool;
private volatile boolean closed = false;
private TopicRoutingRules routingRules;
private Producer<byte[],byte[]> producer;
Expand All @@ -74,32 +67,23 @@ public KafkaBridgeConnection(Configuration conf,
ExecutorService pool,
User user) throws IOException {
this.conf = conf;
this.user = user;
this.pool = pool;
setupRules();
startKafkaConnection();
}

/**
* for testing.
* @param conf hbase configuration
* @param pool executor service
* @param user user with connection
* @param conf hbase configuration
* @param routingRules a set of routing rules
* @param producer a kafka producer
* @throws IOException on error
*/
public KafkaBridgeConnection(Configuration conf,
ExecutorService pool,
User user,
TopicRoutingRules routingRules,
Producer<byte[],byte[]> producer)
throws IOException {
@VisibleForTesting
public KafkaBridgeConnection(Configuration conf, TopicRoutingRules routingRules,
Producer<byte[],byte[]> producer) {
this.conf = conf;
this.user = user;
this.pool = pool;
this.producer=producer;
this.routingRules=routingRules;
this.producer = producer;
this.routingRules = routingRules;
}

private void setupRules() throws IOException {
Expand Down Expand Up @@ -161,6 +145,11 @@ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return null;
}

/* Without @Override, we can also compile it against HBase 2.1. */
/* @Override */
public void clearRegionLocationCache() {
}

@Override
public Admin getAdmin() throws IOException {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -89,8 +88,7 @@ public void testSendMessage() {
rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
Configuration conf = new Configuration();
KafkaBridgeConnection connection =
new KafkaBridgeConnection(
conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer);
new KafkaBridgeConnection(conf,rules,myTestingProducer);
long zeTimestamp = System.currentTimeMillis();
Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp);
put.addColumn("FAMILY".getBytes("UTF-8"),
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@
<compileSource>1.8</compileSource>
<java.min.version>${compileSource}</java.min.version>
<maven.min.version>3.5.0</maven.min.version>
<hbase.version>2.1.0</hbase.version>
<hbase.version>2.2.0</hbase.version>
<maven.compiler.version>3.6.1</maven.compiler.version>
<exec.maven.version>1.6.0</exec.maven.version>
<audience-annotations.version>0.5.0</audience-annotations.version>
<junit.version>4.12</junit.version>
<hbase-thirdparty.version>2.1.0</hbase-thirdparty.version>
<hadoop-two.version>2.7.7</hadoop-two.version>
<hadoop-two.version>2.8.5</hadoop-two.version>
<hadoop-three.version>3.0.3</hadoop-three.version>
<hadoop.version>${hadoop-two.version}</hadoop.version>
<slf4j.version>1.7.25</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class ConnectionMocker extends Connection {

def isAborted: Boolean = true
def abort(why: String, e: Throwable) = {}

/* Without override, we can also compile it against HBase 2.1. */
/* override */ def clearRegionLocationCache(): Unit = {}
}

class HBaseConnectionCacheSuite extends FunSuite with Logging {
Expand Down

0 comments on commit 91231ca

Please sign in to comment.