Skip to content

Commit

Permalink
[Feature-DTStack#1897][gbase8s] Added gbase8s connector and gbasehk c…
Browse files Browse the repository at this point in the history
…onnector
  • Loading branch information
libailin authored and zoudaokoulife committed Jun 17, 2024
1 parent 01dac19 commit 6fdd77a
Show file tree
Hide file tree
Showing 15 changed files with 912 additions and 0 deletions.
63 changes: 63 additions & 0 deletions chunjun-connectors/chunjun-connector-gbase8s/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>chunjun-connectors</artifactId>
<groupId>com.dtstack.chunjun</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>chunjun-connector-gbase8s</artifactId>
<name>ChunJun : Connectors : GBase8s</name>

<properties>
<connector.dir>gbase8s</connector.dir>
</properties>

<dependencies>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-jdbc-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.gbasedbt.jdbc.Driver</groupId>
<artifactId>gbasedbt</artifactId>
<version>3.5.1_1_d0c87a</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.gbase8s.converter;

import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.throwable.UnsupportedTypeException;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class Gbase8sRawTypeConverter {

public static DataType apply(TypeConfig type) {
switch (type.getType()) {
case "BIT":
return DataTypes.BOOLEAN();
case "TINYINT":
return DataTypes.TINYINT();
case "SMALLINT":
case "MEDIUMINT":
case "INT":
case "INTEGER":
case "INT24":
case "SERIAL":
return DataTypes.INT();
case "BIGINT":
case "INT8":
case "BIGSERIAL":
case "SERIAL8":
return DataTypes.BIGINT();
case "REAL":
case "FLOAT":
case "SMALLFLOAT":
return DataTypes.FLOAT();
case "DECIMAL":
case "DEC":
case "NUMERIC":
case "MONEY":
// TODO 精度应该可以动态传进来?
return DataTypes.DECIMAL(38, 18);
case "DOUBLE":
case "PRECISION":
return DataTypes.DOUBLE();
case "CHAR":
case "VARCHAR":
case "TINYTEXT":
case "TEXT":
case "MEDIUMTEXT":
case "LVARCHAR":
case "LONGTEXT":
case "JSON":
case "ENUM":
case "CHARACTER":
case "VARYING":
case "NCHAR":
case "SET":
return DataTypes.STRING();
case "DATE":
return DataTypes.DATE();
case "YEAR":
return DataTypes.INTERVAL(DataTypes.YEAR());
case "TIME":
return DataTypes.TIME();
case "TIMESTAMP":
return DataTypes.TIMESTAMP();
case "DATETIME":
return DataTypes.TIMESTAMP(5);
case "TINYBLOB":
case "BLOB":
case "MEDIUMBLOB":
case "LONGBLOB":
case "BINARY":
case "VARBINARY":
case "GEOMETRY":
// BYTES 底层调用的是VARBINARY最大长度
return DataTypes.BYTES();

default:
throw new UnsupportedTypeException(type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.gbase8s.dialect;

import com.dtstack.chunjun.connector.gbase8s.converter.Gbase8sRawTypeConverter;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.converter.RawTypeMapper;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class Gbase8sDialect implements JdbcDialect {

private static final String GBASE_QUOTATION_MASK = "";

@Override
public String dialectName() {
return "GBase8s";
}

@Override
public boolean canHandle(String url) {
return url.startsWith("jdbc:gbasedbt-sqli:");
}

@Override
public RawTypeMapper getRawTypeConverter() {
return Gbase8sRawTypeConverter::apply;
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("com.gbasedbt.jdbc.Driver");
}

/** build select sql , such as (SELECT :A "A",? "B" FROM DUAL) */
public String buildDualQueryStatement(String[] column) {
StringBuilder sb = new StringBuilder("SELECT count(1),");
String placeholders =
Arrays.stream(column)
.map(f -> ":" + f + " as " + quoteIdentifier(f))
.collect(Collectors.joining(", "));
sb.append(placeholders);

return sb.toString();
}

@Override
public Optional<String> getUpsertStatement(
String schema,
String tableName,
String[] fieldNames,
String[] uniqueKeyFields,
boolean allReplace) {
tableName = buildTableInfoWithSchema(schema, tableName);
StringBuilder mergeIntoSql = new StringBuilder(64);
mergeIntoSql
.append("MERGE INTO ")
.append(tableName)
.append(" T1 USING (")
.append(buildDualQueryStatement(fieldNames))
.append(" FROM ")
.append(tableName)
.append(" limit 1 ")
.append(") T2 ON (")
.append(buildEqualConditions(uniqueKeyFields))
.append(") ");

String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);

if (StringUtils.isNotEmpty(updateSql)) {
mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET ");
mergeIntoSql.append(updateSql);
}

mergeIntoSql
.append(" WHEN NOT MATCHED THEN ")
.append("INSERT (")
.append(
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", ")))
.append(") VALUES (")
.append(
Arrays.stream(fieldNames)
.map(col -> "T2." + quoteIdentifier(col))
.collect(Collectors.joining(", ")))
.append(")");

return Optional.of(mergeIntoSql.toString());
}

/** build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") */
private String buildUpdateConnection(
String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
return Arrays.stream(fieldNames)
.filter(col -> !uniqueKeyList.contains(col))
.map(col -> buildConnectString(allReplace, col))
.collect(Collectors.joining(","));
}

/**
* Depending on parameter [allReplace] build different sql part. e.g T1."A"=T2."A" or
* T1."A"=nvl(T2."A",T1."A")
*/
private String buildConnectString(boolean allReplace, String col) {
return allReplace
? quoteIdentifier("T1")
+ "."
+ quoteIdentifier(col)
+ " = "
+ quoteIdentifier("T2")
+ "."
+ quoteIdentifier(col)
: quoteIdentifier("T1")
+ "."
+ quoteIdentifier(col)
+ " =NVL("
+ quoteIdentifier("T2")
+ "."
+ quoteIdentifier(col)
+ ","
+ quoteIdentifier("T1")
+ "."
+ quoteIdentifier(col)
+ ")";
}

/** build sql part e.g: T1.`A` = T2.`A`, T1.`B` = T2.`B` */
private String buildEqualConditions(String[] uniqueKeyFields) {
return Arrays.stream(uniqueKeyFields)
.map(col -> "T1." + quoteIdentifier(col) + " = T2." + quoteIdentifier(col))
.collect(Collectors.joining(", "));
}

@Override
public String quoteIdentifier(String identifier) {
if (identifier.startsWith(GBASE_QUOTATION_MASK)
&& identifier.endsWith(GBASE_QUOTATION_MASK)) {
return identifier;
}
return GBASE_QUOTATION_MASK + identifier + GBASE_QUOTATION_MASK;
}

@Override
public String getRowNumColumn(String orderBy) {
return "ROWID as " + getRowNumColumnAlias();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.gbase8s.sink;

import com.dtstack.chunjun.config.SyncConfig;
import com.dtstack.chunjun.connector.gbase8s.dialect.Gbase8sDialect;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;

public class Gbase8sSinkFactory extends JdbcSinkFactory {

public Gbase8sSinkFactory(SyncConfig syncConfig) {
super(syncConfig, new Gbase8sDialect());
}
}
Loading

0 comments on commit 6fdd77a

Please sign in to comment.