Skip to content

Commit

Permalink
upgrade obkv clients and adapt to new hbase api
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Dec 4, 2024
1 parent d12cb9b commit bcdd04c
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 62 deletions.
5 changes: 0 additions & 5 deletions flink-connector-obkv-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ under the License.
<artifactId>obkv-hbase-client</artifactId>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>

<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.alipay.oceanbase.hbase.OHTableClient;
import com.alipay.oceanbase.hbase.constants.OHConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,14 +37,14 @@ public class OBKVHBaseConnectionProvider implements ConnectionProvider {

private final OBKVHBaseConnectorOptions options;

private final TableCache<HTableInterface> tableCache;
private final TableCache<Table> tableCache;

public OBKVHBaseConnectionProvider(OBKVHBaseConnectorOptions options) {
this.options = options;
this.tableCache = new TableCache<>();
}

public HTableInterface getHTableClient(TableId tableId) {
public Table getHTableClient(TableId tableId) {
return tableCache.get(
tableId.identifier(),
() -> {
Expand Down Expand Up @@ -88,7 +88,7 @@ private Configuration getConfig(String databaseName) {

@Override
public void close() throws Exception {
for (HTableInterface table : tableCache.getAll()) {
for (Table table : tableCache.getAll()) {
table.close();
}
tableCache.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
Expand Down Expand Up @@ -82,7 +82,7 @@ public void flush(List<DataChangeRecord> records) throws Exception {
Put put = new Put(rowKey);
columnValues.forEach(
entry ->
put.add(
put.addColumn(
family,
Bytes.toBytes(entry.getKey()),
(byte[]) entry.getValue()));
Expand All @@ -91,10 +91,10 @@ public void flush(List<DataChangeRecord> records) throws Exception {
Delete delete = new Delete(rowKey);
for (Map.Entry<String, Object> entry :
((Map<String, Object>) familyValue).entrySet()) {
delete.deleteColumn(family, Bytes.toBytes(entry.getKey()));
delete.addColumn(family, Bytes.toBytes(entry.getKey()));
}
columnValues.forEach(
entry -> delete.deleteColumn(family, Bytes.toBytes(entry.getKey())));
entry -> delete.addColumn(family, Bytes.toBytes(entry.getKey())));
familyDeleteListMap.computeIfAbsent(family, k -> new ArrayList<>()).add(delete);
}
}
Expand All @@ -107,7 +107,7 @@ public void flush(List<DataChangeRecord> records) throws Exception {
}

private void flush(
HTableInterface table,
Table table,
Map<byte[], List<Put>> familyPutListMap,
Map<byte[], List<Delete>> familyDeleteListMap)
throws Exception {
Expand Down
5 changes: 0 additions & 5 deletions flink-connector-oceanbase-directload/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ under the License.
<artifactId>obkv-table-client</artifactId>
</dependency>

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>

<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ public static void tearDown() {
Stream.of(CONFIG_SERVER, CONTAINER).forEach(GenericContainer::stop);
}

@Override
protected String getFlinkDockerImageTag() {
// the hbase packages are not compatible with jdk 11
return super.getFlinkDockerImageTag() + "-java8";
}

@Before
public void before() throws Exception {
super.before();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public void submitSQLJob(List<String> sqlLines, Path... jars)
copyAndGetContainerPath(jobManager, jar.toAbsolutePath().toString());
commands.add(containerPath);
}
commands.add("-Dsofa.middleware.log.disable=true");

Container.ExecResult execResult =
jobManager.execInContainer("bash", "-c", String.join(" ", commands));
Expand Down
15 changes: 6 additions & 9 deletions flink-sql-connector-obkv-hbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,13 @@ under the License.
<includes>
<include>com.oceanbase:*</include>
<include>com.alibaba:*</include>
<include>com.alipay.sofa:bolt</include>
<include>com.alipay.sofa.common:sofa-common-tools</include>
<include>com.google.guava:guava</include>
<include>commons-lang:commons-lang</include>
<include>mysql:mysql-connector-java</include>
<include>com.alipay.*:*</include>
<include>com.google.*:*</include>
<include>commons-*:*</include>
<include>io.netty:*</include>
<include>commons-logging:commons-logging</include>
<include>org.apache.hadoop:hadoop-core</include>
<include>org.apache.hbase:*</include>
<include>com.lmax:disruptor</include>
<include>org.apache.hadoop*:*</include>
<include>org.apache.hbase*:*</include>
<include>mysql:mysql-connector-java</include>
</includes>
</artifactSet>
<relocations>
Expand Down
8 changes: 3 additions & 5 deletions flink-sql-connector-oceanbase-directload/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ under the License.
<includes>
<include>com.oceanbase:*</include>
<include>com.alibaba:*</include>
<include>com.alipay.sofa:bolt</include>
<include>com.alipay.sofa.common:sofa-common-tools</include>
<include>com.google.guava:guava</include>
<include>commons-lang:commons-lang</include>
<include>com.alipay.*:*</include>
<include>com.google.*:*</include>
<include>commons-*:*</include>
<include>io.netty:*</include>
<include>com.lmax:disruptor</include>
</includes>
</artifactSet>
<relocations>
Expand Down
34 changes: 11 additions & 23 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,18 @@ under the License.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-hbase-client</artifactId>
<version>0.1.5</version>
<version>2.0.0</version>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>1.2.14-SNAPSHOT</version>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand All @@ -128,11 +120,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -210,6 +197,15 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<sofa.middleware.log.disable>true</sofa.middleware.log.disable>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
Expand Down Expand Up @@ -299,14 +295,6 @@ under the License.
</upToDateChecking>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
Expand Down

0 comments on commit bcdd04c

Please sign in to comment.