diff --git a/docs/data_node/extract_node/mysql-cdc.md b/docs/data_node/extract_node/mysql-cdc.md index 63ab038b57f..6e98e076cc2 100644 --- a/docs/data_node/extract_node/mysql-cdc.md +++ b/docs/data_node/extract_node/mysql-cdc.md @@ -105,7 +105,7 @@ Flink SQL> CREATE TABLE mysql_extract_node ( 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabaseName', - 'table-name' = 'YourTableName'); + 'table-name' = 'YourDatabaseName.YourTableName'); -- Read snapshot and binlogs from mysql_extract_node Flink SQL> SELECT * FROM mysql_extract_node; @@ -125,37 +125,37 @@ TODO: It will be supported in the future. ## MySQL Extract Node Options -|Option| Required| Default| Type| Description| -| --- | --- | --- | --- | --- | -| connector | required | (none) | String | Specify what connector to use, here should be `'mysql-cdc-inlong'`.| -| hostname | required | (none) | String | IP address or hostname of the MySQL database server. | -| username | required | (none) | String | Name of the MySQL database to use when connecting to the MySQL database server.| -| password | required | (none) | String | Password to use when connecting to the MySQL database server.| -| database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple tables matches the regular expression.| -| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables matches the regular expression.| -| port | optional | 3306 | Integer | Integer port number of the MySQL database server.| -| server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.| -| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism to read snapshot of a table. Compared to the old snapshot mechanism,the incremental snapshot has many advantages, including:(1) source can be parallel during snapshot reading, (2) source can perform checkpoints in the chunk granularity during snapshot reading, (3) source doesn't need to acquire global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. If you would like the source run in parallel, each parallel reader should have an unique server id, so the 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. 
Please see [Incremental Snapshot Reading](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading) section for more detailed information.| -| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.| -| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size for per poll when read table snapshot.| -| scan.startup.mode | optional | initial | String | Optional startup mode for MySQL CDC consumer, valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp". Please see [Startup Reading Position](#startup-reading-position) section for more detailed information.| -|scan.startup.specific-offset.file |optional |(none) |String |Optional binlog file name used in case of "specific-offset" startup mode | -| scan.startup.specific-offset.pos| optional |>(none) | Long | Optional binlog file position used in case of "specific-offset" startup mode| -| scan.startup.specific-offset.gtid-set| optional| none) | String | Optional GTID set used in case of "specific-offset" startup mode| -| scan.startup.specific-offset.skip-events |optional | (none) | Long | number of events to skip after the specific starting offset| -| scan.startup.specific-offset.skip-rows | optional | (none) | Long | Optional number of rows to skip after the specific starting offset| -|server-time-zone |optional |UTC |String |The session time zone in database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MYSQL converted to STRING. See more [here](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types").| -| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (can be slower, but will work for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot.| -| connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.| -| connect.max-retries| optional| 3| Integer| The max retry times that the connector should retry to build MySQL database server connection.| -| connection.pool.size| optional| 20| Integer| The connection pool size.| -| jdbc.properties.* | optional| 20 | String| Option to pass custom JDBC URL properties. 
User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.| -| heartbeat.interval| optional| 30s| Duration | The interval of sending heartbeat event for tracing the latest available binlog offsets.| -| append-mode | optional | false | Boolean | Whether to support append only, if true the MySQL Extract Node will Convert all upsert streams to append streams to support downstream scenarios where upsert streams are not supported.| -| migrate-all | optional | false | Boolean | Whether it is a full database migration scenario, if it is 'true', MySQL Extract Node will compress the physical fields and other meta fields of the table into 'json'. The special 'data' meta field of the format, currently supports two data formats, if you need data in 'canal json' format, then use the 'data_canal' metadata field, or use the 'data_debezium' metadata field if data in 'debezium json' format is required.| -| row-kinds-filtered| optional| false| Boolean | The specific operation type that needs to be retained, where +U corresponds to the data before the update, -U corresponds to the updated data, and +I corresponds to the data before the update. Inserted data (the existing data is the data of the insert type), -D represents the deleted data, if you want to keep multiple operation types, use & connection. For example +I&-D, the connector will only output the inserted and deleted data, and the updated data will not be output. | -| debezium.* | optional | (none) | String | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from MySQL server. For example: `'debezium.snapshot.mode' = 'never'`. See more about the [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties)| -| inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. | +| Option | Required | Default | Type | Description | +|------------------------------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| connector | required | (none) | String | Specify what connector to use, here should be `'mysql-cdc-inlong'`. | +| hostname | required | (none) | String | IP address or hostname of the MySQL database server. | +| username | required | (none) | String | Name of the MySQL database to use when connecting to the MySQL database server. | +| password | required | (none) | String | Password to use when connecting to the MySQL database server. | +| database-name | required | (none) | String | Database name of the MySQL server to monitor. 
The database-name also supports regular expressions to monitor multiple databases that match the regular expression. |
+| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that match the regular expression. The format should be `DatabaseName.TableName`. |
+| port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
+| server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', and the numeric ID range syntax is like '5400-5408'. The range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value. |
+| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including: (1) the source can read the snapshot in parallel, (2) the source can perform checkpoints at chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. If you would like the source to run in parallel, each parallel reader should have a unique server id, so 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. Please see the [Incremental Snapshot Reading](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading) section for more detailed information. |
+| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of the table snapshot; captured tables are split into multiple chunks when reading the table snapshot. |
+| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading the table snapshot. |
+| scan.startup.mode | optional | initial | String | Optional startup mode for the MySQL CDC consumer; valid enumerations are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp". Please see the [Startup Reading Position](#startup-reading-position) section for more detailed information. |
+| scan.startup.specific-offset.file | optional | (none) | String | Optional binlog file name used in case of "specific-offset" startup mode. |
+| scan.startup.specific-offset.pos | optional | (none) | Long | Optional binlog file position used in case of "specific-offset" startup mode. |
+| scan.startup.specific-offset.gtid-set | optional | (none) | String | Optional GTID set used in case of "specific-offset" startup mode. |
+| scan.startup.specific-offset.skip-events | optional | (none) | Long | Optional number of events to skip after the specific starting offset. |
+| scan.startup.specific-offset.skip-rows | optional | (none) | Long | Optional number of rows to skip after the specific starting offset. |
+| server-time-zone | optional | UTC | String | The session time zone in the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. See more [here](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types). |
+| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (can be slower, but works for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot. |
+| connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. |
+| connect.max-retries | optional | 3 | Integer | The maximum number of retries the connector should attempt when building the MySQL database server connection. |
+| connection.pool.size | optional | 20 | Integer | The connection pool size. |
+| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. Users can pass custom properties like 'jdbc.properties.useSSL' = 'false'. |
+| heartbeat.interval | optional | 30s | Duration | The interval for sending heartbeat events, used for tracking the latest available binlog offsets. |
+| append-mode | optional | false | Boolean | Whether to support append-only mode. If true, the MySQL Extract Node will convert all upsert streams to append streams to support downstream scenarios where upsert streams are not supported. |
+| migrate-all | optional | false | Boolean | Whether this is a whole-database migration scenario. If 'true', the MySQL Extract Node compresses the physical fields and other meta fields of the table into a special 'data' meta field in 'json' format. Two data formats are currently supported: use the 'data_canal' metadata field if you need data in 'canal json' format, or use the 'data_debezium' metadata field if data in 'debezium json' format is required. |
+| row-kinds-filtered | optional | false | Boolean | The specific operation types to retain, where +U corresponds to the data before an update, -U corresponds to the data after an update, +I corresponds to inserted data (existing stock data is treated as insert type), and -D represents deleted data. To keep multiple operation types, join them with &. For example, with +I&-D the connector will only output inserted and deleted data; updated data will not be output. |
+| debezium.* | optional | (none) | String | Pass-through Debezium's properties to the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. For example: `'debezium.snapshot.mode' = 'never'`. See more about [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties). |
+| inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId].
| ## Available Metadata @@ -204,7 +204,7 @@ CREATE TABLE `mysql_extract_node` ( 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabase', - 'table-name' = 'YourTable', + 'table-name' = 'YourDatabase.YourTable', 'row-kinds-filtered' = '+I' ); ``` diff --git a/docs/data_node/load_node/iceberg.md b/docs/data_node/load_node/iceberg.md index 3eb0a67e43e..b0577b466a5 100644 --- a/docs/data_node/load_node/iceberg.md +++ b/docs/data_node/load_node/iceberg.md @@ -164,7 +164,8 @@ WITH ( 'sink.multiple.format' = 'canal-json', 'sink.multiple.add-column.policy' = 'TRY_IT_BEST', 'sink.multiple.database-pattern' = '${database}', - 'sink.multiple.table-pattern' = 'test_${table}' + 'sink.multiple.table-pattern' = 'test_${table}', + 'sink.multiple.auto-create-table-when-snapshot' = 'true' ); ``` To support multiple sink, it is necessary to set the serialization format of upstream data @@ -246,24 +247,25 @@ Iceberg support schema evolution from source table to target table in multiple s ## Iceberg Load Node Options -| Option | Required | Default | Type | Description | -| ---------------- | ------------------------------------------- | ------- | ------- | ------------------------------------------------------------ | -| connector | required | (none) | String | Specify what connector to use, here should be `'iceberg'`. | -| catalog-type | required | hive | String | `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. | -| catalog-name | required | (none) | String | Catalog name. | -| catalog-database | required | (none) | String | Database name managed in the iceberg catalog. | -| catalog-table | required | (none) | String | Table name managed in the underlying iceberg catalog and database. | -| catalog-impl | optional for custom catalog | (none) | String | The fully-qualified class name custom catalog implementation, must be set if `catalog-type` is unset. | -| cache-enabled | optional | true | Boolean | Whether to enable catalog cache, default value is `true` | -| uri | required for hive catalog | (none) | String | The Hive metastore’s thrift URI. | -| clients | optional for hive catalog | 2 | Integer | The Hive metastore client pool size, default value is 2. | -| warehouse | optional for hadoop catalog or hive catalog | (none) | String | For Hive catalog,is the Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath. For hadoop catalog,The HDFS directory to store metadata files and data files. | -| hive-conf-dir | optional for hive catalog | (none) | String | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `/hive-site.xml` (or hive configure file from classpath) will be overwrote with the `warehouse` value if setting both `hive-conf-dir` and `warehouse` when creating iceberg catalog. | -| inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId=`{groupId}`&streamId=`{streamId}`&nodeId=`{nodeId}`. | -| sink.multiple.enable | optional | false | Boolean | Whether to enable multiple sink | -| sink.multiple.schema-update.policy | optional | TRY_IT_BEST | Enum | The policy to handle the inconsistency between the schema in the data and the schema of the target table
TRY_IT_BEST: try best, deal with as much as possible, ignore it if can't handled.
IGNORE_WITH_LOG:ignore it and log it,ignore this table later.
THROW_WITH_STOP:throw exception and stop the job, until user deal with schema conflict and job restore.
-| sink.multiple.pk-auto-generated | optional | false | Boolean | Whether auto generate primary key, regard all field combined as primary key in multiple sink scenes. |
-| sink.multiple.typemap-compatible-with-spark | optional | false | Boolean | Whether to adapt spark type system in auto generate table. |
+| Option | Required | Default | Type | Description |
+|-----------------------------------------------|---------------------------------------------|-------------|---------|----------------------------------------------------------------------------------------------------------------|
+| connector | required | (none) | String | Specify what connector to use, here should be `'iceberg'`. |
+| catalog-type | required | hive | String | `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. |
+| catalog-name | required | (none) | String | Catalog name. |
+| catalog-database | required | (none) | String | Database name managed in the iceberg catalog. |
+| catalog-table | required | (none) | String | Table name managed in the underlying iceberg catalog and database. |
+| catalog-impl | optional for custom catalog | (none) | String | The fully-qualified class name of the custom catalog implementation; must be set if `catalog-type` is unset. |
+| cache-enabled | optional | true | Boolean | Whether to enable catalog cache, default value is `true`. |
+| uri | required for hive catalog | (none) | String | The Hive metastore’s thrift URI. |
+| clients | optional for hive catalog | 2 | Integer | The Hive metastore client pool size, default value is 2. |
+| warehouse | optional for hadoop catalog or hive catalog | (none) | String | For Hive catalog, the Hive warehouse location; users should specify this path if they neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. For Hadoop catalog, the HDFS directory to store metadata files and data files. |
+| hive-conf-dir | optional for hive catalog | (none) | String | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `hive-site.xml` in this directory (or the hive configuration file on the classpath) will be overwritten by the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the iceberg catalog. |
+| inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId=`{groupId}`&streamId=`{streamId}`&nodeId=`{nodeId}`. |
+| sink.multiple.enable | optional | false | Boolean | Whether to enable multiple sink. |
+| sink.multiple.schema-update.policy | optional | TRY_IT_BEST | Enum | The policy to handle the inconsistency between the schema in the data and the schema of the target table<br/>
TRY_IT_BEST: try best, deal with as much as possible, ignore it if it can't be handled.<br/>
IGNORE_WITH_LOG: ignore it and log it, ignore this table later.<br/>
THROW_WITH_STOP:throw exception and stop the job, until user deal with schema conflict and job restore. | +| sink.multiple.pk-auto-generated | optional | false | Boolean | Whether auto generate primary key, regard all field combined as primary key in multiple sink scenes. | +| sink.multiple.typemap-compatible-with-spark | optional | false | Boolean | Whether to adapt spark type system in auto generate table. | +| sink.multiple.auto-create-table-when-snapshot | optional | false | Boolean | Whether to generate table at snapshot phase. | ## Data Type Mapping diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md index f023ebdb325..7506c561035 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md @@ -102,7 +102,7 @@ Flink SQL> CREATE TABLE mysql_extract_node ( 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabaseName', - 'table-name' = 'YourTableName'); + 'table-name' = 'YourDatabaseName.YourTableName'); Flink SQL> SELECT * FROM mysql_extract_node; ``` @@ -121,37 +121,37 @@ TODO: 将在未来支持此功能。 ## MySQL Extract 节点参数 -| 参数 | 是否必须 | 默认值 | 数据类型 | 描述 | -| --- | --- | --- | --- | --- | -| connector | required | (none) | String | 指定要使用的连接器,这里应该是 `'mysql-cdc-inlong'`。 | -| hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 | -| username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户名称。 | -| password | required | (none) | String | 连接到 MySQL 数据库服务器时使用的密码。 | -| database-name | required | (none) | String | 要监控的 MySQL 服务器的数据库名称。 database-name 还支持正则表达式来监控多个表是否匹配正则表达式。 | -| table-name | required | (none) | String | 要监控的 MySQL 数据库的表名。 table-name 还支持正则表达式来监控多个表是否匹配正则表达式。 | -| port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 | -| server-id | optional | (none) | Integer | 此数据库客户端的数字 Id 或数字 Id 范围,数字 Id 语法类似于 '5400', 数字 Id 范围语法类似于 '5400-5408',启用 'scan.incremental.snapshot.enabled' 时建议使用数字 Id 范围语法。 在 MySQL 集群中所有当前运行的数据库进程中,每个 Id 都必须是唯一的。此连接器加入 MySQL 集群 作为另一台服务器(具有此唯一 Id),因此它可以读取 Binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数, 尽管我们建议设置一个明确的值。 | -| scan.incremental.snapshot.enabled | optional | true | Boolean | 增量快照是一种读取表快照的新机制。与旧的快照机制相比, 增量快照有很多优点,包括: (1) 快照读取时 Source 可以并行, (2) Source 可以在快照读取时在 Chunk 粒度上进行检查点, (3) Source 在读快照前不需要获取全局读锁(FLUSH TABLES WITH READ LOCK)。 如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,所以 'server-id' 必须是 '5400-6400' 这样的范围,并且范围必须大于并行度。 请参阅[增量快照阅读](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading)部分了解更多详细信息。 | -| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,表的快照被分成多个块。 | -| scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次轮询的最大获取大小。 | -| scan.startup.mode | optional | initial | String | MySQL CDC 消费者的可选启动模式,有效枚举为 'initial' 和 'latest-offset'。 请参阅[启动阅读位置](#启动模式)部分了解更多详细信息。 | -| scan.startup.specific-offset.file | optional | (none) | String | 在 'specific-offset' 启动模式下,启动位点的 binlog 文件名。| -| scan.startup.specific-offset.pos| optional | (none) | Long | 在 'specific-offset' 启动模式下,启动位点的 binlog 文件位置。| -| scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 'specific-offset' 启动模式下,启动位点的 GTID 集合。| -| scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。| -| scan.startup.specific-offset.skip-rows | optional | 
(none) | Long | 在指定的启动位点后需要跳过的数据行数量。| -| server-time-zone | optional | UTC | String | 数据库服务器中的会话时区,例如 'Asia/Shanghai'。 它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。 查看更多[这里](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types)。 | -| debezium.min.row. count.to.stream.result | optional | 1000 | Integer | 在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行生成读取事件。此参数确定 MySQL 连接是否会将表的所有结果拉入内存(速度很快但需要大量内存),或者是否将结果改为流式传输(可能较慢,但适用于非常大的表)。该值指定在连接器流式传输结果之前表必须包含的最小行数,默认为 1,000。将此参数设置为'0'以跳过所有表大小检查并始终在快照期间流式传输所有结果。 | -| connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间。 | -| connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 | -| connection.pool.size | optional | 20 | Integer | 连接池大小。 | -| jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,例如 'jdbc.properties.useSSL' = 'false'。 | -| heartbeat.interval | optional | 30s | Duration | 发送心跳事件的时间间隔,用于跟踪最新可用的 Binlog 偏移量。 | -| append-mode | optional | false | Boolean | 是否仅支持 Append,如果为 'true',MySQL Extract Node 会将所有 Upsert 流转换为 Append 流,以支持不支持 Upsert 流的下游场景。 | -| migrate-all | optional | false | Boolean | 是否是全库迁移场景,如果为 'true',MySQL Extract Node 则将表的物理字段和其他元字段压缩成 'json' 格式的特殊 'data' 元字段, 目前支持两种 data 格式, 如果需要 'canal json' 格式的数据, 则使用 'data\_canal' 元数据字段,如果需要使用 'debezium json' 格式的数据则使用 'data\_debezium' 元数据字段。 | -| row-kinds-filtered | optional | false | Boolean | 需要保留的特定的操作类型,其中 +U 对应更新前的数据,-U 对应更新后的数据,+I 对应 插入的数据(存量数据为插入类型的数据),-D 代表删除的数据, 如需保留多个操作类型则使用 & 连接。 举例 +I&-D,connector 只会输出插入以及删除的数据,更新的数据则不会输出。 | -| debezium.* | optional | (none) | String | 将 Debezium 的属性传递给用于从 MySQL 服务器捕获数据更改的 Debezium Embedded Engine。 例如:`'debezium.snapshot.mode' = 'never'`。 详细了解 [Debezium 的 MySQL 连接器属性。](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties) | -| inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。 | +| 参数 | 是否必须 | 默认值 | 数据类型 | 描述 | +|------------------------------------------|----------|---------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| connector | required | (none) | String | 指定要使用的连接器,这里应该是 `'mysql-cdc-inlong'`。 | +| hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 | +| username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户名称。 | +| password | required | (none) | String | 连接到 MySQL 数据库服务器时使用的密码。 | +| database-name | required | (none) | String | 要监控的 MySQL 服务器的数据库名称。 database-name 还支持正则表达式来监控多个表是否匹配正则表达式。 | +| table-name | required | (none) | String | 要监控的 MySQL 数据库的表名。 table-name 还支持正则表达式来监控多个表是否匹配正则表达式。格式为 database-name.table-name | +| port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 | +| server-id | optional | (none) | Integer | 此数据库客户端的数字 Id 或数字 Id 范围,数字 Id 语法类似于 '5400', 数字 Id 范围语法类似于 '5400-5408',启用 'scan.incremental.snapshot.enabled' 时建议使用数字 Id 范围语法。 在 MySQL 集群中所有当前运行的数据库进程中,每个 Id 都必须是唯一的。此连接器加入 MySQL 集群 作为另一台服务器(具有此唯一 Id),因此它可以读取 Binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数, 尽管我们建议设置一个明确的值。 | +| scan.incremental.snapshot.enabled | optional | true | Boolean | 增量快照是一种读取表快照的新机制。与旧的快照机制相比, 增量快照有很多优点,包括: (1) 快照读取时 Source 可以并行, (2) Source 可以在快照读取时在 Chunk 
粒度上进行检查点, (3) Source 在读快照前不需要获取全局读锁(FLUSH TABLES WITH READ LOCK)。 如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,所以 'server-id' 必须是 '5400-6400' 这样的范围,并且范围必须大于并行度。 请参阅[增量快照阅读](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/connectors/mysql-cdc.html#incremental-snapshot-reading)部分了解更多详细信息。 | +| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,表的快照被分成多个块。 | +| scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次轮询的最大获取大小。 | +| scan.startup.mode | optional | initial | String | MySQL CDC 消费者的可选启动模式,有效枚举为 'initial' 和 'latest-offset'。 请参阅[启动阅读位置](#启动模式)部分了解更多详细信息。 | +| scan.startup.specific-offset.file | optional | (none) | String | 在 'specific-offset' 启动模式下,启动位点的 binlog 文件名。 | +| scan.startup.specific-offset.pos | optional | (none) | Long | 在 'specific-offset' 启动模式下,启动位点的 binlog 文件位置。 | +| scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 'specific-offset' 启动模式下,启动位点的 GTID 集合。 | +| scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。 | +| scan.startup.specific-offset.skip-rows | optional | (none) | Long | 在指定的启动位点后需要跳过的数据行数量。 | +| server-time-zone | optional | UTC | String | 数据库服务器中的会话时区,例如 'Asia/Shanghai'。 它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。 查看更多[这里](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types)。 | +| debezium.min.row. count.to.stream.result | optional | 1000 | Integer | 在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行生成读取事件。此参数确定 MySQL 连接是否会将表的所有结果拉入内存(速度很快但需要大量内存),或者是否将结果改为流式传输(可能较慢,但适用于非常大的表)。该值指定在连接器流式传输结果之前表必须包含的最小行数,默认为 1,000。将此参数设置为'0'以跳过所有表大小检查并始终在快照期间流式传输所有结果。 | +| connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间。 | +| connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 | +| connection.pool.size | optional | 20 | Integer | 连接池大小。 | +| jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,例如 'jdbc.properties.useSSL' = 'false'。 | +| heartbeat.interval | optional | 30s | Duration | 发送心跳事件的时间间隔,用于跟踪最新可用的 Binlog 偏移量。 | +| append-mode | optional | false | Boolean | 是否仅支持 Append,如果为 'true',MySQL Extract Node 会将所有 Upsert 流转换为 Append 流,以支持不支持 Upsert 流的下游场景。 | +| migrate-all | optional | false | Boolean | 是否是全库迁移场景,如果为 'true',MySQL Extract Node 则将表的物理字段和其他元字段压缩成 'json' 格式的特殊 'data' 元字段, 目前支持两种 data 格式, 如果需要 'canal json' 格式的数据, 则使用 'data\_canal' 元数据字段,如果需要使用 'debezium json' 格式的数据则使用 'data\_debezium' 元数据字段。 | +| row-kinds-filtered | optional | false | Boolean | 需要保留的特定的操作类型,其中 +U 对应更新前的数据,-U 对应更新后的数据,+I 对应 插入的数据(存量数据为插入类型的数据),-D 代表删除的数据, 如需保留多个操作类型则使用 & 连接。 举例 +I&-D,connector 只会输出插入以及删除的数据,更新的数据则不会输出。 | +| debezium.* | optional | (none) | String | 将 Debezium 的属性传递给用于从 MySQL 服务器捕获数据更改的 Debezium Embedded Engine。 例如:`'debezium.snapshot.mode' = 'never'`。 详细了解 [Debezium 的 MySQL 连接器属性。](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties) | +| inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。 | ## 可用的元数据字段 @@ -200,7 +200,7 @@ CREATE TABLE `mysql_extract_node` ( 'username' = 'YourUsername', 'password' = 'YourPassword', 'database-name' = 'YourDatabase', - 'table-name' = 'YourTable', + 'table-name' = 'YourDatabase.YourTable', 'row-kinds-filtered' = '+I' ); ``` diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md index 
bd5f00d933c..550ef201794 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md @@ -164,7 +164,8 @@ WITH ( 'sink.multiple.format' = 'canal-json', 'sink.multiple.add-column.policy' = 'TRY_IT_BEST', 'sink.multiple.database-pattern' = '${database}', - 'sink.multiple.table-pattern' = 'test_${table}' + 'sink.multiple.table-pattern' = 'test_${table}', + 'sink.multiple.auto-create-table-when-snapshot' = 'true' ); ``` 要支持多表写入同时需要设置上游数据的序列化格式(通过选项 'sink.multiple.format' @@ -242,24 +243,25 @@ Iceberg在多表写入时支持同步源表结构变更到目标表(DDL同步 ## Iceberg Load 节点参数 -| 选项 | 是否必须 | 默认值 | 类型 | 描述 | -| ---------------- | -------------------------------- | ------ | ------- | ------------------------------------------------------------ | -| connector | 必需 | (none) | String | 指定要使用的连接器,这里应该是`'iceberg'` | -| catalog-type | 必需 | hive | String | `hive`或`hadoop`用于内置目录,或为使用 catalog-impl 的自定义目录实现未设置 | -| catalog-name | 必需 | (none) | String | 目录名称 | -| catalog-database | 必需 | (none) | String | 在Iceberg目录中管理的数据库名称 | -| catalog-table | 必需 | (none) | String | 在底层Iceberg目录和数据库中管理的表名 | -| catalog-impl | 自定义custom 可选 | (none) | String | 如果未设置,则必须设置完全限定的类名自定义目录实现`catalog-type` | -| cache-enabled | 可选 | true | Boolean | 是否启用目录缓存,默认值为`true` | -| uri | hive catalog可选 | (none) | String | Hive 元存储的 thrift URI | -| clients | hive catalog可选 | 2 | Integer | Hive Metastore 客户端池大小,默认值为 2 | -| warehouse | hive catalog或hadoop catalog可选 | (none) | String | 对于 Hive 目录,是 Hive 仓库位置,如果既不设置`hive-conf-dir`指定包含`hive-site.xml`配置文件的位置也不添加正确`hive-site.xml`的类路径,用户应指定此路径。对于hadoop目录,HDFS目录存放元数据文件和数据文件 | -| hive-conf-dir | hive catalog可选 | (none) | String | `hive-site.xml`包含将用于提供自定义 Hive 配置值的配置文件的目录的路径。如果同时设置和创建Iceberg目录时,`hive.metastore.warehouse.dir`from `/hive-site.xml`(或来自类路径的 hive 配置文件)的值将被该值覆盖。`warehouse``hive-conf-dir``warehouse` | -| inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId=`{groupId}`&streamId=`{streamId}`&nodeId=`{nodeId}`。| -| sink.multiple.enable | 可选 | false | Boolean | 是否开启多路写入 | -| sink.multiple.schema-update.policy | 可选 | TRY_IT_BEST | Enum | 遇到数据中schema和目标表不一致时的处理策略
TRY_IT_BEST:尽力而为,尽可能处理,处理不了的则忽略
IGNORE_WITH_LOG:忽略并且记录日志,后续该表数据不再处理
THROW_WITH_STOP:抛异常并且停止任务,直到用户手动处理schema不一致的情况
-| sink.multiple.pk-auto-generated | 可选 | false | Boolean | 是否自动生成主键,对于多路写入自动建表时当源表无主键时是否将所有字段当作主键 |
-| sink.multiple.typemap-compatible-with-spark | 可选 | false | Boolean | 是否适配spark的类型系统,对于多路写入自动建表时是否需要适配spark的类型系统 |
+| 选项 | 是否必须 | 默认值 | 类型 | 描述 |
+|-----------------------------------------------|-------------------------------|-------------|---------|----------------------------------------------------------------|
+| connector | 必需 | (none) | String | 指定要使用的连接器,这里应该是 `'iceberg'` |
+| catalog-type | 必需 | hive | String | 内置目录可选 `hive` 或 `hadoop`;如果使用 catalog-impl 指定自定义目录实现,则不设置此项 |
+| catalog-name | 必需 | (none) | String | 目录名称 |
+| catalog-database | 必需 | (none) | String | 在 Iceberg 目录中管理的数据库名称 |
+| catalog-table | 必需 | (none) | String | 在底层 Iceberg 目录和数据库中管理的表名 |
+| catalog-impl | 自定义 catalog 可选 | (none) | String | 自定义目录实现的完全限定类名;如果未设置 `catalog-type`,则必须设置此项 |
+| cache-enabled | 可选 | true | Boolean | 是否启用目录缓存,默认值为 `true` |
+| uri | hive catalog 可选 | (none) | String | Hive 元存储的 thrift URI |
+| clients | hive catalog 可选 | 2 | Integer | Hive Metastore 客户端池大小,默认值为 2 |
+| warehouse | hive catalog 或 hadoop catalog 可选 | (none) | String | 对于 Hive 目录,是 Hive 仓库位置;如果既没有设置 `hive-conf-dir` 指定包含 `hive-site.xml` 配置文件的位置,也没有在类路径中添加正确的 `hive-site.xml`,用户应指定此路径。对于 Hadoop 目录,是存放元数据文件和数据文件的 HDFS 目录 |
+| hive-conf-dir | hive catalog 可选 | (none) | String | 包含 `hive-site.xml` 配置文件的目录路径,用于提供自定义的 Hive 配置值。创建 Iceberg 目录时,如果同时设置了 `hive-conf-dir` 和 `warehouse`,则 `hive-site.xml`(或类路径中的 hive 配置文件)中 `hive.metastore.warehouse.dir` 的值会被 `warehouse` 的值覆盖 |
+| inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为 groupId=`{groupId}`&streamId=`{streamId}`&nodeId=`{nodeId}`。 |
+| sink.multiple.enable | 可选 | false | Boolean | 是否开启多路写入 |
+| sink.multiple.schema-update.policy | 可选 | TRY_IT_BEST | Enum | 遇到数据中 schema 和目标表不一致时的处理策略<br/>
TRY_IT_BEST:尽力而为,尽可能处理,处理不了的则忽略
IGNORE_WITH_LOG:忽略并且记录日志,后续该表数据不再处理
THROW_WITH_STOP:抛异常并且停止任务,直到用户手动处理schema不一致的情况 | +| sink.multiple.pk-auto-generated | 可选 | false | Boolean | 是否自动生成主键,对于多路写入自动建表时当源表无主键时是否将所有字段当作主键 | +| sink.multiple.typemap-compatible-with-spark | 可选 | false | Boolean | 是否适配spark的类型系统,对于多路写入自动建表时是否需要适配spark的类型系统 | +| sink.multiple.auto-create-table-when-snapshot | 可选 | false | Boolean | 是否在存量阶段自动创建 Iceberg 表 | ## 数据类型映射
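
Putting the two documented nodes together: the following is a minimal, hypothetical Flink SQL sketch (not part of the patch itself) that wires a MySQL Extract Node in whole-database-migration mode into an Iceberg Load Node with multiple sink enabled and the newly documented `sink.multiple.auto-create-table-when-snapshot` option turned on. All host names, credentials, catalog names, URIs, and the `data_canal` column shape are placeholders or assumptions; only the option names and values come from the option tables above.

```sql
-- Hypothetical sketch: whole-database migration from MySQL into Iceberg,
-- auto-creating target tables during the snapshot phase.
CREATE TABLE `mysql_extract_node` (
    -- with 'migrate-all' = 'true', all physical fields are packed into one
    -- canal-json meta field (metadata key assumed to be 'meta.data_canal';
    -- see the Available Metadata section of the MySQL Extract Node docs)
    `data_canal` STRING METADATA FROM 'meta.data_canal' VIRTUAL
) WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'YourHostname',
    'port' = '3306',
    'username' = 'YourUsername',
    'password' = 'YourPassword',
    'database-name' = 'YourDatabaseName',
    -- DatabaseName.TableName format; a regular expression can match multiple tables
    'table-name' = 'YourDatabaseName.YourTableName',
    'migrate-all' = 'true'
);

CREATE TABLE `iceberg_load_node` (
    -- single column carrying the raw canal-json record (illustrative shape)
    `data_canal` STRING
) WITH (
    'connector' = 'iceberg',
    'catalog-type' = 'hive',
    'catalog-name' = 'hive_catalog',
    'catalog-database' = 'placeholder_db',
    'catalog-table' = 'placeholder_table',
    'uri' = 'thrift://localhost:9083',
    'warehouse' = 'hdfs://localhost:8020/user/iceberg/warehouse',
    'sink.multiple.enable' = 'true',
    'sink.multiple.format' = 'canal-json',
    'sink.multiple.add-column.policy' = 'TRY_IT_BEST',
    'sink.multiple.database-pattern' = '${database}',
    'sink.multiple.table-pattern' = 'test_${table}',
    -- new option documented in this change: create target tables in the snapshot phase
    'sink.multiple.auto-create-table-when-snapshot' = 'true'
);

INSERT INTO `iceberg_load_node` SELECT * FROM `mysql_extract_node`;
```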