Skip to content

Commit

Permalink
[Hotfix][MySQL-CDC] Fix read gbk varchar chinese garbled characters (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Jun 25, 2024
1 parent 77f6140 commit 4e4d2b8
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;

import java.sql.Connection;
import java.sql.SQLException;

import static io.debezium.connector.mysql.MySqlConnectorConfig.JDBC_DRIVER;

public class CustomMySqlConnectionConfiguration
extends MySqlConnection.MySqlConnectionConfiguration {

protected static final String URL_PATTERN =
"jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";

private final JdbcConnection.ConnectionFactory connectionFactory;

public CustomMySqlConnectionConfiguration(Configuration config) {
super(config);
String driverClassName =
config.getString(JDBC_DRIVER.name(), JDBC_DRIVER.defaultValueAsString());
connectionFactory =
JdbcConnection.patternBasedFactory(
URL_PATTERN, driverClassName, getClass().getClassLoader());
}

@Override
public JdbcConnection.ConnectionFactory factory() {
return new JdbcConnection.ConnectionFactory() {
@Override
public Connection connect(JdbcConfiguration config) throws SQLException {
return connectionFactory.connect(config);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;

import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -120,7 +121,8 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);

this.databaseSchema =
MySqlUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive);
MySqlConnectionUtils.createMySqlDatabaseSchema(
connectorConfig, tableIdCaseInsensitive);
this.offsetContext =
loadStartingOffsetState(
new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.CustomMySqlConnectionConfiguration;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
Expand All @@ -44,8 +45,7 @@ public class MySqlConnectionUtils {

/** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
return new MySqlConnection(
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
return new MySqlConnection(new CustomMySqlConnectionConfiguration(dbzConfiguration));
}

/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,11 @@

import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
Expand Down Expand Up @@ -342,44 +334,6 @@ public static SeaTunnelRowType getSplitType(
return getSplitType(primaryKeys.get(0), dbzConnectorConfig);
}

/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
public static MySqlDatabaseSchema createMySqlDatabaseSchema(
MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
return new MySqlDatabaseSchema(
dbzMySqlConfig,
valueConverters,
topicSelector,
schemaNameAdjuster,
isTableIdCaseSensitive);
}

private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
String bigIntUnsignedHandlingModeStr =
dbzMySqlConfig
.getConfig()
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
bigIntUnsignedHandlingModeStr);
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
bigIntUnsignedHandlingMode.asBigIntUnsignedMode();

boolean timeAdjusterEnabled =
dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
return new MySqlValueConverters(
decimalMode,
timePrecisionMode,
bigIntUnsignedMode,
dbzMySqlConfig.binaryHandlingMode(),
timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
MySqlValueConverters::defaultParsingErrorHandler);
}

public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
return getBinlogPosition(dataRecord.sourceOffset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ CREATE TABLE mysql_cdc_e2e_source_table
`f_mediumtext` mediumtext,
`f_text` text,
`f_tinytext` tinytext,
`f_varchar` varchar(100) DEFAULT NULL,
`f_varchar` varchar(100) collate gbk_bin DEFAULT NULL,
`f_date` date DEFAULT NULL,
`f_datetime` datetime DEFAULT NULL,
`f_timestamp` timestamp NULL DEFAULT NULL,
Expand Down Expand Up @@ -333,7 +333,7 @@ VALUES ( 1, 0x616263740000000000000000000000000000000000000000000000000000000000
0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,
123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',
'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',
'This is a text field', 'This is a tiny text field', '中文测试', '2022-04-27', '2022-04-27 14:30:00',
'2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',
12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
Expand Down

0 comments on commit 4e4d2b8

Please sign in to comment.