From 349d87c0d88ba560bcd7bde2f83741ba14c5c973 Mon Sep 17 00:00:00 2001 From: zyh Date: Mon, 23 Dec 2024 16:51:33 +0800 Subject: [PATCH 1/5] fix:add flink command line doc --- .../flink-connector-oceanbase-tools-cdc.md | 188 ++++++++++++++++++ .../flink-connector-oceanbase-tools-cdc_cn.md | 188 ++++++++++++++++++ .../tools/catalog/oceanBaseSinkOperate.java | 4 +- .../flink/tools/cdc/DatabaseSync.java | 12 +- .../flink/tools/cdc/db2/Db2DatabaseSync.java | 2 +- .../cdc/postgres/PostgresDatabaseSync.java | 2 +- .../tests/CdcMysqlSyncDatabaseITCase.java | 17 +- .../tools/tests/container/MySqlContainer.java | 8 +- .../tools/tests/tools/DatabaseSyncTest.java | 179 +++++++++++++++++ 9 files changed, 569 insertions(+), 31 deletions(-) create mode 100644 docs/sink/flink-connector-oceanbase-tools-cdc.md create mode 100644 docs/sink/flink-connector-oceanbase-tools-cdc_cn.md create mode 100644 flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/tools/DatabaseSyncTest.java diff --git a/docs/sink/flink-connector-oceanbase-tools-cdc.md b/docs/sink/flink-connector-oceanbase-tools-cdc.md new file mode 100644 index 00000000..9a05bd0f --- /dev/null +++ b/docs/sink/flink-connector-oceanbase-tools-cdc.md @@ -0,0 +1,188 @@ +# Flink Connector OceanBase By Tools CDC + +English | [简体中文](flink-connector-oceanbase-tools-cdc_cn.md) + +This project is a flink command line tool that supports the synchronization of CDC tasks to oceanbase through the Flink command line, which greatly simplifies the command writing of data synchronization to oceanbase through flink. + +## Getting Started + +You can get the release packages at [Releases Page](https://github.com/oceanbase/flink-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-directload). + +```xml + + com.oceanbase + flink-connector-oceanbase-tools-cdc + ${project.version} + +``` + +如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定: + +```xml + + com.oceanbase + flink-connector-oceanbase-tools-cdc + ${project.version} + + + + + sonatype-snapshots + Sonatype Snapshot Repository + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + true + + + +``` + +You can also manually build it from the source code. + +```shell +git clone https://github.com/oceanbase/flink-connector-oceanbase.git +cd flink-connector-oceanbase +mvn clean package -DskipTests +``` + +### Notes: + +* Currently, the project supports using Flink CDC to access multiple tables or the entire database. During synchronization, you need to add the corresponding Flink CDC dependency in the `$FLINK_HOME/lib` directory, such as flink-sql-connector-mysql-cdc-\${version}. jar, flink-sql-connector-oracle-cdc-\${version}.jar, flink-sql-connector-sqlserver-cdc-\${version}.jar +* The dependent Flink CDC version needs to be above 3.1. If you need to use Flink CDC to synchronize MySQL and Oracle, you also need to add the relevant JDBC driver under `$FLINK_HOME/lib` +* When synchronizing to oceanbase, the url connection string of oceanbase needs to use the mysql protocol. + +### MySQL Synchronous OceanBase Example + +#### Geting Ready + +Create a table test_history_strt_sink in a MySql database test_db library, test_history_text. 
+ +```mysql +use test_db; +CREATE TABLE test_history_str ( + itemid bigint NOT NULL, + clock integer DEFAULT '0' NOT NULL, + value varchar(255) DEFAULT '' NOT NULL, + ns integer DEFAULT '0' NOT NULL, + PRIMARY KEY (itemid,clock,ns) +); +CREATE TABLE test_history_text ( + itemid bigint NOT NULL, + clock integer DEFAULT '0' NOT NULL, + value text NOT NULL, + ns integer DEFAULT '0' NOT NULL, + PRIMARY KEY (itemid,clock,ns) +); +``` + +#### Build A Flink Task + +##### An example of the Flink command line + +```shell +$FLINK_HOME/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c com.oceanbase.connector.flink.tools.cdc.CdcTools \ + lib/flink-connector-oceanbase-tools-cdc-${version}.jar \ + mysql-sync-database \ + --database test_db \ + --mysql-conf hostname=xxxx \ + --mysql-conf port=3306 \ + --mysql-conf username=root \ + --mysql-conf password=xxxx \ + --mysql-conf database-name=test_db \ + --including-tables "tbl1|test.*" \ + --sink-conf username=xxxx \ + --sink-conf password=xxxx \ + --sink-conf url=jdbc:mysql://xxxx:xxxx +``` + +Replace the above database information with your real database information, and when a message similar to the following appears, the task is successfully built and submitted. + +```shell +Job has been submitted with JobID 0177b201a407045a17445aa288f0f111 +``` + +The tool automatically parses the information on the command line and creates a table, which can be queried and verified in OceanBase. + +MySQL to insert test data + +```sql +INSERT INTO test_db.test_history_str (itemid,clock,value,ns) VALUES + (1,2,'ces1',1123); +INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES + (1,21131,'ces1',21321), + (2,21321,'ces2',12321); +``` + +Since it is a CDC task, after data is inserted in MySQL, you can query and verify the synchronized data in OceanBase. + +### Parameter parsing + +This configuration is the program configuration information of flink + +```shell +-Dexecution.checkpointing.interval=10s +-Dparallelism.default=1 +``` + +Specify the JAR package of the program and the entry of the program + +```shell +-c com.oceanbase.connector.flink.tools.cdc.CdcTools \ +lib/flink-connector-oceanbase-tools-cdc-${version}.jar \ +``` + +The name of the database + +```shell +--database test_db +``` + +This name is customized, meaning the name given to this database, and ultimately serves as the naming rule for flink tasks. + +## Configuration Items + +#### Supported data sources + +| Data source identifier | Data source | +|-------------------------|----------------------| +| mysql-sync-database | mysql datasource | +| oracle-sync-database | oracle datasource | +| postgres-sync-database | postgres datasource | +| sqlserver-sync-database | sqlserver datasource | +| db2-sync-database | db2 datasource | + +#### Configuration Items + +| Configuration Items | Comment | +|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| --job-name | Flink task name, optional. | +| --database | Database name synchronized to OceanBase. | +| --table-prefix | OceanBase table prefix name, such as --table-prefix ods_. 
| --table-suffix         | Same as above, the suffix name of the OceanBase table.                                                                                                                                                                                                                            |
| --including-tables     | The MySQL tables to be synchronized. Multiple tables can be separated with \|, and regular expressions are supported. For example: --including-tables table1.                                                                                                                     |
| --excluding-tables     | Tables that do not need to be synchronized; the usage is the same as above.                                                                                                                                                                                                        |
| --mysql-conf           | MySQL CDC Source configuration, where hostname/username/password/database-name is required. When the synchronized tables include a table without a primary key, `scan.incremental.snapshot.chunk.key-column` must be set, and only one field of a non-null type can be selected.
For example: `scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`, different database table columns are separated by `,`. | +| --oracle-conf | Oracle CDC Source configuration,where hostname/username/password/database-name/schema-name is required. | +| --postgres-conf | Postgres CDC Source configuration,where hostname/username/password/database-name/schema-name/slot.name is required. | +| --sqlserver-conf | SQLServer CDC Source configuration, where hostname/username/password/database-name/schema-name is required. | +| --db2-conf | Db2 CDC Source configuration, where hostname/username/password/database-name/schema-name is required. | +| --sink-conf | See below --sink-conf configuration items. | +| --ignore-default-value | Turn off synchronization of MySQL table structures by default. It is suitable for the situation when synchronizing MySQL data to oceanbase, the field has a default value, but the actual inserted data is null. | +| --create-table-only | Whether to only synchronize the structure of the table. | + +#### Configuration items of sink-conf + +| Configuration Items | Default Value | Required | Comment | +|---------------------|---------------|----------|-------------------------------------------------------------------| +| url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:2881 | +| username | -- | Y | Username to access oceanbase | +| password | -- | Y | Password to access oceanbase | + +## Reference information + +- [https://github.com/oceanbase/obkv-table-client-java](https://github.com/oceanbase/obkv-table-client-java) +- [https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector) + diff --git a/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md b/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md new file mode 100644 index 00000000..b4cbc0d1 --- /dev/null +++ b/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md @@ -0,0 +1,188 @@ +# Flink Connector OceanBase By Tools CDC + +[English](flink-connector-oceanbase-tools-cdc.md) | 简体中文 + +本项目是一个可以支持通过Flink命令行构建cdc任务同步到oceanbase的flink命令行工具,极大的简化了通过flink同步数据到oceanbase的命令书写。 + +## 开始上手 + +您可以在 [Releases 页面](https://github.com/oceanbase/flink-connector-oceanbase/releases) 或者 [Maven 中央仓库](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbas-directload) 找到正式的发布版本。 + +```xml + + com.oceanbase + flink-connector-oceanbase-tools-cdc + ${project.version} + +``` + +如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定: + +```xml + + com.oceanbase + flink-connector-oceanbase-tools-cdc + ${project.version} + + + + + sonatype-snapshots + Sonatype Snapshot Repository + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + true + + + +``` + +您也可以通过源码构建的方式获得程序包。 + +```shell +git clone https://github.com/oceanbase/flink-connector-oceanbase.git +cd flink-connector-oceanbase +mvn clean package -DskipTests +``` + +### 注意事项: + +* 目前项目支持使用Flink CDC接入多表或整库。同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-\${version}.jar,flink-sql-connector-oracle-cdc-\${version}.jar ,flink-sql-connector-sqlserver-cdc-\${version}.jar +* 依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 `$FLINK_HOME/lib` 下增加相关的 JDBC 驱动。 +* 同步至oceanbase时,oceanbase的url连接串需要使用mysql的协议。 + +### MySQL同步至OceanBase示例 + +#### 准备 + +在 MySql 数据库test_db库中创建表 test_history_strt_sink,test_history_text。 + +```mysql +use test_db; +CREATE TABLE test_history_str ( + itemid bigint NOT NULL, + 
clock integer DEFAULT '0' NOT NULL, + value varchar(255) DEFAULT '' NOT NULL, + ns integer DEFAULT '0' NOT NULL, + PRIMARY KEY (itemid,clock,ns) +); +CREATE TABLE test_history_text ( + itemid bigint NOT NULL, + clock integer DEFAULT '0' NOT NULL, + value text NOT NULL, + ns integer DEFAULT '0' NOT NULL, + PRIMARY KEY (itemid,clock,ns) +); +``` + +#### 构建Flink任务 + +##### Flink命令行示例 + +```shell +$FLINK_HOME/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c com.oceanbase.connector.flink.tools.cdc.CdcTools \ + lib/flink-connector-oceanbase-tools-cdc-${version}.jar \ + mysql-sync-database \ + --database test_db \ + --mysql-conf hostname=xxxx \ + --mysql-conf port=3306 \ + --mysql-conf username=root \ + --mysql-conf password=xxxx \ + --mysql-conf database-name=test_db \ + --including-tables "tbl1|test.*" \ + --sink-conf username=xxxx \ + --sink-conf password=xxxx \ + --sink-conf url=jdbc:mysql://xxxx:xxxx +``` + +请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。 + +```shell +Job has been submitted with JobID 0177b201a407045a17445aa288f0f111 +``` + +工具会自动解析命令行的信息并进行建表,可在OceanBase 中查询验证。 + +MySQl中插入测试数据 + +```sql +INSERT INTO test_db.test_history_str (itemid,clock,value,ns) VALUES + (1,2,'ces1',1123); +INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES + (1,21131,'ces1',21321), + (2,21321,'ces2',12321); +``` + +由于是CDC任务,MySQL中插入数据后,即可在 OceanBase 中查询验证同步过来的数据。 + +### 参数解析 + +该配置是flink的程序配置信息 + +```shell +-Dexecution.checkpointing.interval=10s +-Dparallelism.default=1 +``` + +指定程序的jar包和程序的入口 + +```shell +-c com.oceanbase.connector.flink.tools.cdc.CdcTools \ +lib/flink-connector-oceanbase-tools-cdc-${version}.jar \ +``` + +数据库名称 + +```shell +--database test_db +``` + +这个名称是自定义的,意为给这个数据库取的名字,最终作为flink任务的命名规则。 + +## 配置项 + +#### 支持的数据源 + +| 数据源标识 | 数据源 | +|-------------------------|--------------| +| mysql-sync-database | mysql数据源 | +| oracle-sync-database | oracle数据源 | +| postgres-sync-database | postgres数据源 | +| sqlserver-sync-database | sqlserver数据源 | +| db2-sync-database | db2数据源 | + +#### 配置项 + +| 配置项 | 描述 | +|------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------| +| --job-name | Flink 任务名称,非必需。 | +| --database | 同步到 OceanBase 的数据库名。 | +| --table-prefix | OceanBase表前缀名,例如 --table-prefix ods_。 | +| --table-suffix | 同上,OceanBase表的后缀名。 | +| --including-tables | 需要同步的 MySQL 表,可以使用|分隔多个表,并支持正则表达式。比如--including-tables table1。 | `分隔多个表,并支持正则表达式。比如--including-tables table1 | +| --excluding-tables | 不需要同步的表,用法同上。 | +| --mysql-conf | MySQL CDCSource 配置,其中 hostname/username/password/database-name 是必需的。同步的库表中含有非主键表时,必须设置scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。
例如:`scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...`,不同的库表列之间用`,`隔开。 | +| --oracle-conf | Oracle CDCSource 配置,其中 hostname/username/password/database-name/schema-name 是必需的。 | +| --postgres-conf | Postgres CDCSource 配置,其中 hostname/username/password/database-name/schema-name/slot.name 是必需的。 | +| --sqlserver-conf | SQLServer CDCSource 配置,其中 hostname/username/password/database-name/schema-name 是必需的。 | +| --db2-conf | SQLServer CDCSource 配置,其中 hostname/username/password/database-name/schema-name 是必需的。 | +| --sink-conf | 见下面--sink-conf的配置项。 | +| --ignore-default-value | 关闭同步 MySQL 表结构的默认值。适用于同步 MySQL 数据到 OceanBase 时,字段有默认值,但实际插入数据为 null 情况。 | +| --create-table-only | 是否只仅仅同步表的结构。 | + +#### sink-conf的配置项 + +| 配置项 | 默认值 | 是否需要 | 描述 | +|----------|-----|------|-----------------------------------------| +| url | -- | N | jdbc 连接信息,如:jdbc:mysql://127.0.0.1:2881 | +| username | -- | Y | 访问 oceanbase 的用户名 | +| password | -- | Y | 访问 oceanbase 的密码 | + +## 参考信息 + +- [https://github.com/oceanbase/obkv-table-client-java](https://github.com/oceanbase/obkv-table-client-java) +- [https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector) + diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java index 073cc081..09783c19 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java @@ -41,12 +41,12 @@ /** OceanBase Sink Operate. 
*/ @Public -public class oceanBaseSinkOperate implements Serializable { +public class OceanBaseSinkOperate implements Serializable { private static final long serialVersionUID = 1L; protected Configuration sinkConfig; private final OceanBaseConnectorOptions connectorOptions; - public oceanBaseSinkOperate(Configuration cdcSinkConfig) { + public OceanBaseSinkOperate(Configuration cdcSinkConfig) { sinkConfig = cdcSinkConfig; this.connectorOptions = getOceanBaseConnectorOptions(); } diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java index fb6b7e95..cf6cf124 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java @@ -17,8 +17,8 @@ import com.oceanbase.connector.flink.connection.OceanBaseToolsConnectProvider; import com.oceanbase.connector.flink.tools.catalog.OceanBaseSchemaFactory; +import com.oceanbase.connector.flink.tools.catalog.OceanBaseSinkOperate; import com.oceanbase.connector.flink.tools.catalog.TableSchema; -import com.oceanbase.connector.flink.tools.catalog.oceanBaseSinkOperate; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; @@ -100,7 +100,7 @@ public void create() { } public void build() throws Exception { - oceanBaseSinkOperate oceanBaseSinkOperate = new oceanBaseSinkOperate(sinkConfig); + OceanBaseSinkOperate oceanBaseSinkOperate = new OceanBaseSinkOperate(sinkConfig); OceanBaseToolsConnectProvider oceanBaseConnectionProvider = new OceanBaseToolsConnectProvider( oceanBaseSinkOperate.getOceanBaseConnectorOptions()); @@ -200,7 +200,7 @@ protected boolean isSyncNeeded(String tableName) { return sync; } - protected String getSyncTableList(List syncTables) { + public String getSyncTableList(List syncTables) { // includingTablePattern and ^excludingPattern if (includingTables == null) { includingTables = ".*"; @@ -217,7 +217,7 @@ protected String getSyncTableList(List syncTables) { } /** Filter table that many tables merge to one. 
*/ - protected HashMap multiToOneRulesParser( + public HashMap multiToOneRulesParser( String multiToOneOrigin, String multiToOneTarget) { if (StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin) || StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)) { @@ -273,7 +273,7 @@ private void handleTableCreationFailure(Exception ex) { } } - protected Properties getJdbcProperties() { + public Properties getJdbcProperties() { Properties jdbcProps = new Properties(); for (Map.Entry entry : config.toMap().entrySet()) { String key = entry.getKey(); @@ -285,7 +285,7 @@ protected Properties getJdbcProperties() { return jdbcProps; } - protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); jdbcProperties.forEach( (key, value) -> jdbcUrlBuilder.append("&").append(key).append("=").append(value)); diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java index c7dc5bf8..abb34c9d 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/db2/Db2DatabaseSync.java @@ -223,7 +223,7 @@ public String getTableListPrefix() { } @Override - protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { StringBuilder jdbcUrlBuilder = new StringBuilder(initialJdbcUrl); boolean firstParam = true; for (Map.Entry entry : jdbcProperties.entrySet()) { diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/postgres/PostgresDatabaseSync.java index dc95e39f..da1f3cc1 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -228,7 +228,7 @@ public String getTableListPrefix() { } @Override - protected String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { + public String getJdbcUrlTemplate(String initialJdbcUrl, Properties jdbcProperties) { if (!initialJdbcUrl.startsWith("?")) { return super.getJdbcUrlTemplate(initialJdbcUrl, jdbcProperties); diff --git a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java index 038d4194..cd162db1 100644 --- a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java +++ b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java @@ -145,7 +145,7 @@ private static void extractedCdcSync() throws Exception { .setCreateTableOnly(false) .create(); databaseSync.build(); - env.executeAsync(String.format("MySQL-Doris Database Sync: %s", MYSQL_DATABASE)); + 
env.executeAsync(String.format("MySQL-OceanBase Database Sync: %s", MYSQL_DATABASE)); } static void checkResult() { @@ -153,10 +153,7 @@ static void checkResult() { String sinkSql = String.format("select * from %s order by 1", MYSQL_TABLE_NAME); try (Statement sourceStatement = getConnection( - getJdbcUrl( - MYSQL_HOST, - MYSQL_CONTAINER.getMappedPort(MYSQL_PORT), - MYSQL_DATABASE), + MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER_NAME, MYSQL_USER_PASSWORD) .createStatement( @@ -209,14 +206,4 @@ public static Connection getConnection(String jdbcUrl, String userName, String p throws SQLException { return DriverManager.getConnection(jdbcUrl, userName, password); } - - public static String getJdbcUrl(String host, Integer port, String schema) { - return "jdbc:mysql://" - + host - + ":" - + port - + "/" - + schema - + "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"; - } } diff --git a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/container/MySqlContainer.java b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/container/MySqlContainer.java index e6de4270..373cff9d 100644 --- a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/container/MySqlContainer.java +++ b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/container/MySqlContainer.java @@ -25,7 +25,7 @@ /** * Docker container for MySQL. The difference between this class and {@link - * org.testcontainers.containers.MySqlContainer} is that TC MySQLContainer has problems when + * org.testcontainers.containers.MySQLContainer} is that TC MySQLContainer has problems when * overriding mysql conf file, i.e. my.cnf. */ @SuppressWarnings("MagicNumber") @@ -92,11 +92,7 @@ public String getJdbcUrl(String databaseName) { + getDatabasePort() + "/" + databaseName - + additionalUrlParams; - } - - public void setDatabaseName(String databaseName) { - this.databaseName = databaseName; + + "?serverTimezone=Asia/Shanghai"; } @Override diff --git a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/tools/DatabaseSyncTest.java b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/tools/DatabaseSyncTest.java new file mode 100644 index 00000000..f55f6ab2 --- /dev/null +++ b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/tools/DatabaseSyncTest.java @@ -0,0 +1,179 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.oceanbase.cdc.tools.tests.tools; + +import com.oceanbase.connector.flink.tools.cdc.DatabaseSync; +import com.oceanbase.connector.flink.tools.cdc.db2.Db2DatabaseSync; +import com.oceanbase.connector.flink.tools.cdc.mysql.MysqlDatabaseSync; +import com.oceanbase.connector.flink.tools.cdc.postgres.PostgresDatabaseSync; +import com.oceanbase.connector.flink.tools.cdc.sqlserver.SqlServerDatabaseSync; + +import org.apache.flink.configuration.Configuration; + +import org.junit.Assert; +import org.junit.Test; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.*; + +/** Unit tests for the {@link DatabaseSync}. */ +public class DatabaseSyncTest { + @Test + public void multiToOneRulesParserTest() throws Exception { + String[][] testCase = { + {"a_.*|b_.*", "a|b"} // Normal condition + // ,{"a_.*|b_.*","a|b|c"} // Unequal length + // ,{"",""} // Null value + // ,{"***....","a"} // Abnormal regular expression + }; + DatabaseSync databaseSync = new MysqlDatabaseSync(); + Arrays.stream(testCase) + .forEach( + arr -> { + databaseSync.multiToOneRulesParser(arr[0], arr[1]); + }); + } + + @Test + public void getSyncTableListTest() throws Exception { + DatabaseSync databaseSync = new MysqlDatabaseSync(); + databaseSync.setIncludingTables("tbl_1|tbl_2"); + Configuration config = new Configuration(); + config.setString("database-name", "db"); + config.setString("table-name", "tbl.*"); + databaseSync.setConfig(config); + String syncTableList = databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2")); + assertEquals("(db)\\.(tbl_1|tbl_2)", syncTableList); + } + + @Test + public void singleSinkTablePatternTest() throws SQLException { + DatabaseSync databaseSync = new MysqlDatabaseSync(); + databaseSync.setIncludingTables(".*"); + databaseSync.setExcludingTables("customer|dates|lineorder"); + Configuration config = new Configuration(); + config.setString("database-name", "ssb_test"); + databaseSync.setConfig(config); + List tableList = + Arrays.asList("customer", "dates", "lineorder", "test1", "test2", "test3"); + String syncTableListPattern = databaseSync.getSyncTableList(tableList); + assertTrue("ssb_test.test1".matches(syncTableListPattern)); + assertTrue("ssb_test.test2".matches(syncTableListPattern)); + assertTrue("ssb_test.test3".matches(syncTableListPattern)); + assertFalse("ssb_test.customer".matches(syncTableListPattern)); + assertFalse("ssb_test.dates".matches(syncTableListPattern)); + assertFalse("ssb_test.lineorder".matches(syncTableListPattern)); + } + + @Test + public void getJdbcPropertiesTest() throws Exception { + DatabaseSync databaseSync = new MysqlDatabaseSync(); + Map mysqlConfig = new HashMap<>(); + mysqlConfig.put("jdbc.properties.use_ssl", "false"); + + Configuration config = Configuration.fromMap(mysqlConfig); + databaseSync.setConfig(config); + Properties jdbcProperties = databaseSync.getJdbcProperties(); + Assert.assertEquals(1, jdbcProperties.size()); + Assert.assertEquals("false", jdbcProperties.getProperty("use_ssl")); + } + + @Test + public void getJdbcUrlTemplateTest() throws SQLException { + String mysqlJdbcTemplate = "jdbc:mysql://%s:%d?useInformationSchema=true"; + String postgresJdbcTemplate = "jdbc:postgresql://%s:%d/%s?"; + String sqlServerJdbcTemplate = "jdbc:sqlserver://%s:%d;database=%s;"; + String db2JdbcTemplate = "jdbc:db2://%s:%d/%s"; + + // mysql jdbc properties 
configuration + DatabaseSync mysqlDatabaseSync = new MysqlDatabaseSync(); + Map mysqlJdbcConfig = new LinkedHashMap<>(); + mysqlJdbcConfig.put("jdbc.properties.use_ssl", "false"); + + DatabaseSync postgresDatabaseSync = new PostgresDatabaseSync(); + Map postgresJdbcConfig = new LinkedHashMap<>(); + postgresJdbcConfig.put("jdbc.properties.ssl", "false"); + + DatabaseSync sqlServerDatabaseSync = new SqlServerDatabaseSync(); + Map sqlServerJdbcConfig = new LinkedHashMap<>(); + sqlServerJdbcConfig.put("jdbc.properties.encrypt", "false"); + sqlServerJdbcConfig.put("jdbc.properties.integratedSecurity", "false"); + + DatabaseSync db2DatabaseSync = new Db2DatabaseSync(); + Map db2JdbcConfig = new LinkedHashMap<>(); + db2JdbcConfig.put("jdbc.properties.ssl", "false"); + db2JdbcConfig.put("jdbc.properties.allowNextOnExhaustedResultSet", "1"); + db2JdbcConfig.put("jdbc.properties.resultSetHoldability", "1"); + + Configuration mysqlConfig = Configuration.fromMap(mysqlJdbcConfig); + mysqlDatabaseSync.setConfig(mysqlConfig); + + Configuration postgresConfig = Configuration.fromMap(postgresJdbcConfig); + postgresDatabaseSync.setConfig(postgresConfig); + + Configuration sqlServerConfig = Configuration.fromMap(sqlServerJdbcConfig); + sqlServerDatabaseSync.setConfig(sqlServerConfig); + + Configuration db2Config = Configuration.fromMap(db2JdbcConfig); + db2DatabaseSync.setConfig(db2Config); + + Properties mysqlJdbcProperties = mysqlDatabaseSync.getJdbcProperties(); + Assert.assertEquals(1, mysqlJdbcProperties.size()); + Assert.assertEquals("false", mysqlJdbcProperties.getProperty("use_ssl")); + String mysqlJdbcUrlTemplate = + mysqlDatabaseSync.getJdbcUrlTemplate(mysqlJdbcTemplate, mysqlJdbcProperties); + Assert.assertEquals(mysqlJdbcTemplate + "&use_ssl=false", mysqlJdbcUrlTemplate); + + Properties postgresJdbcProperties = postgresDatabaseSync.getJdbcProperties(); + Assert.assertEquals(1, postgresJdbcProperties.size()); + Assert.assertEquals("false", postgresJdbcProperties.getProperty("ssl")); + String postgresJdbcUrlTemplate = + postgresDatabaseSync.getJdbcUrlTemplate( + postgresJdbcTemplate, postgresJdbcProperties); + Assert.assertEquals(postgresJdbcTemplate + "&ssl=false", postgresJdbcUrlTemplate); + + Properties sqlServerJdbcProperties = sqlServerDatabaseSync.getJdbcProperties(); + Assert.assertEquals(2, sqlServerJdbcProperties.size()); + Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("encrypt")); + Assert.assertEquals("false", sqlServerJdbcProperties.getProperty("integratedSecurity")); + String sqlServerJdbcUrlTemplate = + sqlServerDatabaseSync.getJdbcUrlTemplate( + sqlServerJdbcTemplate, sqlServerJdbcProperties); + Assert.assertEquals( + sqlServerJdbcTemplate + "encrypt=false;integratedSecurity=false;", + sqlServerJdbcUrlTemplate); + + Properties db2JdbcProperties = db2DatabaseSync.getJdbcProperties(); + Assert.assertEquals(3, db2JdbcProperties.size()); + Assert.assertEquals("false", db2JdbcProperties.getProperty("ssl")); + Assert.assertEquals("1", db2JdbcProperties.getProperty("allowNextOnExhaustedResultSet")); + Assert.assertEquals("1", db2JdbcProperties.getProperty("resultSetHoldability")); + String db2JdbcUrlTemplate = + db2DatabaseSync.getJdbcUrlTemplate(db2JdbcTemplate, db2JdbcProperties); + Assert.assertEquals( + db2JdbcTemplate + + ":allowNextOnExhaustedResultSet=1;ssl=false;resultSetHoldability=1;", + db2JdbcUrlTemplate); + } +} From 907b9e121722207b49e1ed2cd65bd4e192ab0dd1 Mon Sep 17 00:00:00 2001 From: zyh Date: Mon, 23 Dec 2024 17:10:05 +0800 Subject: [PATCH 2/5] fix:add 
flink command line doc --- ...{oceanBaseSinkOperate.java => OceanBaseSinkBuild.java} | 4 ++-- .../oceanbase/connector/flink/tools/cdc/DatabaseSync.java | 8 ++++---- pom.xml | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) rename flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/{oceanBaseSinkOperate.java => OceanBaseSinkBuild.java} (97%) diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java similarity index 97% rename from flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java rename to flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java index 09783c19..cebc7c8d 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/oceanBaseSinkOperate.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java @@ -41,12 +41,12 @@ /** OceanBase Sink Operate. */ @Public -public class OceanBaseSinkOperate implements Serializable { +public class OceanBaseSinkBuild implements Serializable { private static final long serialVersionUID = 1L; protected Configuration sinkConfig; private final OceanBaseConnectorOptions connectorOptions; - public OceanBaseSinkOperate(Configuration cdcSinkConfig) { + public OceanBaseSinkBuild(Configuration cdcSinkConfig) { sinkConfig = cdcSinkConfig; this.connectorOptions = getOceanBaseConnectorOptions(); } diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java index cf6cf124..26e02fe7 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/cdc/DatabaseSync.java @@ -17,7 +17,7 @@ import com.oceanbase.connector.flink.connection.OceanBaseToolsConnectProvider; import com.oceanbase.connector.flink.tools.catalog.OceanBaseSchemaFactory; -import com.oceanbase.connector.flink.tools.catalog.OceanBaseSinkOperate; +import com.oceanbase.connector.flink.tools.catalog.OceanBaseSinkBuild; import com.oceanbase.connector.flink.tools.catalog.TableSchema; import org.apache.flink.api.java.tuple.Tuple2; @@ -100,10 +100,10 @@ public void create() { } public void build() throws Exception { - OceanBaseSinkOperate oceanBaseSinkOperate = new OceanBaseSinkOperate(sinkConfig); + OceanBaseSinkBuild oceanBaseSinkBuild = new OceanBaseSinkBuild(sinkConfig); OceanBaseToolsConnectProvider oceanBaseConnectionProvider = new OceanBaseToolsConnectProvider( - oceanBaseSinkOperate.getOceanBaseConnectorOptions()); + oceanBaseSinkBuild.getOceanBaseConnectorOptions()); List schemaList = getSchemaList(); Preconditions.checkState( !schemaList.isEmpty(), @@ -157,7 +157,7 @@ public void build() throws Exception { int sinkParallel = sinkConfig.getInteger(SINK_PARALLELISM, sideOutput.getParallelism()); String uidName = getUidName(targetDbSet, dbTbl); sideOutput - .sinkTo(oceanBaseSinkOperate.createGenericOceanBaseSink(dbTbl.f0, dbTbl.f1)) + .sinkTo(oceanBaseSinkBuild.createGenericOceanBaseSink(dbTbl.f0, 
dbTbl.f1)) .setParallelism(sinkParallel) .name(uidName) .uid(uidName); diff --git a/pom.xml b/pom.xml index 061f5457..5cdba1b5 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,7 @@ under the License. com.oceanbase obkv-table-client - 1.2.14-SNAPSHOT + 1.3.0 org.slf4j From 98261a958c70fc68c909d8053c9f8be402f9a103 Mon Sep 17 00:00:00 2001 From: zyh Date: Tue, 24 Dec 2024 09:39:39 +0800 Subject: [PATCH 3/5] Empty-Commit --- .idea/vcs.xml | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 61bb2f0c..fe008969 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,18 +1,4 @@ - - + - + \ No newline at end of file From 87fc5d6c7021ccb2d324c8a7f4af59a31ce24cca Mon Sep 17 00:00:00 2001 From: zyh Date: Tue, 24 Dec 2024 09:41:03 +0800 Subject: [PATCH 4/5] fix:add flink command line doc --- .idea/vcs.xml | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index fe008969..61bb2f0c 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,4 +1,18 @@ + - + - \ No newline at end of file + From 148b604d891fc9145c7510e5f014943c56ef34b2 Mon Sep 17 00:00:00 2001 From: zyh Date: Fri, 27 Dec 2024 15:24:09 +0800 Subject: [PATCH 5/5] fix:add flink command line doc --- docs/sink/flink-connector-oceanbase-tools-cdc.md | 8 ++++---- docs/sink/flink-connector-oceanbase-tools-cdc_cn.md | 2 +- .../connector/flink/tools/catalog/OceanBaseSinkBuild.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/sink/flink-connector-oceanbase-tools-cdc.md b/docs/sink/flink-connector-oceanbase-tools-cdc.md index 9a05bd0f..34863104 100644 --- a/docs/sink/flink-connector-oceanbase-tools-cdc.md +++ b/docs/sink/flink-connector-oceanbase-tools-cdc.md @@ -16,7 +16,7 @@ You can get the release packages at [Releases Page](https://github.com/oceanbase ``` -如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定: +If you want to use the latest snapshot version, you can specify by configuring the Maven snapshot repository: ```xml @@ -47,9 +47,9 @@ mvn clean package -DskipTests ### Notes: -* Currently, the project supports using Flink CDC to access multiple tables or the entire database. During synchronization, you need to add the corresponding Flink CDC dependency in the `$FLINK_HOME/lib` directory, such as flink-sql-connector-mysql-cdc-\${version}. jar, flink-sql-connector-oracle-cdc-\${version}.jar, flink-sql-connector-sqlserver-cdc-\${version}.jar -* The dependent Flink CDC version needs to be above 3.1. If you need to use Flink CDC to synchronize MySQL and Oracle, you also need to add the relevant JDBC driver under `$FLINK_HOME/lib` -* When synchronizing to oceanbase, the url connection string of oceanbase needs to use the mysql protocol. +* Currently, the project supports using Flink CDC to access multiple tables or the entire database. During synchronization, you need to add the corresponding Flink CDC dependency in the `$FLINK_HOME/lib` directory, such as flink-sql-connector-mysql-cdc-\${version}. jar, flink-sql-connector-oracle-cdc-\${version}.jar, flink-sql-connector-sqlserver-cdc-\${version}.jar. + * The dependent Flink CDC version needs to be above 3.1. If you need to use Flink CDC to synchronize MySQL and Oracle, you also need to add the relevant JDBC driver under `$FLINK_HOME/lib`. + * If you synchronize data to OceanBase, you must use oceanBase or mysql as the protocol name for the URL connection string of OceanBase. 
### MySQL Synchronous OceanBase Example diff --git a/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md b/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md index b4cbc0d1..fecd5512 100644 --- a/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md +++ b/docs/sink/flink-connector-oceanbase-tools-cdc_cn.md @@ -49,7 +49,7 @@ mvn clean package -DskipTests * 目前项目支持使用Flink CDC接入多表或整库。同步时需要在 `$FLINK_HOME/lib` 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc-\${version}.jar,flink-sql-connector-oracle-cdc-\${version}.jar ,flink-sql-connector-sqlserver-cdc-\${version}.jar * 依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 `$FLINK_HOME/lib` 下增加相关的 JDBC 驱动。 -* 同步至oceanbase时,oceanbase的url连接串需要使用mysql的协议。 +* 同步至oceanbase时,oceanbase的url连接串需要使用oceanbase或mysql作为协议名称。 ### MySQL同步至OceanBase示例 diff --git a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java index cebc7c8d..e1e12d6d 100644 --- a/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java +++ b/flink-connector-oceanbase-tools-cdc/src/main/java/com/oceanbase/connector/flink/tools/catalog/OceanBaseSinkBuild.java @@ -39,7 +39,7 @@ import java.util.List; import java.util.Map; -/** OceanBase Sink Operate. */ +/** OceanBase Sink Build. */ @Public public class OceanBaseSinkBuild implements Serializable { private static final long serialVersionUID = 1L;
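
For reference, the sketch below restates the documented `mysql-sync-database` command with the sink URL written using the `oceanbase` protocol name that PATCH 5/5 calls out as acceptable alongside `mysql`. It is a minimal illustration, not part of the patch series: every host, port, and credential is a placeholder (`xxxx`), the jar version is left as `${version}`, and 2881 is only the conventional OceanBase SQL port used elsewhere in these docs.

```shell
# Minimal sketch based on the documented mysql-sync-database example.
# The only change from the docs above is the sink URL, which uses the
# oceanbase protocol name permitted by the updated note; hosts, ports
# and passwords are placeholders.
$FLINK_HOME/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c com.oceanbase.connector.flink.tools.cdc.CdcTools \
    lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=xxxx \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=xxxx \
    --mysql-conf database-name=test_db \
    --including-tables "tbl1|test.*" \
    --sink-conf username=xxxx \
    --sink-conf password=xxxx \
    --sink-conf url=jdbc:oceanbase://xxxx:2881
```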