DBZ-8325 Register correct schemas during snapshot
twthorn committed Oct 30, 2024
1 parent 0792f05 commit c55cc9b
Showing 6 changed files with 109 additions and 11 deletions.
@@ -60,10 +60,12 @@ public VitessSnapshotChangeEventSource(
         this.connection = connectionFactory.mainConnection();
         this.schema = schema;
         if (connectorConfig.getVitessTaskKeyShards() == null) {
-            this.shards = new VitessMetadata(connectorConfig).getShards();
+            shards = new VitessMetadata(connectorConfig).getShards();
+            LOGGER.info("Single task mode, using all shards for snapshot {}", shards);
         }
         else {
-            this.shards = connectorConfig.getVitessTaskKeyShards();
+            shards = connectorConfig.getVitessTaskKeyShards();
+            LOGGER.info("Multi task mode, for task {} using shards {}", connectorConfig.getVitessTaskKey(), shards);
         }
     }

@@ -83,7 +85,7 @@ protected Set<TableId> getAllTableIds(RelationalSnapshotContext<VitessPartition,
         catch (SQLException e) {
             LOGGER.warn("\t skipping database '{}' due to error reading tables: {}", connectorConfig.getKeyspace(), e.getMessage());
         }
-        LOGGER.debug("All table IDs {}", tableIds);
+        LOGGER.info("All table IDs {}", tableIds);
         return tableIds;
     }

@@ -117,12 +119,13 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
                     snapshotContext.partition, snapshotContext.offset, ddlMessage, connectorConfig.getKeyspace());
             for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
                 LOGGER.info("Adding schema change event {}", schemaChangeEvent);
-                Table table = schema.tableFor(tableId);
-                if (table != null) {
-                    LOGGER.info("Adding schema for table {}", table.id());
-                    Table updatedTable = getTableWithShardAsCatalog(table, shard);
-                    snapshotContext.tables.overwriteTable(updatedTable);
-                }
+                if (schemaChangeEvent.getTables().size() > 1) {
+                    LOGGER.warn("Snapshot schema change event has tables size > 1 {}", schemaChangeEvent);
+                }
+                Table table = schemaChangeEvent.getTables().iterator().next();
+                Table updatedTable = getTableWithShardAsCatalog(table, shard);
+                LOGGER.info("Adding schema for table {}", updatedTable.id());
+                snapshotContext.tables.overwriteTable(updatedTable);
             }
         }
     }
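The fix above sources each table definition from the DDL event that was just parsed, rather than from a schema lookup that may predate the change. The helper getTableWithShardAsCatalog is not shown in this commit; a minimal hedged sketch of what such a helper could look like (an assumption inferred from its call sites, not the actual implementation) re-keys the parsed table so the shard name occupies the catalog position of its TableId, letting every shard register its own copy of the schema:

    // Hypothetical sketch only; the real helper is defined elsewhere in this class.
    // Uses io.debezium.relational.Table and io.debezium.relational.TableId from Debezium core.
    private Table getTableWithShardAsCatalog(Table table, String shard) {
        // Rebuild the TableId with the shard as the catalog, e.g. "-80.my_table".
        TableId shardedId = new TableId(shard, null, table.id().table());
        return table.edit().tableId(shardedId).create();
    }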
@@ -5,12 +5,16 @@
  */
 package io.debezium.connector.vitess;

+import static io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode.FAIL;
+
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -19,10 +23,14 @@
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary;
+
 import io.debezium.DebeziumException;
+import io.debezium.config.CommonConnectorConfig;
 import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
 import io.debezium.data.Json;
 import io.debezium.jdbc.JdbcValueConverters;
@@ -31,6 +39,7 @@
 import io.debezium.relational.RelationalChangeRecordEmitter;
 import io.debezium.relational.ValueConverter;
 import io.debezium.time.Year;
+import io.debezium.util.Loggings;
 import io.debezium.util.Strings;
 import io.vitess.proto.Query;

@@ -44,6 +53,7 @@ public class VitessValueConverter extends JdbcValueConverters {

     private final boolean includeUnknownDatatypes;
     private final VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode;
+    private final CommonConnectorConfig.EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode;

     private static final Pattern DATE_FIELD_PATTERN = Pattern.compile("([0-9]*)-([0-9]*)-([0-9]*)");
     private static final Pattern TIME_FIELD_PATTERN = Pattern.compile("(\\-?[0-9]*):([0-9]*)(:([0-9]*))?(\\.([0-9]*))?");
@@ -55,6 +65,7 @@ public static VitessValueConverter getInstance(VitessConnectorConfig config) {
                 ZoneOffset.UTC,
                 config.binaryHandlingMode(),
                 config.includeUnknownDatatypes(),
+                config.getEventConvertingFailureHandlingMode(),
                 config.getBigIntUnsgnedHandlingMode());
     }

@@ -64,10 +75,12 @@ public VitessValueConverter(
             ZoneOffset defaultOffset,
             BinaryHandlingMode binaryMode,
             boolean includeUnknownDatatypes,
+            CommonConnectorConfig.EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
             VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode) {
         super(decimalMode, temporalPrecisionMode, defaultOffset, null, null, binaryMode);
         this.includeUnknownDatatypes = includeUnknownDatatypes;
         this.bigIntUnsignedHandlingMode = bigIntUnsignedHandlingMode;
+        this.eventConvertingFailureHandlingMode = eventConvertingFailureHandlingMode;
     }

     // Get Kafka connect schema from Debezium column.
@@ -115,6 +128,9 @@ public SchemaBuilder schemaBuilder(Column column) {
     @Override
     public ValueConverter converter(Column column, Field fieldDefn) {
         String typeName = column.typeName().toUpperCase();
+        if (matches(typeName, "JSON")) {
+            return (data) -> convertJson(column, fieldDefn, data);
+        }
         if (matches(typeName, Query.Type.ENUM.name())) {
             return (data) -> convertEnumToString(column.enumValues(), column, fieldDefn, data);
         }
@@ -179,6 +195,49 @@ protected Object convertDurationToMicroseconds(Column column, Field fieldDefn, Object data) {
         });
     }

+    /**
+     * Ported from MySqlConnector.
+     * TODO: Migrate to Binlog converters so we can reuse this logic
+     *
+     * Convert the {@link String} {@code byte[]} value to a string value used in a {@link SourceRecord}.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
+     */
+    protected Object convertJson(Column column, Field fieldDefn, Object data) {
+        return convertValue(column, fieldDefn, data, "{}", (r) -> {
+            if (data instanceof byte[]) {
+                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
+                // to parse the database's internal binary representation into a JSON string, using the standard formatter.
+                if (((byte[]) data).length == 0) {
+                    r.deliver(column.isOptional() ? null : "{}");
+                }
+                else {
+                    try {
+                        r.deliver(JsonBinary.parseAsString((byte[]) data));
+                    }
+                    catch (IOException e) {
+                        if (eventConvertingFailureHandlingMode == FAIL) {
+                            Loggings.logErrorAndTraceRecord(logger, Arrays.toString((byte[]) data),
+                                    "Failed to parse and read a JSON value on '{}'", column, e);
+                            throw new DebeziumException("Failed to parse and read a JSON value on '" + column + "'", e);
+                        }
+                        Loggings.logWarningAndTraceRecord(logger, Arrays.toString((byte[]) data),
+                                "Failed to parse and read a JSON value on '{}'", column, e);
+                        r.deliver(column.isOptional() ? null : "{}");
+                    }
+                }
+            }
+            else if (data instanceof String) {
+                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
+                r.deliver(data);
+            }
+        });
+    }
+
     /**
      * Convert original value insertion of type 'BIGINT' into the correct BIGINT UNSIGNED representation
      * Note: Unsigned BIGINT (64-bit) is represented in 'BigDecimal' data type. Reference: https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html
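For orientation, a hedged sketch of how the new JSON path behaves at runtime; converter, column, and fieldDefn are illustrative stand-ins for a VitessValueConverter instance and an optional JSON column's metadata (see the unit test further below for a concrete setup):

    // Sketch, assuming an optional JSON column:
    ValueConverter jsonConverter = converter.converter(column, fieldDefn);

    // Snapshot path: JSON arrives as a UTF-8 string and passes through unchanged.
    Object fromSnapshot = jsonConverter.convert("{\"a\": 1}");   // -> "{\"a\": 1}"

    // Streaming path: binary payloads are decoded via JsonBinary.parseAsString(...);
    // an empty byte[] maps to null because the column is optional ("{}" otherwise).
    Object fromEmptyBinary = jsonConverter.convert(new byte[0]); // -> null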
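The BIGINT UNSIGNED note in the javadoc above is easiest to see with a worked value: Java's signed long tops out at 2^63 - 1, so an unsigned 64-bit value such as 2^64 - 1 only fits in a BigDecimal. A small illustration (not from the commit):

    // -1L reinterpreted as unsigned 64-bit is 2^64 - 1 = 18446744073709551615.
    java.math.BigDecimal unsigned = new java.math.BigDecimal(Long.toUnsignedString(-1L));
    // unsigned.toPlainString() -> "18446744073709551615"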
@@ -415,8 +415,8 @@ public static Vgtid defaultVgtid(VitessConnectorConfig config) {
         if (config.offsetStoragePerTask()) {
             List<String> shards = config.getVitessTaskKeyShards();
             vgtid = config.getVitessTaskVgtid();
-            LOGGER.info("VGTID '{}' is set for the keyspace: {} shards: {}",
-                    vgtid, config.getKeyspace(), shards);
+            LOGGER.info("VGTID is set for the keyspace: {}, shards: {}, vgtid: {}",
+                    config.getKeyspace(), shards, vgtid);
         }
         else {
             // If offset storage per task is disabled, then find the vgtid elsewhere
@@ -199,6 +199,7 @@ public void shouldOnlySnapshotSchemaOnce() throws Exception {

         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
                 .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
@@ -221,6 +222,7 @@

         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER)
                 .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
@@ -277,6 +279,7 @@ public void shouldMigrateToEnabledSchemaChanges() throws Exception {
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 // Enable schema change
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.RECOVERY)
                 // Other configs from before
                 .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
@@ -310,6 +313,7 @@ public void canDisableSchemaHistory() throws Exception {
         startConnector(config -> config
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
                 true);
         assertConnectorIsRunning();
@@ -335,6 +339,7 @@
         startConnector(config -> config
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
                 true);

@@ -366,6 +371,7 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception
         TestHelper.applyVSchema("vitess_vschema.json");
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
                 .with(VitessConnectorConfig.VTGATE_JDBC_PORT, VitessConnectorConfig.DEFAULT_VTGATE_JDBC_PORT)
@@ -408,6 +414,7 @@ public void shouldSnapshotSchemaAndIgnoreOtherTables() throws Exception {
         TestHelper.applyVSchema("vitess_vschema.json");
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
@@ -440,6 +447,7 @@ public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception {
         TestHelper.executeDDL("vitess_create_tables.ddl");
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.ddl_table")
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
                 .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
@@ -474,6 +482,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exception {
         // startConnector();
         startConnector(config -> config
                 .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
                 false);
         assertConnectorIsRunning();
@@ -484,7 +493,7 @@
                 TEST_UNSHARDED_KEYSPACE,
                 "ddl_table");

-        TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);");
+        TestHelper.execute("INSERT INTO ddl_table (id, int_unsigned_col, json_col) VALUES (1, 2, '{\"1\":2}');");
         TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;");

         int expectedDataChangeRecords = 1;
@@ -25,7 +25,9 @@
 import io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder;
 import io.debezium.junit.logging.LogInterceptor;
 import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
 import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
 import io.debezium.relational.ValueConverter;
 import io.debezium.schema.DefaultTopicNamingStrategy;
 import io.debezium.schema.SchemaNameAdjuster;
@@ -51,6 +53,7 @@ public void before() {
                 ZoneOffset.UTC,
                 config.binaryHandlingMode(),
                 config.includeUnknownDatatypes(),
+                config.getEventConvertingFailureHandlingMode(),
                 config.getBigIntUnsgnedHandlingMode());
         schema = new VitessDatabaseSchema(
                 config,
@@ -152,6 +155,29 @@ public void shouldConverterWorkForEnumColumnString() throws InterruptedException {
         assertThat(valueConverter.convert(expectedValue)).isEqualTo(expectedValue);
     }

+    @Test
+    public void shouldConverterWorkForJson() throws InterruptedException {
+        ColumnEditor editor = io.debezium.relational.Column.editor()
+                .name("json_col")
+                .type(Query.Type.JSON.toString())
+                .jdbcType(Types.OTHER)
+                .optional(true);
+        List<Column> cols = List.of(editor.create());
+        TableEditor tableEditor = Table
+                .editor()
+                .addColumns(cols)
+                .tableId(TestHelper.defaultTableId());
+        schema.applySchemaChangesForTable(tableEditor.create());
+        Table table = schema.tableFor(TestHelper.defaultTableId());
+
+        Column column = table.columnWithName("json_col");
+        Field field = new Field("json", 0, Schema.STRING_SCHEMA);
+        ValueConverter valueConverter = converter.converter(column, field);
+        String expectedValue = "a";
+        assertThat(valueConverter.convert(expectedValue)).isInstanceOf(String.class);
+        assertThat(valueConverter.convert(expectedValue)).isEqualTo(expectedValue);
+    }
+
     @Test
     public void shouldConverterReturnEmptyStringForInvalid() throws InterruptedException {
         decoder.processMessage(enumFieldEventString(), null, null, false, true);
1 change: 1 addition & 0 deletions src/test/resources/vitess_create_tables.ddl
@@ -25,6 +25,7 @@ CREATE TABLE ddl_table
 (
     id               BIGINT NOT NULL AUTO_INCREMENT,
     int_unsigned_col INT UNSIGNED DEFAULT 0,
+    json_col         JSON,
     PRIMARY KEY (id)
 )
 PARTITION BY RANGE (id) (
