diff --git a/docs/sink/flink-connector-oceanbase-tools-cdc.md b/docs/sink/flink-connector-oceanbase-tools-cdc.md
new file mode 100644
index 00000000..34863104
--- /dev/null
+++ b/docs/sink/flink-connector-oceanbase-tools-cdc.md
@@ -0,0 +1,188 @@
+# Flink Connector OceanBase By Tools CDC
+
+English | [简体中文](flink-connector-oceanbase-tools-cdc_cn.md)
+
+This project is a flink command line tool that supports the synchronization of CDC tasks to oceanbase through the Flink command line, which greatly simplifies the command writing of data synchronization to oceanbase through flink.
+
+## Getting Started
+
+You can get the release packages at [Releases Page](https://github.com/oceanbase/flink-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-tools-cdc).
+
+```xml
+
+ com.oceanbase
+ flink-connector-oceanbase-tools-cdc
+ ${project.version}
+
+```
+
+If you want to use the latest snapshot version, you can specify by configuring the Maven snapshot repository:
+
+```xml
+
+ com.oceanbase
+ flink-connector-oceanbase-tools-cdc
+ ${project.version}
+
+
+
+
+ sonatype-snapshots
+ Sonatype Snapshot Repository
+ https://s01.oss.sonatype.org/content/repositories/snapshots/
+
+ true
+
+
+
+```
+
+You can also manually build it from the source code.
+
+```shell
+git clone https://github.com/oceanbase/flink-connector-oceanbase.git
+cd flink-connector-oceanbase
+mvn clean package -DskipTests
+```
+
+### Notes:
+
+* Currently, the project supports using Flink CDC to access multiple tables or the entire database. During synchronization, you need to add the corresponding Flink CDC dependency in the `$FLINK_HOME/lib` directory, such as flink-sql-connector-mysql-cdc-\${version}.jar, flink-sql-connector-oracle-cdc-\${version}.jar, flink-sql-connector-sqlserver-cdc-\${version}.jar.
+ * The dependent Flink CDC version needs to be above 3.1. If you need to use Flink CDC to synchronize MySQL and Oracle, you also need to add the relevant JDBC driver under `$FLINK_HOME/lib`.
+  * If you synchronize data to OceanBase, you must use oceanbase or mysql as the protocol name in the URL connection string of OceanBase.
+
+### MySQL Synchronous OceanBase Example
+
+#### Getting Ready
+
+Create the tables test_history_str and test_history_text in the test_db database of MySQL.
+
+```mysql
+use test_db;
+CREATE TABLE test_history_str (
+ itemid bigint NOT NULL,
+ clock integer DEFAULT '0' NOT NULL,
+ value varchar(255) DEFAULT '' NOT NULL,
+ ns integer DEFAULT '0' NOT NULL,
+ PRIMARY KEY (itemid,clock,ns)
+);
+CREATE TABLE test_history_text (
+ itemid bigint NOT NULL,
+ clock integer DEFAULT '0' NOT NULL,
+ value text NOT NULL,
+ ns integer DEFAULT '0' NOT NULL,
+ PRIMARY KEY (itemid,clock,ns)
+);
+```
+
+#### Build A Flink Task
+
+##### An example of the Flink command line
+
+```shell
+$FLINK_HOME/bin/flink run \
+ -Dexecution.checkpointing.interval=10s \
+ -Dparallelism.default=1 \
+ -c com.oceanbase.connector.flink.tools.cdc.CdcTools \
+ lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
+ mysql-sync-database \
+ --database test_db \
+ --mysql-conf hostname=xxxx \
+ --mysql-conf port=3306 \
+ --mysql-conf username=root \
+ --mysql-conf password=xxxx \
+ --mysql-conf database-name=test_db \
+ --including-tables "tbl1|test.*" \
+ --sink-conf username=xxxx \
+ --sink-conf password=xxxx \
+ --sink-conf url=jdbc:mysql://xxxx:xxxx
+```
+
+Replace the above database information with your real database information, and when a message similar to the following appears, the task is successfully built and submitted.
+
+```shell
+Job has been submitted with JobID 0177b201a407045a17445aa288f0f111
+```
+
+The tool automatically parses the information on the command line and creates a table, which can be queried and verified in OceanBase.
+
+Insert test data into MySQL
+
+```sql
+INSERT INTO test_db.test_history_str (itemid,clock,value,ns) VALUES
+ (1,2,'ces1',1123);
+INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES
+ (1,21131,'ces1',21321),
+ (2,21321,'ces2',12321);
+```
+
+Since it is a CDC task, after data is inserted in MySQL, you can query and verify the synchronized data in OceanBase.
+
+### Parameter parsing
+
+This configuration is the program configuration information of flink
+
+```shell
+-Dexecution.checkpointing.interval=10s
+-Dparallelism.default=1
+```
+
+Specify the JAR package of the program and the entry of the program
+
+```shell
+-c com.oceanbase.connector.flink.tools.cdc.CdcTools \
+lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
+```
+
+The name of the database
+
+```shell
+--database test_db
+```
+
+This name is customized, meaning the name given to this database, and ultimately serves as the naming rule for flink tasks.
+
+## Configuration Items
+
+#### Supported data sources
+
+| Data source identifier | Data source |
+|-------------------------|----------------------|
+| mysql-sync-database | mysql datasource |
+| oracle-sync-database | oracle datasource |
+| postgres-sync-database | postgres datasource |
+| sqlserver-sync-database | sqlserver datasource |
+| db2-sync-database | db2 datasource |
+
+#### Configuration Items
+
+| Configuration Items | Comment |
+|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| --job-name | Flink task name, optional. |
+| --database | Database name synchronized to OceanBase. |
+| --table-prefix | OceanBase table prefix name, such as --table-prefix ods_. |
+| --table-suffix | Same as above, the suffix name of the OceanBase table. |
+| --including-tables     | The MySQL tables that need to be synchronized; you can use `\|` to separate multiple tables, and regular expressions are supported. For example: --including-tables table1. |
+| --excluding-tables | For tables that do not need to be synchronized, the usage is the same as above. |
+| --mysql-conf           | MySQL CDC Source configuration, where hostname/username/password/database-name is required. When the synchronized tables include a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only one field of non-null type can be selected.<br>For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`; columns of different database tables are separated by `,`. |
+| --oracle-conf | Oracle CDC Source configuration,where hostname/username/password/database-name/schema-name is required. |
+| --postgres-conf | Postgres CDC Source configuration,where hostname/username/password/database-name/schema-name/slot.name is required. |
+| --sqlserver-conf | SQLServer CDC Source configuration, where hostname/username/password/database-name/schema-name is required. |
+| --db2-conf | Db2 CDC Source configuration, where hostname/username/password/database-name/schema-name is required. |
+| --sink-conf | See below --sink-conf configuration items. |
+| --ignore-default-value | Turn off synchronization of MySQL table structures by default. It is suitable for the situation when synchronizing MySQL data to oceanbase, the field has a default value, but the actual inserted data is null. |
+| --create-table-only | Whether to only synchronize the structure of the table. |
+
+#### Configuration items of sink-conf
+
+| Configuration Items | Default Value | Required | Comment |
+|---------------------|---------------|----------|-------------------------------------------------------------------|
+| url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:2881 |
+| username | -- | Y | Username to access oceanbase |
+| password | -- | Y | Password to access oceanbase |
+
+## Reference information
+
+- [https://github.com/oceanbase/obkv-table-client-java](https://github.com/oceanbase/obkv-table-client-java)
+- [https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector)
+
diff --git a/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md b/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md
new file mode 100644
index 00000000..fecd5512
--- /dev/null
+++ b/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md
@@ -0,0 +1,188 @@
+# Flink Connector OceanBase By Tools CDC
+
+[English](flink-connector-oceanbase-tools-cdc.md) | 简体中文
+
+本项目是一个可以支持通过Flink命令行构建cdc任务同步到oceanbase的flink命令行工具,极大的简化了通过flink同步数据到oceanbase的命令书写。
+
+## 开始上手
+
+您可以在 [Releases 页面](https://github.com/oceanbase/flink-connector-oceanbase/releases) 或者 [Maven 中央仓库](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-tools-cdc) 找到正式的发布版本。
+
+```xml
+
+ com.oceanbase
+ flink-connector-oceanbase-tools-cdc
+ ${project.version}
+
+```
+
+如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:
+
+```xml
+
+ com.oceanbase
+ flink-connector-oceanbase-tools-cdc
+ ${project.version}
+
+
+
+
+ sonatype-snapshots
+ Sonatype Snapshot Repository
+ https://s01.oss.sonatype.org/content/repositories/snapshots/
+
+ true
+
+
+
+```
+
+您也可以通过源码构建的方式获得程序包。
+
+```shell
+git clone https://github.com/oceanbase/flink-connector-oceanbase.git
+cd flink-connector-oceanbase
+mvn clean package -DskipTests
+```
+
+### 注意事项:
+
+* 目前项目支持使用Flink CDC接入多表或整库。同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-\${version}.jar,flink-sql-connector-oracle-cdc-\${version}.jar ,flink-sql-connector-sqlserver-cdc-\${version}.jar
+* 依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 `$FLINK_HOME/lib` 下增加相关的 JDBC 驱动。
+* 同步至oceanbase时,oceanbase的url连接串需要使用oceanbase或mysql作为协议名称。
+
+### MySQL同步至OceanBase示例
+
+#### 准备
+
+在 MySQL 数据库 test_db 库中创建表 test_history_str、test_history_text。
+
+```mysql
+use test_db;
+CREATE TABLE test_history_str (
+ itemid bigint NOT NULL,
+ clock integer DEFAULT '0' NOT NULL,
+ value varchar(255) DEFAULT '' NOT NULL,
+ ns integer DEFAULT '0' NOT NULL,
+ PRIMARY KEY (itemid,clock,ns)
+);
+CREATE TABLE test_history_text (
+ itemid bigint NOT NULL,
+ clock integer DEFAULT '0' NOT NULL,
+ value text NOT NULL,
+ ns integer DEFAULT '0' NOT NULL,
+ PRIMARY KEY (itemid,clock,ns)
+);
+```
+
+#### 构建Flink任务
+
+##### Flink命令行示例
+
+```shell
+$FLINK_HOME/bin/flink run \
+ -Dexecution.checkpointing.interval=10s \
+ -Dparallelism.default=1 \
+ -c com.oceanbase.connector.flink.tools.cdc.CdcTools \
+ lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
+ mysql-sync-database \
+ --database test_db \
+ --mysql-conf hostname=xxxx \
+ --mysql-conf port=3306 \
+ --mysql-conf username=root \
+ --mysql-conf password=xxxx \
+ --mysql-conf database-name=test_db \
+ --including-tables "tbl1|test.*" \
+ --sink-conf username=xxxx \
+ --sink-conf password=xxxx \
+ --sink-conf url=jdbc:mysql://xxxx:xxxx
+```
+
+请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。
+
+```shell
+Job has been submitted with JobID 0177b201a407045a17445aa288f0f111
+```
+
+工具会自动解析命令行的信息并进行建表,可在OceanBase 中查询验证。
+
+MySQL 中插入测试数据
+
+```sql
+INSERT INTO test_db.test_history_str (itemid,clock,value,ns) VALUES
+ (1,2,'ces1',1123);
+INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES
+ (1,21131,'ces1',21321),
+ (2,21321,'ces2',12321);
+```
+
+由于是CDC任务,MySQL中插入数据后,即可在 OceanBase 中查询验证同步过来的数据。
+
+### 参数解析
+
+该配置是flink的程序配置信息
+
+```shell
+-Dexecution.checkpointing.interval=10s
+-Dparallelism.default=1
+```
+
+指定程序的jar包和程序的入口
+
+```shell
+-c com.oceanbase.connector.flink.tools.cdc.CdcTools \
+lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
+```
+
+数据库名称
+
+```shell
+--database test_db
+```
+
+这个名称是自定义的,意为给这个数据库取的名字,最终作为flink任务的命名规则。
+
+## 配置项
+
+#### 支持的数据源
+
+| 数据源标识 | 数据源 |
+|-------------------------|--------------|
+| mysql-sync-database | mysql数据源 |
+| oracle-sync-database | oracle数据源 |
+| postgres-sync-database | postgres数据源 |
+| sqlserver-sync-database | sqlserver数据源 |
+| db2-sync-database | db2数据源 |
+
+#### 配置项
+
+| 配置项 | 描述 |
+|------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| --job-name | Flink 任务名称,非必需。 |
+| --database | 同步到 OceanBase 的数据库名。 |
+| --table-prefix | OceanBase表前缀名,例如 --table-prefix ods_。 |
+| --table-suffix | 同上,OceanBase表的后缀名。 |
+| --including-tables     | 需要同步的 MySQL 表,可以使用 `\|` 分隔多个表,并支持正则表达式。比如 --including-tables table1。 |
+| --excluding-tables | 不需要同步的表,用法同上。 |
+| --mysql-conf           | MySQL CDCSource 配置,其中 hostname/username/password/database-name 是必需的。同步的库表中含有非主键表时,必须设置 `scan.incremental.snapshot.chunk.key-column`,且只能选择非空类型的一个字段。<br>例如:`scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`,不同的库表列之间用 `,` 隔开。 |
+| --oracle-conf | Oracle CDCSource 配置,其中 hostname/username/password/database-name/schema-name 是必需的。 |
+| --postgres-conf | Postgres CDCSource 配置,其中 hostname/username/password/database-name/schema-name/slot.name 是必需的。 |
+| --sqlserver-conf | SQLServer CDCSource 配置,其中 hostname/username/password/database-name/schema-name 是必需的。 |
+| --db2-conf             | Db2 CDCSource 配置,其中 hostname/username/password/database-name/schema-name 是必需的。 |
+| --sink-conf | 见下面--sink-conf的配置项。 |
+| --ignore-default-value | 关闭同步 MySQL 表结构的默认值。适用于同步 MySQL 数据到 OceanBase 时,字段有默认值,但实际插入数据为 null 情况。 |
+| --create-table-only | 是否只仅仅同步表的结构。 |
+
+#### sink-conf的配置项
+
+| 配置项 | 默认值 | 是否需要 | 描述 |
+|----------|-----|------|-----------------------------------------|
+| url | -- | N | jdbc 连接信息,如:jdbc:mysql://127.0.0.1:2881 |
+| username | -- | Y | 访问 oceanbase 的用户名 |
+| password | -- | Y | 访问 oceanbase 的密码 |
+
+## 参考信息
+
+- [https://github.com/oceanbase/obkv-table-client-java](https://github.com/oceanbase/obkv-table-client-java)
+- [https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector)
+
diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java
similarity index 96%
rename from flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java
rename to flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java
index 073cc081..e1e12d6d 100644
--- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java
+++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java
@@ -39,14 +39,14 @@
import java.util.List;
import java.util.Map;
-/** OceanBase Sink Operate. */
+/** OceanBase Sink Build. */
@Public
-public class oceanBaseSinkOperate implements Serializable {
+public class OceanBaseSinkBuild implements Serializable {
private static final long serialVersionUID = 1L;
protected Configuration sinkConfig;
private final OceanBaseConnectorOptions connectorOptions;
- public oceanBaseSinkOperate(Configuration cdcSinkConfig) {
+ public OceanBaseSinkBuild(Configuration cdcSinkConfig) {
sinkConfig = cdcSinkConfig;
this.connectorOptions = getOceanBaseConnectorOptions();
}
diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java
index fb6b7e95..26e02fe7 100644
--- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java
+++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java
@@ -17,8 +17,8 @@
import com.oceanbase.connector.flink.connection.OceanBaseToolsConnectProvider;
import com.oceanbase.connector.flink.tools.catalog.OceanBaseSchemaFactory;
+import com.oceanbase.connector.flink.tools.catalog.OceanBaseSinkBuild;
import com.oceanbase.connector.flink.tools.catalog.TableSchema;
-import com.oceanbase.connector.flink.tools.catalog.oceanBaseSinkOperate;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
@@ -100,10 +100,10 @@ public void create() {
}
public void build() throws Exception {
- oceanBaseSinkOperate oceanBaseSinkOperate = new oceanBaseSinkOperate(sinkConfig);
+ OceanBaseSinkBuild oceanBaseSinkBuild = new OceanBaseSinkBuild(sinkConfig);
OceanBaseToolsConnectProvider oceanBaseConnectionProvider =
new OceanBaseToolsConnectProvider(
- oceanBaseSinkOperate.getOceanBaseConnectorOptions());
+ oceanBaseSinkBuild.getOceanBaseConnectorOptions());
List schemaList = getSchemaList();
Preconditions.checkState(
!schemaList.isEmpty(),
@@ -157,7 +157,7 @@ public void build() throws Exception {
int sinkParallel = sinkConfig.getInteger(SINK_PARALLELISM, sideOutput.getParallelism());
String uidName = getUidName(targetDbSet, dbTbl);
sideOutput
- .sinkTo(oceanBaseSinkOperate.createGenericOceanBaseSink(dbTbl.f0, dbTbl.f1))
+ .sinkTo(oceanBaseSinkBuild.createGenericOceanBaseSink(dbTbl.f0, dbTbl.f1))
.setParallelism(sinkParallel)
.name(uidName)
.uid(uidName);
@@ -200,7 +200,7 @@ protected boolean isSyncNeeded(String tableName) {
return sync;
}
- protected String getSyncTableList(List syncTables) {
+ public String getSyncTableList(List syncTables) {
// includingTablePattern and ^excludingPattern
if (includingTables == null) {
includingTables = ".*";
@@ -217,7 +217,7 @@ protected String getSyncTableList(List syncTables) {
}
/** Filter table that many tables merge to one. */
- protected HashMap multiToOneRulesParser(
+ public HashMap multiToOneRulesParser(
String multiToOneOrigin, String multiToOneTarget) {
if (StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin)
|| StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)) {
@@ -273,7 +273,7 @@ private void handleTableCreationFailure(Exception ex) {
}
}
- protected Properties getJdbcProperties() {
+ public Properties getJdbcProperties() {
Properties jdbcProps = new Properties();
for (Map.Entry entry : config.toMap().entrySet()) {
String key = entry.getKey();
@@ -285,7 +285,7 @@ protected Properties getJdbcProperties() {
return jdbcProps;
}
- protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
+ public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
jdbcProperties.forEach(
(key, value) -> jdbcUrlBuilder.append("&").append(key).append("=").append(value));
diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java
index c7dc5bf8..abb34c9d 100644
--- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java
+++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java
@@ -223,7 +223,7 @@ public String getTableListPrefix() {
}
@Override
- protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
+ public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) {
StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl);
boolean firstParam = true;
for (Map.Entry