diff --git a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java
index 4ff8343a..95ed6f9a 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java
@@ -60,10 +60,12 @@ public VitessSnapshotChangeEventSource(
         this.connection = connectionFactory.mainConnection();
         this.schema = schema;
         if (connectorConfig.getVitessTaskKeyShards() == null) {
-            this.shards = new VitessMetadata(connectorConfig).getShards();
+            shards = new VitessMetadata(connectorConfig).getShards();
+            LOGGER.info("Single task mode, using all shards for snapshot {}", shards);
         }
         else {
-            this.shards = connectorConfig.getVitessTaskKeyShards();
+            shards = connectorConfig.getVitessTaskKeyShards();
+            LOGGER.info("Multi task mode, for task {} using shards {}", connectorConfig.getVitessTaskKey(), shards);
         }
     }
@@ -83,7 +85,7 @@ protected Set<TableId> getAllTableIds(RelationalSnapshotContext
+                if (schemaChangeEvent.getTables().size() > 1) {
+                    LOGGER.warn("Snapshot schema change event has tables size > 1 {}", schemaChangeEvent);
+                }
+                Table table = schemaChangeEvent.getTables().iterator().next();
+                Table updatedTable = getTableWithShardAsCatalog(table, shard);
+                LOGGER.info("Adding schema for table {}", updatedTable.id());
+                snapshotContext.tables.overwriteTable(updatedTable);
             }
         }
     }
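The constructor change above decides which shards a snapshot covers. A minimal standalone sketch of that selection rule (names are illustrative, not the connector's API): prefer the per-task shard assignment when one is configured, otherwise fall back to all shards reported by Vitess metadata.

    import java.util.List;

    public class ShardSelectionSketch {

        // Mirrors the if/else in the constructor: a null task shard list means the
        // connector runs as a single task and snapshots every shard in the keyspace.
        static List<String> shardsForSnapshot(List<String> taskKeyShards, List<String> allShards) {
            if (taskKeyShards == null) {
                return allShards; // single task mode
            }
            return taskKeyShards; // multi task mode: only this task's shards
        }

        public static void main(String[] args) {
            List<String> all = List.of("-80", "80-");
            System.out.println(shardsForSnapshot(null, all));           // [-80, 80-]
            System.out.println(shardsForSnapshot(List.of("-80"), all)); // [-80]
        }
    }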
Pattern.compile("(\\-?[0-9]*):([0-9]*)(:([0-9]*))?(\\.([0-9]*))?"); @@ -55,6 +65,7 @@ public static VitessValueConverter getInstance(VitessConnectorConfig config) { ZoneOffset.UTC, config.binaryHandlingMode(), config.includeUnknownDatatypes(), + config.getEventConvertingFailureHandlingMode(), config.getBigIntUnsgnedHandlingMode()); } @@ -64,10 +75,12 @@ public VitessValueConverter( ZoneOffset defaultOffset, BinaryHandlingMode binaryMode, boolean includeUnknownDatatypes, + CommonConnectorConfig.EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode, VitessConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode) { super(decimalMode, temporalPrecisionMode, defaultOffset, null, null, binaryMode); this.includeUnknownDatatypes = includeUnknownDatatypes; this.bigIntUnsignedHandlingMode = bigIntUnsignedHandlingMode; + this.eventConvertingFailureHandlingMode = eventConvertingFailureHandlingMode; } // Get Kafka connect schema from Debezium column. @@ -115,6 +128,9 @@ public SchemaBuilder schemaBuilder(Column column) { @Override public ValueConverter converter(Column column, Field fieldDefn) { String typeName = column.typeName().toUpperCase(); + if (matches(typeName, "JSON")) { + return (data) -> convertJson(column, fieldDefn, data); + } if (matches(typeName, Query.Type.ENUM.name())) { return (data) -> convertEnumToString(column.enumValues(), column, fieldDefn, data); } @@ -179,6 +195,49 @@ protected Object convertDurationToMicroseconds(Column column, Field fieldDefn, O }); } + /** + * Ported from MySqlConnector. + * TODO: Migrate to Binlog converters so we can reuse this logic + * + * Convert the {@link String} {@code byte[]} value to a string value used in a {@link SourceRecord}. + * + * @param column the column in which the value appears + * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null + * @param data the data; may be null + * @return the converted value, or null if the conversion could not be made and the column allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls + */ + protected Object convertJson(Column column, Field fieldDefn, Object data) { + return convertValue(column, fieldDefn, data, "{}", (r) -> { + if (data instanceof byte[]) { + // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility + // to parse the database's internal binary representation into a JSON string, using the standard formatter. + if (((byte[]) data).length == 0) { + r.deliver(column.isOptional() ? null : "{}"); + } + else { + try { + r.deliver(JsonBinary.parseAsString((byte[]) data)); + } + catch (IOException e) { + if (eventConvertingFailureHandlingMode == FAIL) { + Loggings.logErrorAndTraceRecord(logger, Arrays.toString((byte[]) data), + "Failed to parse and read a JSON value on '{}'", column, e); + throw new DebeziumException("Failed to parse and read a JSON value on '" + column + "'", e); + } + Loggings.logWarningAndTraceRecord(logger, Arrays.toString((byte[]) data), + "Failed to parse and read a JSON value on '{}'", column, e); + r.deliver(column.isOptional() ? null : "{}"); + } + } + } + else if (data instanceof String) { + // The SnapshotReader sees JSON values as UTF-8 encoded strings. + r.deliver(data); + } + }); + } + /** * Convert original value insertion of type 'BIGINT' into the correct BIGINT UNSIGNED representation * Note: Unsigned BIGINT (64-bit) is represented in 'BigDecimal' data type. 
diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java
index 9bffb4d9..4e0f7cfe 100644
--- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java
+++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java
@@ -415,8 +415,8 @@ public static Vgtid defaultVgtid(VitessConnectorConfig config) {
         if (config.offsetStoragePerTask()) {
             List<String> shards = config.getVitessTaskKeyShards();
             vgtid = config.getVitessTaskVgtid();
-            LOGGER.info("VGTID '{}' is set for the keyspace: {} shards: {}",
-                    vgtid, config.getKeyspace(), shards);
+            LOGGER.info("VGTID is set for the keyspace: {}, shards: {}, vgtid: {}",
+                    config.getKeyspace(), shards, vgtid);
         }
         else {
             // If offset storage per task is disabled, then find the vgtid elsewhere
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
index fddd40a1..e670a4c0 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
@@ -199,6 +199,7 @@ public void shouldOnlySnapshotSchemaOnce() throws Exception {
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
                 .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
@@ -221,6 +222,7 @@ public void shouldOnlySnapshotSchemaOnce() throws Exception {
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER)
                 .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
@@ -277,6 +279,7 @@ public void shouldMigrateToEnabledSchemaChanges() throws Exception {
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 // Enable schema change
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.RECOVERY)
                 // Other configs from before
                 .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
@@ -310,6 +313,7 @@ public void canDisableSchemaHistory() throws Exception {
         startConnector(config -> config
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
                 true);
         assertConnectorIsRunning();
@@ -335,6 +339,7 @@ public void canDisableSchemaHistory() throws Exception {
         startConnector(config -> config
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
                 true);
 
@@ -366,6 +371,7 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception
         TestHelper.applyVSchema("vitess_vschema.json");
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
                 .with(VitessConnectorConfig.VTGATE_JDBC_PORT, VitessConnectorConfig.DEFAULT_VTGATE_JDBC_PORT)
@@ -408,6 +414,7 @@ public void shouldSnapshotSchemaAndIgnoreOtherTables() throws Exception {
         TestHelper.applyVSchema("vitess_vschema.json");
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
                 .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
@@ -440,6 +447,7 @@ public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception {
         TestHelper.executeDDL("vitess_create_tables.ddl");
         startConnector(config -> config
                 .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.ddl_table")
                 .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
                 .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
@@ -474,6 +482,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
         // startConnector();
         startConnector(config -> config
                 .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+                .with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                 .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
                 false);
         assertConnectorIsRunning();
@@ -484,7 +493,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
                 TEST_UNSHARDED_KEYSPACE,
                 "ddl_table");
 
-        TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);");
+        TestHelper.execute("INSERT INTO ddl_table (id, int_unsigned_col, json_col) VALUES (1, 2, '{\"1\":2}');");
         TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;");
 
         int expectedDataChangeRecords = 1;
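Each schema-change test above now pins STORE_ONLY_CAPTURED_TABLES_DDL so that schema history records DDL only for the tables in table.include.list. Expressed as plain connector properties, the repeated test configuration amounts to roughly the following sketch; the property key behind the STORE_ONLY_CAPTURED_TABLES_DDL field is assumed to be the standard Debezium one and should be verified against the connector's documentation:

    import java.util.Properties;

    public class ConnectorPropsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("include.schema.changes", "true");
            // Assumed key for VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL:
            props.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true");
            props.setProperty("table.include.list", "test_unsharded_keyspace.ddl_table");
            props.setProperty("snapshot.mode", "no_data");
            props.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }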
diff --git a/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java b/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java
index 954c92e4..7ffc5f91 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java
@@ -25,7 +25,9 @@
 import io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder;
 import io.debezium.junit.logging.LogInterceptor;
 import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
 import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
 import io.debezium.relational.ValueConverter;
 import io.debezium.schema.DefaultTopicNamingStrategy;
 import io.debezium.schema.SchemaNameAdjuster;
@@ -51,6 +53,7 @@ public void before() {
                 ZoneOffset.UTC,
                 config.binaryHandlingMode(),
                 config.includeUnknownDatatypes(),
+                config.getEventConvertingFailureHandlingMode(),
                 config.getBigIntUnsgnedHandlingMode());
         schema = new VitessDatabaseSchema(
                 config,
@@ -152,6 +155,29 @@ public void shouldConverterWorkForEnumColumnString() throws InterruptedException
         assertThat(valueConverter.convert(expectedValue)).isEqualTo(expectedValue);
     }
 
+    @Test
+    public void shouldConverterWorkForJson() throws InterruptedException {
+        ColumnEditor editor = io.debezium.relational.Column.editor()
+                .name("json_col")
+                .type(Query.Type.JSON.toString())
+                .jdbcType(Types.OTHER)
+                .optional(true);
+        List<Column> cols = List.of(editor.create());
+        TableEditor tableEditor = Table
+                .editor()
+                .addColumns(cols)
+                .tableId(TestHelper.defaultTableId());
+        schema.applySchemaChangesForTable(tableEditor.create());
+        Table table = schema.tableFor(TestHelper.defaultTableId());
+
+        Column column = table.columnWithName("json_col");
+        Field field = new Field("json", 0, Schema.STRING_SCHEMA);
+        ValueConverter valueConverter = converter.converter(column, field);
+        String expectedValue = "a";
+        assertThat(valueConverter.convert(expectedValue)).isInstanceOf(String.class);
+        assertThat(valueConverter.convert(expectedValue)).isEqualTo(expectedValue);
+    }
+
     @Test
     public void shouldConverterReturnEmptyStringForInvalid() throws InterruptedException {
         decoder.processMessage(enumFieldEventString(), null, null, false, true);
diff --git a/src/test/resources/vitess_create_tables.ddl b/src/test/resources/vitess_create_tables.ddl
index 84b48cd3..8f7b8701 100644
--- a/src/test/resources/vitess_create_tables.ddl
+++ b/src/test/resources/vitess_create_tables.ddl
@@ -25,6 +25,7 @@ CREATE TABLE ddl_table
 (
     id BIGINT NOT NULL AUTO_INCREMENT,
     int_unsigned_col INT UNSIGNED DEFAULT 0,
+    json_col JSON,
     PRIMARY KEY (id)
 )
 PARTITION BY RANGE (id) (
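For reference, the Column/Table editor pattern that the new shouldConverterWorkForJson test uses to register a JSON column before asking the converter for it can be exercised on its own. A minimal sketch, assuming only debezium-core on the classpath (table and column names are illustrative):

    import java.sql.Types;

    import io.debezium.relational.Column;
    import io.debezium.relational.Table;
    import io.debezium.relational.TableId;

    public class JsonColumnRegistrationSketch {
        public static void main(String[] args) {
            // Build a JSON column the same way the test does.
            Column jsonCol = Column.editor()
                    .name("json_col")
                    .type("JSON")
                    .jdbcType(Types.OTHER)
                    .optional(true)
                    .create();
            // Register it on a table definition; the converter is later asked for a
            // ValueConverter for exactly this column.
            Table table = Table.editor()
                    .tableId(new TableId(null, "test_unsharded_keyspace", "ddl_table"))
                    .addColumn(jsonCol)
                    .create();
            System.out.println(table.columnWithName("json_col"));
        }
    }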