Skip to content

Commit

Permalink
use flink cdc 3.2.0 and update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 10, 2025
1 parent 68b0549 commit c38bd8f
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 184 deletions.
Original file line number Diff line number Diff line change
@@ -1,41 +1,12 @@
# Flink Connector OceanBase By Tools CDC
# Flink Connector OceanBase CLI

English | [简体中文](flink-connector-oceanbase-tools-cdc_cn.md)
English | [简体中文](flink-connector-oceanbase-cli_cn.md)

This project is a flink command line tool that supports the synchronization of CDC tasks to oceanbase through the Flink command line, which greatly simplifies the command writing of data synchronization to oceanbase through flink.
The project is a set of CLI (command line interface) tools that supports submitting Flink jobs to migrate data from other data sources to OceanBase.

## Getting Started

You can get the release packages at [Releases Page](https://github.com/oceanbase/flink-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-directload).

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-tools-cdc</artifactId>
<version>${project.version}</version>
</dependency>
```

If you want to use the latest snapshot version, you can specify by configuring the Maven snapshot repository:

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-tools-cdc</artifactId>
<version>${project.version}</version>
</dependency>

<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
```
You can get the release packages at [Releases Page](https://github.com/oceanbase/flink-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-cli),or get the latest snapshot packages at [Sonatype Snapshot](https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-connector-oceanbase-cli).

You can also manually build it from the source code.

Expand All @@ -45,17 +16,23 @@ cd flink-connector-oceanbase
mvn clean package -DskipTests
```

### Notes:
### Using Flink CDC as Source

#### Dependencies

This project is based on the SQL Client JAR of [Flink CDC Source Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/overview/).

We do not provide Flink CDC Source Connector in the JAR package of this project, so you need to manually download the used Flink CDC SQL JAR. Note that this project requires Flink CDC to be 3.2.0 or later version.

If you're using Flink Oracle CDC as source, you need also download the dependencies of the source connector, see the *Dependencies* chapter of [Oracle CDC Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/oracle-cdc/#sql-client-jar).

* Currently, the project supports using Flink CDC to access multiple tables or the entire database. During synchronization, you need to add the corresponding Flink CDC dependency in the `$FLINK_HOME/lib` directory, such as flink-sql-connector-mysql-cdc-\${version}. jar, flink-sql-connector-oracle-cdc-\${version}.jar, flink-sql-connector-sqlserver-cdc-\${version}.jar.
* The dependent Flink CDC version needs to be above 3.1. If you need to use Flink CDC to synchronize MySQL and Oracle, you also need to add the relevant JDBC driver under `$FLINK_HOME/lib`.
* If you synchronize data to OceanBase, you must use oceanBase or mysql as the protocol name for the URL connection string of OceanBase.
#### Demo: Migrate from Flink MySQL CDC to OceanBase

### MySQL Synchronous OceanBase Example
##### Preparation

#### Geting Ready
Add the CLI JAR `flink-connector-oceanbase-cli-xxx.jar` and dependency JAR `flink-sql-connector-mysql-cdc-xxx.jar` to `$FLINK_HOME/lib`.

Create a table test_history_strt_sink in a MySql database test_db library, test_history_text.
Then prepare tables and data in MySQL database.

```mysql
use test_db;
Expand All @@ -73,18 +50,21 @@ CREATE TABLE test_history_text (
ns integer DEFAULT '0' NOT NULL,
PRIMARY KEY (itemid,clock,ns)
);

INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES
(1,21131,'ces1',21321);
```

#### Build A Flink Task
##### Submit Job via CLI

##### An example of the Flink command line
Replace the following command with your real database information, and execute it to submit a Flink job.

```shell
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.tools.cdc.CdcTools \
lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=xxxx \
Expand All @@ -98,66 +78,26 @@ $FLINK_HOME/bin/flink run \
--sink-conf url=jdbc:mysql://xxxx:xxxx
```

Replace the above database information with your real database information, and when a message similar to the following appears, the task is successfully built and submitted.
##### Check and Verify

```shell
Job has been submitted with JobID 0177b201a407045a17445aa288f0f111
```
Check the target OceanBase database, you should find out these two tables and one row data.

The tool automatically parses the information on the command line and creates a table, which can be queried and verified in OceanBase.

MySQL to insert test data
You can go on insert test data to MySQL database as below:

```sql
INSERT INTO test_db.test_history_str (itemid,clock,value,ns) VALUES
(1,2,'ces1',1123);
INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES
(1,21131,'ces1',21321),
(2,21321,'ces2',12321);
```

Since it is a CDC task, after data is inserted in MySQL, you can query and verify the synchronized data in OceanBase.

### Parameter parsing

This configuration is the program configuration information of flink

```shell
-Dexecution.checkpointing.interval=10s
-Dparallelism.default=1
```

Specify the JAR package of the program and the entry of the program

```shell
-c com.oceanbase.connector.flink.tools.cdc.CdcTools \
lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
```

The name of the database

```shell
--database test_db
```

This name is customized, meaning the name given to this database, and ultimately serves as the naming rule for flink tasks.

## Configuration Items

#### Supported data sources

| Data source identifier | Data source |
|-------------------------|----------------------|
| mysql-sync-database | mysql datasource |
| oracle-sync-database | oracle datasource |
| postgres-sync-database | postgres datasource |
| sqlserver-sync-database | sqlserver datasource |
| db2-sync-database | db2 datasource |

#### Configuration Items
#### Options

| Configuration Items | Comment |
| Option | Comment |
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| {identifier} | Data source identifier, only mysql type `mysql-sync-database` is verified now. |
| --job-name | Flink task name, optional. |
| --database | Database name synchronized to OceanBase. |
| --table-prefix | OceanBase table prefix name, such as --table-prefix ods_. |
Expand All @@ -173,13 +113,13 @@ This name is customized, meaning the name given to this database, and ultimately
| --ignore-default-value | Turn off synchronization of MySQL table structures by default. It is suitable for the situation when synchronizing MySQL data to oceanbase, the field has a default value, but the actual inserted data is null. |
| --create-table-only | Whether to only synchronize the structure of the table. |

#### Configuration items of sink-conf
`--sink-conf` option

| Configuration Items | Default Value | Required | Comment |
|---------------------|---------------|----------|-------------------------------------------------------------------|
| url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:2881 |
| username | -- | Y | Username to access oceanbase |
| password | -- | Y | Password to access oceanbase |
| Option | Default Value | Required | Comment |
|----------|---------------|----------|-------------------------------------------------------------------|
| url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:2881 |
| username | -- | Y | Username to access oceanbase |
| password | -- | Y | Password to access oceanbase |

## Reference information

Expand Down
Loading

0 comments on commit c38bd8f

Please sign in to comment.