From dfc2008e79498e5df3e1c6826faa4e45415e2de1 Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 15 Oct 2024 17:45:59 -0400 Subject: [PATCH] DBZ-8325 Emit DDL events --- pom.xml | 13 ++ .../debezium/connector/vitess/SourceInfo.java | 4 + .../VitessChangeEventSourceFactory.java | 10 +- .../vitess/VitessConnectorConfig.java | 80 +++++++++- .../connector/vitess/VitessConnectorTask.java | 13 +- .../vitess/VitessDatabaseSchema.java | 150 +++++++++++++++++- .../connector/vitess/VitessMetadata.java | 2 +- .../vitess/VitessOffsetRetriever.java | 2 +- .../VitessSnapshotChangeEventSource.java | 114 +++++++++++-- .../vitess/VitessSourceInfoStructMaker.java | 6 +- .../VitessStreamingChangeEventSource.java | 27 +++- .../vitess/connection/DdlMessage.java | 17 +- .../vitess/connection/HeartbeatMessage.java | 5 + .../vitess/connection/OtherMessage.java | 5 + .../vitess/connection/ReplicationMessage.java | 2 + .../connection/TransactionalMessage.java | 5 + .../VStreamOutputMessageDecoder.java | 2 +- .../VStreamOutputReplicationMessage.java | 5 + .../VitessReplicationConnection.java | 15 ++ .../vitess/jdbc/VitessConnection.java | 35 ++++ .../vitess/AbstractVitessConnectorTest.java | 4 +- .../connector/vitess/SourceInfoTest.java | 4 +- .../debezium/connector/vitess/TestHelper.java | 13 ++ .../vitess/VitessBigIntUnsignedTest.java | 3 +- .../vitess/VitessChangeRecordEmitterTest.java | 13 +- .../connector/vitess/VitessConnectorIT.java | 133 ++++++++++++++-- .../vitess/VitessConnectorTaskTest.java | 14 +- .../VitessSourceInfoStructMakerTest.java | 4 +- .../connector/vitess/VitessTestCleanup.java | 39 +++++ .../vitess/VitessValueConverterTest.java | 3 +- .../vitess/connection/DdlMessageTest.java | 44 +++++ .../VStreamOutputMessageDecoderTest.java | 4 +- src/test/resources/vitess_create_tables.ddl | 10 ++ 33 files changed, 727 insertions(+), 73 deletions(-) create mode 100644 src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java create mode 100644 src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java create mode 100644 src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java diff --git a/pom.xml b/pom.xml index 64f32a59..afc87e8f 100644 --- a/pom.xml +++ b/pom.xml @@ -217,6 +217,19 @@ debezium-revapi provided + + + io.debezium + debezium-connector-mysql + ${version.debezium} + + + io.debezium + debezium-connector-binlog + ${version.debezium} + + + ch.qos.logback diff --git a/src/main/java/io/debezium/connector/vitess/SourceInfo.java b/src/main/java/io/debezium/connector/vitess/SourceInfo.java index 53e87513..75a793c1 100644 --- a/src/main/java/io/debezium/connector/vitess/SourceInfo.java +++ b/src/main/java/io/debezium/connector/vitess/SourceInfo.java @@ -114,4 +114,8 @@ public String toString() { + restartVgtid + '}'; } + + public String table() { + return tableId == null ? null : tableId.table(); + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java b/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java index 3804997f..406abd79 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java +++ b/src/main/java/io/debezium/connector/vitess/VitessChangeEventSourceFactory.java @@ -6,7 +6,8 @@ package io.debezium.connector.vitess; import io.debezium.connector.vitess.connection.ReplicationConnection; -import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory; +import io.debezium.connector.vitess.jdbc.VitessConnection; +import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; @@ -31,15 +32,18 @@ public class VitessChangeEventSourceFactory implements ChangeEventSourceFactory< private final VitessDatabaseSchema schema; private final ReplicationConnection replicationConnection; private final SnapshotterService snapshotterService; + private final MainConnectionProvidingConnectionFactory connectionFactory; public VitessChangeEventSourceFactory( VitessConnectorConfig connectorConfig, + MainConnectionProvidingConnectionFactory connectionFactory, ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, VitessDatabaseSchema schema, ReplicationConnection replicationConnection, SnapshotterService snapshotterService) { this.connectorConfig = connectorConfig; + this.connectionFactory = connectionFactory; this.errorHandler = errorHandler; this.dispatcher = dispatcher; this.clock = clock; @@ -54,11 +58,11 @@ public SnapshotChangeEventSource getSnapsh // A dummy SnapshotChangeEventSource, snapshot is skipped. return new VitessSnapshotChangeEventSource( connectorConfig, - new DefaultMainConnectionProvidingConnectionFactory<>(() -> null), + this.connectionFactory, dispatcher, schema, clock, - null, + snapshotProgressListener, notificationService, snapshotterService); } diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 6437cee9..a4163fe1 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -30,6 +30,7 @@ import io.debezium.config.Field; import io.debezium.config.Field.ValidationOutput; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider; import io.debezium.connector.vitess.connection.VitessTabletType; import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; @@ -39,15 +40,20 @@ import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.ColumnFilterMode; +import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.schema.SchemaNameAdjuster; import io.debezium.spi.topic.TopicNamingStrategy; +import io.debezium.util.Collect; /** * Vitess connector configuration, including its specific configurations and the common * configurations from Debezium. */ -public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig { +public class VitessConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig { public static final String CSV_DELIMITER = ","; @@ -55,6 +61,30 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig { private static final String VITESS_CONFIG_GROUP_PREFIX = "vitess."; private static final int DEFAULT_VTGATE_PORT = 15_991; + private static final int DEFAULT_VTGATE_JDBC_PORT = 15_306; + + /** + * Set of all built-in database names that will generally be ignored by the connector. + */ + protected static final Set BUILT_IN_DB_NAMES = Collect.unmodifiableSet( + "mysql", "performance_schema", "sys", "information_schema"); + + @Override + public JdbcConfiguration getJdbcConfig() { + JdbcConfiguration jdbcConfiguration = super.getJdbcConfig(); + JdbcConfiguration updatedConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit() + .with(JdbcConfiguration.DATABASE, getKeyspace()) + .with(JdbcConfiguration.PORT, getVtgateJdbcPort()) + .build()); + return updatedConfig; + } + + @Override + protected HistoryRecordComparator getHistoryRecordComparator() { + return new HistoryRecordComparator() { + + }; + } /** * The set of predefined SnapshotMode options or aliases. @@ -66,6 +96,8 @@ public enum SnapshotMode implements EnumeratedValue { */ INITIAL("initial"), + NO_DATA("no_data"), + /** * Never perform an initial snapshot and only receive new data changes. */ @@ -203,6 +235,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue .withValidation(Field::isInteger) .withDescription("Port of the Vitess VTGate gRPC server."); + public static final Field VTGATE_JDBC_PORT = Field.create(DATABASE_CONFIG_PREFIX + "jdbc." + JdbcConfiguration.PORT) + .withDisplayName("Vitess JDBC database port") + .withType(Type.INT) + .withWidth(Width.SHORT) + .withDefault(DEFAULT_VTGATE_JDBC_PORT) + .withImportance(ConfigDef.Importance.HIGH) + .withValidation(Field::isInteger) + .withDescription("Port of the Vitess VTGate JDBC server."); + public static final Field VTGATE_USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER) .withDisplayName("User") .withType(Type.STRING) @@ -470,6 +511,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field, GTID, VTGATE_HOST, VTGATE_PORT, + VTGATE_JDBC_PORT, VTGATE_USER, VTGATE_PASSWORD, TABLET_TYPE, @@ -536,11 +578,16 @@ public static ConfigDef configDef() { public VitessConnectorConfig(Configuration config) { super( + VitessConnector.class, config, - null, x -> x.schema() + "." + x.table(), + Tables.TableFilter.fromPredicate(VitessConnectorConfig::isNotBuiltInTable), + x -> x.schema() + "." + x.table(), + true, -1, ColumnFilterMode.SCHEMA, - true); + false); + + getServiceRegistry().registerServiceProvider(new MySqlCharsetRegistryServiceProvider()); } @Override @@ -649,6 +696,10 @@ public int getVtgatePort() { return getConfig().getInteger(VTGATE_PORT); } + public int getVtgateJdbcPort() { + return getConfig().getInteger(VTGATE_JDBC_PORT); + } + public String getVtgateUsername() { return getConfig().getString(VTGATE_USER); } @@ -754,6 +805,29 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, Schema return new VitessHeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster); } + /** + * Checks whether the {@link TableId} refers to a built-in table. + * + * @param tableId the relational table identifier, should not be null + * @return true if the reference refers to a built-in table + */ + public static boolean isNotBuiltInTable(TableId tableId) { + return !isBuiltInDatabase(tableId.catalog()); + } + + /** + * Check whether the specified database name is a built-in database. + * + * @param databaseName the database name to check + * @return true if the database is a built-in database; false otherwise + */ + public static boolean isBuiltInDatabase(String databaseName) { + if (databaseName == null) { + return false; + } + return BUILT_IN_DB_NAMES.contains(databaseName.toLowerCase()); + } + public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() { return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE), BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString()); diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java index 64609f91..d26db8ab 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java @@ -23,7 +23,11 @@ import io.debezium.connector.common.BaseSourceTask; import io.debezium.connector.vitess.connection.ReplicationConnection; import io.debezium.connector.vitess.connection.VitessReplicationConnection; +import io.debezium.connector.vitess.jdbc.VitessConnection; import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory; +import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.ChangeEventSourceCoordinator; import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; @@ -37,6 +41,7 @@ import io.debezium.schema.SchemaNameAdjuster; import io.debezium.service.spi.ServiceRegistry; import io.debezium.snapshot.SnapshotterService; +import io.debezium.snapshot.SnapshotterServiceProvider; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; @@ -121,13 +126,18 @@ protected ChangeEventSourceCoordinator sta NotificationService notificationService = new NotificationService<>(getNotificationChannels(), connectorConfig, SchemaFactory.get(), dispatcher::enqueueNotification); + JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig(); + + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( + () -> new VitessConnection(jdbcConfig)); + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( previousOffsets, errorHandler, VitessConnector.class, connectorConfig, new VitessChangeEventSourceFactory( - connectorConfig, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService), + connectorConfig, connectionFactory, errorHandler, dispatcher, clock, schema, replicationConnection, snapshotterService), connectorConfig.offsetStoragePerTask() ? new VitessChangeEventSourceMetricsFactory() : new DefaultChangeEventSourceMetricsFactory<>(), dispatcher, schema, @@ -225,6 +235,7 @@ protected Iterable getAllConfigurationFields() { @Override protected void registerServiceProviders(ServiceRegistry serviceRegistry) { + serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider()); serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider()); } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java index 36c26130..e318e001 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java +++ b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java @@ -6,15 +6,24 @@ package io.debezium.connector.vitess; import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.apache.kafka.connect.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.relational.RelationalDatabaseSchema; +import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.relational.HistorizedRelationalDatabaseSchema; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.TableSchemaBuilder; +import io.debezium.relational.ddl.DdlChanges; +import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.ddl.DdlParserListener; +import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaNameAdjuster; import io.debezium.spi.topic.TopicNamingStrategy; @@ -22,9 +31,11 @@ * Logical in-memory representation of Vitess schema (a.k.a Vitess keyspace). It is used to create * kafka connect {@link Schema} for all tables. */ -public class VitessDatabaseSchema extends RelationalDatabaseSchema { +public class VitessDatabaseSchema extends HistorizedRelationalDatabaseSchema { private static final Logger LOGGER = LoggerFactory.getLogger(VitessDatabaseSchema.class); + private final DdlParser ddlParser; + public VitessDatabaseSchema( VitessConnectorConfig config, SchemaNameAdjuster schemaNameAdjuster, @@ -50,6 +61,12 @@ public VitessDatabaseSchema( false), false, config.getKeyMapper()); + this.ddlParser = new MySqlAntlrDdlParser( + true, + false, + config.isSchemaCommentsHistoryEnabled(), + getTableFilter(), + config.getServiceRegistry().getService(BinlogCharsetRegistry.class)); } /** Applies schema changes for the specified table. */ @@ -99,4 +116,133 @@ public static TableId parse(String table) { public static TableId buildTableId(String shard, String keyspace, String table) { return new TableId(shard, keyspace, table); } + + private String getDatabaseWithShard(String shard, String database) { + return String.format("%s.%s", shard, database); + } + + public List parseDdl(VitessPartition partition, VitessOffsetContext offset, String ddlStatement, + String databaseName, String shard) { + final List schemaChangeEvents = new ArrayList<>(1); + DdlChanges ddlChanges = ddlParser.getDdlChanges(); + ddlChanges.reset(); + ddlParser.setCurrentDatabase(getDatabaseWithShard(shard, databaseName)); + ddlParser.parse(ddlStatement, tables()); + if (!ddlChanges.isEmpty()) { + ddlChanges.getEventsByDatabase((String dbName, List events) -> { + events.forEach(event -> { + final TableId tableId = getTableId(event); + SchemaChangeEvent.SchemaChangeEventType type = switch (event.type()) { + case CREATE_TABLE -> SchemaChangeEvent.SchemaChangeEventType.CREATE; + case DROP_TABLE -> SchemaChangeEvent.SchemaChangeEventType.DROP; + case ALTER_TABLE-> SchemaChangeEvent.SchemaChangeEventType.ALTER; + case TRUNCATE_TABLE -> SchemaChangeEvent.SchemaChangeEventType.TRUNCATE; + default -> { + LOGGER.info("Skipped DDL event type {}: {}", event.type(), ddlStatement); + yield null; + } + }; + emitChangeEvent( + partition, + offset, + schemaChangeEvents, + databaseName, + event, + tableId, + type, + false); + }); + }); + return schemaChangeEvents; + } + return Collections.emptyList(); + } + + private TableId getTableId(DdlParserListener.Event event) { + if (event instanceof DdlParserListener.TableEvent) { + DdlParserListener.TableEvent tableEvent = (DdlParserListener.TableEvent) event; + return tableEvent.tableId(); + } + return null; + } + + private void emitChangeEvent(VitessPartition partition, VitessOffsetContext offset, List schemaChangeEvents, + String sanitizedDbName, DdlParserListener.Event event, TableId tableId, + SchemaChangeEvent.SchemaChangeEventType type, boolean snapshot) { + SchemaChangeEvent schemaChangeEvent; + if (type.equals(SchemaChangeEvent.SchemaChangeEventType.ALTER) && event instanceof DdlParserListener.TableAlteredEvent + && ((DdlParserListener.TableAlteredEvent) event).previousTableId() != null) { + schemaChangeEvent = SchemaChangeEvent.ofRename( + partition, + offset, + sanitizedDbName, + null, + event.statement(), + tableId != null ? tables().forTable(tableId) : null, + ((DdlParserListener.TableAlteredEvent) event).previousTableId()); + } + else { + Table table = getTable(tableId, type); + schemaChangeEvent = SchemaChangeEvent.of( + type, + partition, + offset, + sanitizedDbName, + null, + event.statement(), + table, + snapshot); + } + schemaChangeEvents.add(schemaChangeEvent); + } + + private Table getTable(TableId tableId, SchemaChangeEvent.SchemaChangeEventType type) { + if (tableId == null) { + return null; + } + if (SchemaChangeEvent.SchemaChangeEventType.DROP == type) { + // DROP events don't have information about tableChanges, so we are creating a Table object + // with just the tableId to be used during blocking snapshot to filter out drop events not + // related to table to be snapshotted. + return Table.editor().tableId(tableId).create(); + } + return tables().forTable(tableId); + } + + public DdlParser getDdlParser() { + return ddlParser; + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChange) { + switch (schemaChange.getType()) { + case CREATE: + case ALTER: + schemaChange.getTableChanges().forEach(x -> buildAndRegisterSchema(x.getTable())); + break; + case DROP: + schemaChange.getTableChanges().forEach(x -> removeSchema(x.getId())); + break; + default: + } + + // Record the DDL statement so that we can later recover them. + // This is done _after_ writing the schema change records so that failure recovery (which is based on + // the schema history) won't lose schema change records. + // + // We either store: + // - all DDLs if configured + // - or global SEt variables + // - or DDLs for captured objects + if (!storeOnlyCapturedTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase()) + || schemaChange.getTables().stream().map(Table::id).anyMatch(getTableFilter()::isIncluded)) { + LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl()); + record(schemaChange, schemaChange.getTableChanges()); + } + + } + + public boolean isGlobalSetVariableStatement(String ddl, String databaseName) { + return (databaseName == null || databaseName.isEmpty()) && ddl != null && ddl.toUpperCase().startsWith("SET "); + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessMetadata.java b/src/main/java/io/debezium/connector/vitess/VitessMetadata.java index b3175496..dba2a367 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessMetadata.java +++ b/src/main/java/io/debezium/connector/vitess/VitessMetadata.java @@ -193,7 +193,7 @@ protected static List getNonEmptyShards(List> vitessTabletR } @VisibleForTesting - protected static List flattenAndConcat(List> nestedList) { + public static List flattenAndConcat(List> nestedList) { return nestedList.stream() .map(innerList -> String.join("", innerList)) .collect(Collectors.toList()); diff --git a/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java b/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java index 2aac4abc..d5feeb9a 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java +++ b/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java @@ -23,7 +23,7 @@ */ public class VitessOffsetRetriever { - private static final Logger LOGGER = LoggerFactory.getLogger(VitessConnector.class); + private static final Logger LOGGER = LoggerFactory.getLogger(VitessOffsetRetriever.class); private final int numTasks; private final int gen; diff --git a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java index a238c387..f3f35e06 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSnapshotChangeEventSource.java @@ -5,31 +5,40 @@ */ package io.debezium.connector.vitess; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; -import io.debezium.jdbc.JdbcConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.vitess.connection.VitessReplicationConnection; +import io.debezium.connector.vitess.jdbc.VitessConnection; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotProgressListener; -import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; import io.debezium.snapshot.SnapshotterService; import io.debezium.util.Clock; +import io.vitess.proto.Vtgate; /** Always skip snapshot for now */ public class VitessSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { + private static final Logger LOGGER = LoggerFactory.getLogger(VitessSnapshotChangeEventSource.class); + + private final VitessConnectorConfig connectorConfig; + private final VitessDatabaseSchema schema; + public VitessSnapshotChangeEventSource( - RelationalDatabaseConnectorConfig connectorConfig, - MainConnectionProvidingConnectionFactory connectionFactory, + VitessConnectorConfig connectorConfig, + MainConnectionProvidingConnectionFactory connectionFactory, EventDispatcher dispatcher, VitessDatabaseSchema schema, Clock clock, @@ -44,57 +53,132 @@ public VitessSnapshotChangeEventSource( snapshotProgressListener, notificationService, snapshotterService); + this.connectorConfig = connectorConfig; + this.schema = schema; } @Override protected Set getAllTableIds(RelationalSnapshotContext snapshotContext) { - return null; + // TODO: Switch to using the connection factory + // TODO: Handle case of empty shard when that config exclude empty shards is enabled + List shards = new VitessMetadata(connectorConfig).getShards(); + try (VitessReplicationConnection connection = new VitessReplicationConnection(connectorConfig, schema)) { + Vtgate.ExecuteResponse response = connection.executeInKeyspace("SHOW TABLES"); + List> rows = VitessMetadata.parseRows(response.getResult().getRowsList()); + List tables = VitessMetadata.flattenAndConcat(rows); + Set tableIds = new HashSet<>(); + // TODO: Switch to querying all shards? + // Set the table ID for each shard, assume the DDLs are the same to avoid many queries on Vitess + for (String table : tables) { + for (String shard : shards) { + tableIds.add(new TableId(shard, connectorConfig.getKeyspace(), table)); + } + } + return tableIds; + } + catch (Exception e) { + throw new RuntimeException(e); + } } @Override protected void lockTablesForSchemaSnapshot( ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext) { + // TODO: Do we need to do any locking? + LOGGER.info("lock tables"); } @Override - protected void determineSnapshotOffset(RelationalSnapshotContext snapshotContext, VitessOffsetContext offsetContext) { + protected void determineSnapshotOffset(RelationalSnapshotContext context, VitessOffsetContext previousOffset) { + LOGGER.info("determine offset"); + context.offset = VitessOffsetContext.initialContext(connectorConfig, Clock.system()); } @Override protected void readTableStructure(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext, VitessOffsetContext offsetContext, SnapshottingTask snapshottingTask) { + LOGGER.info("Reading table structure"); + Set capturedSchemaTables = snapshotContext.capturedSchemaTables; + List shards = new VitessMetadata(connectorConfig).getShards(); + + for (TableId tableId : capturedSchemaTables) { + String sql = "SHOW CREATE TABLE " + quote(tableId); + try (VitessReplicationConnection connection = new VitessReplicationConnection(connectorConfig, schema)) { + Vtgate.ExecuteResponse response = connection.executeInKeyspace(sql); + List> rows = VitessMetadata.parseRows(response.getResult().getRowsList()); + if (rows.size() == 0) { + LOGGER.error("No rows {}", response); + } + String ddlStatement = rows.get(0).get(1); + // TODO: Switch to getting the create table from each shard? + // For now, iterate over all shards and add the create tables to the table in-memory representation + for (String shard : shards) { + List schemaChangeEvents = schema.parseDdl( + snapshotContext.partition, snapshotContext.offset, ddlStatement, connectorConfig.getKeyspace(), shard); + for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { + LOGGER.info("Adding schema change event {}", schemaChangeEvent); + Table table = schema.tableFor(tableId); + if (table != null) { + LOGGER.info("Adding schema for table {}", table.id()); + Table updatedTable = getTableWithShardAsCatalog(table, shard); + snapshotContext.tables.overwriteTable(updatedTable); + } + } + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + } + + private Table getTableWithShardAsCatalog(Table table, String shard) { + String[] shardAndDatabase = table.id().catalog().split("\\."); + String database = shardAndDatabase[1]; + String tableName = table.id().table(); + return table.edit().tableId(new TableId(shard, database, tableName)).create(); + } + + private String quote(TableId id) { + return quote(id.schema()) + "." + quote(id.table()); + } + + private String quote(String dbOrTableName) { + return "`" + dbOrTableName + "`"; } @Override protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotContext) { + LOGGER.info("release schema locks"); } @Override protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) { - return null; + return SchemaChangeEvent.ofSnapshotCreate( + snapshotContext.partition, + snapshotContext.offset, + snapshotContext.catalogName, + table); } @Override protected Optional getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId, List columns) { + LOGGER.info("get snapshot select"); return Optional.empty(); } - @Override - public SnapshottingTask getSnapshottingTask(VitessPartition partition, VitessOffsetContext previousOffset) { - boolean snapshotSchema = false; - boolean snapshotData = false; - return new SnapshottingTask(snapshotSchema, snapshotData, List.of(), Map.of(), false); - } - @Override protected SnapshotContext prepare(VitessPartition partition, boolean onDemand) { + LOGGER.info("snapshot context"); return new RelationalSnapshotContext<>(partition, "", onDemand); } @Override protected VitessOffsetContext copyOffset(RelationalSnapshotContext snapshotContext) { + LOGGER.info("copy offset"); return null; } diff --git a/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java b/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java index 94dce398..cda99947 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSourceInfoStructMaker.java @@ -22,8 +22,8 @@ public void init(String connector, String version, CommonConnectorConfig connect this.schema = commonSchemaBuilder() .name("io.debezium.connector.vitess.Source") .field(SourceInfo.KEYSPACE_NAME_KEY, Schema.STRING_SCHEMA) - .field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA) - .field(SourceInfo.SHARD_KEY, Schema.STRING_SCHEMA) + .field(SourceInfo.TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.SHARD_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA) .build(); } @@ -37,7 +37,7 @@ public Schema schema() { public Struct struct(SourceInfo sourceInfo) { final Struct res = super.commonStruct(sourceInfo) .put(SourceInfo.KEYSPACE_NAME_KEY, sourceInfo.keyspace()) - .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()) + .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table()) .put(SourceInfo.SHARD_KEY, sourceInfo.shard()) .put(SourceInfo.VGTID_KEY, sourceInfo.getCurrentVgtid().toString()); return res; diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index e7ba94c4..1a7da739 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -5,12 +5,15 @@ */ package io.debezium.connector.vitess; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; +import io.debezium.connector.vitess.connection.DdlMessage; import io.debezium.connector.vitess.connection.ReplicationConnection; import io.debezium.connector.vitess.connection.ReplicationMessage; import io.debezium.connector.vitess.connection.ReplicationMessageProcessor; @@ -19,6 +22,7 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.schema.SchemaChangeEvent; import io.debezium.util.Clock; import io.debezium.util.DelayStrategy; @@ -108,10 +112,29 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) { } return; } - else if (message.getOperation() == ReplicationMessage.Operation.DDL || message.getOperation() == ReplicationMessage.Operation.OTHER) { - // DDL event or OTHER event + else if (message.getOperation() == ReplicationMessage.Operation.OTHER) { offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); } + else if (message.getOperation() == ReplicationMessage.Operation.DDL) { + offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); + offsetContext.setShard(message.getShard()); + + DdlMessage ddlMessage = (DdlMessage) message; + List schemaChangeEvents = schema.parseDdl( + partition, offsetContext, ddlMessage.getStatement(), + connectorConfig.getKeyspace(), ddlMessage.getShard()); + for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) { + final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id(); + dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> { + try { + receiver.schemaChangeEvent(schemaChangeEvent); + } + catch (Exception e) { + throw new DebeziumException(e); + } + }); + } + } else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) { dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } diff --git a/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java b/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java index 293ce4f7..a8f384cd 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java @@ -14,11 +14,15 @@ public class DdlMessage implements ReplicationMessage { private final String transactionId; private final Instant commitTime; private final Operation operation; + private final String statement; + private final String shard; - public DdlMessage(String transactionId, Instant commitTime) { + public DdlMessage(String transactionId, Instant commitTime, String statement, String shard) { this.transactionId = transactionId; this.commitTime = commitTime; this.operation = Operation.DDL; + this.statement = statement; + this.shard = shard; } @Override @@ -41,9 +45,14 @@ public String getTable() { throw new UnsupportedOperationException(); } + @Override + public String getStatement() { + return statement; + } + @Override public String getShard() { - throw new UnsupportedOperationException(); + return shard; } @Override @@ -67,8 +76,12 @@ public String toString() { + "transactionId='" + transactionId + '\'' + + ", shard=" + + shard + ", commitTime=" + commitTime + + ", statement=" + + statement + ", operation=" + operation + '}'; diff --git a/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java b/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java index b3c8a4c7..bb63f31d 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java @@ -44,6 +44,11 @@ public String getShard() { throw new UnsupportedOperationException(); } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public List getOldTupleList() { throw new UnsupportedOperationException(); diff --git a/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java b/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java index 2b5179ea..8b47adf2 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java @@ -41,6 +41,11 @@ public String getTable() { throw new UnsupportedOperationException(); } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public String getShard() { throw new UnsupportedOperationException(); diff --git a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java index e453d188..d900f174 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java @@ -80,6 +80,8 @@ interface ColumnValue { List getNewTupleList(); + String getStatement(); + default boolean isTransactionalMessage() { return getOperation() == Operation.BEGIN || getOperation() == Operation.COMMIT; } diff --git a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java index 76c38796..1124208e 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java @@ -48,6 +48,11 @@ public String getShard() { return shard; } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public List getOldTupleList() { throw new UnsupportedOperationException(); diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index 7bcf61f7..869f5368 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -100,7 +100,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro this.transactionId = newVgtid.toString(); } processor.process( - new DdlMessage(transactionId, eventTimestamp), newVgtid, false); + new DdlMessage(transactionId, eventTimestamp, vEvent.getStatement(), vEvent.getShard()), newVgtid, false); } private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java index cb806417..ccbfcf29 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java @@ -64,6 +64,11 @@ public String getShard() { return shard; } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public List getOldTupleList() { return oldColumns; diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 9bffb4d9..ac7419e5 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -77,6 +77,21 @@ public Vtgate.ExecuteResponse execute(String sqlStatement) { return newBlockingStub(channel).execute(request); } + public Vtgate.ExecuteResponse executeInKeyspace(String sqlStatement) { + LOGGER.info("Executing sqlStament {}", sqlStatement); + ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize()); + managedChannel.compareAndSet(null, channel); + + String target = String.format("%s", config.getKeyspace()); + Vtgate.Session session = Vtgate.Session.newBuilder().setTargetString(target).setAutocommit(true).build(); + LOGGER.debug("Autocommit {}", session.getAutocommit()); + Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder() + .setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap())) + .setSession(session) + .build(); + return newBlockingStub(channel).execute(request); + } + public Vtgate.ExecuteResponse execute(String sqlStatement, String shard) { LOGGER.info("Executing sqlStament {}", sqlStatement); ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize()); diff --git a/src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java b/src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java new file mode 100644 index 00000000..f73a1483 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/jdbc/VitessConnection.java @@ -0,0 +1,35 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.jdbc; + +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; + +/** + * Needed to maintain compatibility with RelationalSnapshotChangeEventSource + * + * TODO: Move all query-based interactions with Vitess onto this class. + * Currently we do these with VitessReplicationConnection instead + * + * @author Thomas Thornton + */ +public class VitessConnection extends JdbcConnection { + + private static final String URL = "jdbc:mysql://${hostname}:${port}/${dbname}?maxAllowedPacket=512000"; + + protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL); + + private static final String QUOTED_CHARACTER = "`"; + + public VitessConnection(JdbcConfiguration config) { + super(config, resolveConnectionFactory(), QUOTED_CHARACTER, QUOTED_CHARACTER); + } + + private static ConnectionFactory resolveConnectionFactory() { + return FACTORY; + } +} diff --git a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java index 664a1b0b..86a94121 100644 --- a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java +++ b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java @@ -500,9 +500,7 @@ protected class TestConsumer { protected TestConsumer(int expectedRecordsCount, String... topicPrefixes) { this.expectedRecordsCount = expectedRecordsCount; this.records = new ConcurrentLinkedQueue<>(); - this.topicPrefixes = Arrays.stream(topicPrefixes) - .map(p -> TestHelper.TEST_SERVER + "." + p) - .collect(Collectors.toList()); + this.topicPrefixes = Arrays.stream(topicPrefixes).toList(); } public void setIgnoreExtraRecords(boolean ignoreExtraRecords) { diff --git a/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java b/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java index 03e19af4..5b43af2a 100644 --- a/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java +++ b/src/test/java/io/debezium/connector/vitess/SourceInfoTest.java @@ -128,8 +128,8 @@ public void schemaIsCorrect() { .field("ts_us", Schema.OPTIONAL_INT64_SCHEMA) .field("ts_ns", Schema.OPTIONAL_INT64_SCHEMA) .field("keyspace", Schema.STRING_SCHEMA) - .field("table", Schema.STRING_SCHEMA) - .field("shard", Schema.STRING_SCHEMA) + .field("table", Schema.OPTIONAL_STRING_SCHEMA) + .field("shard", Schema.OPTIONAL_STRING_SCHEMA) .field("vgtid", Schema.STRING_SCHEMA) .build(); diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index ca0b77fa..43795946 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -33,6 +33,7 @@ import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; +import io.debezium.relational.history.MemorySchemaHistory; import io.vitess.proto.Query; import io.vitess.proto.Query.Field; @@ -87,6 +88,17 @@ public static Configuration.Builder defaultConfig() { return defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TEST_SHARD, null, null); } + public static String getTopicPrefix(boolean hasMultipleShards) { + String keyspace; + if (hasMultipleShards) { + keyspace = TEST_SHARDED_KEYSPACE; + } + else { + keyspace = TEST_UNSHARDED_KEYSPACE; + } + return String.join(".", TEST_SERVER, keyspace); + } + /** * Get the default configuration of the connector * @@ -148,6 +160,7 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards, .with(VitessConnectorConfig.VTGATE_HOST, VTGATE_HOST) .with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT) .with(VitessConnectorConfig.VTGATE_USER, USERNAME) + .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class) .with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD) .with(VitessConnectorConfig.POLL_INTERVAL_MS, 100); if (!Strings.isNullOrEmpty(tableInclude)) { diff --git a/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java b/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java index b8420b2e..6d4b8278 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java @@ -31,7 +31,7 @@ import io.debezium.util.Clock; import io.vitess.proto.Query; -public class VitessBigIntUnsignedTest { +public class VitessBigIntUnsignedTest extends VitessTestCleanup { private static final Logger LOGGER = LoggerFactory.getLogger(VitessChangeRecordEmitterTest.class); protected static Object getJavaValue(VitessConnectorConfig.BigIntUnsignedHandlingMode mode) { @@ -74,7 +74,6 @@ protected static Struct defaultStruct(TableSchema tableSchema, VitessConnectorCo protected void handleInsert(VitessConnectorConfig.BigIntUnsignedHandlingMode mode) throws Exception { VitessConnectorConfig connectorConfig; - VitessDatabaseSchema schema; VStreamOutputMessageDecoder decoder; Configuration.Builder builder = TestHelper.defaultConfig(); diff --git a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java index 89d49d19..1fdfccb3 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java @@ -7,7 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,15 +22,14 @@ import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; -public class VitessChangeRecordEmitterTest { +public class VitessChangeRecordEmitterTest extends VitessTestCleanup { private static final Logger LOGGER = LoggerFactory.getLogger(VitessChangeRecordEmitterTest.class); - private static VitessConnectorConfig connectorConfig; - private static VitessDatabaseSchema schema; - private static VStreamOutputMessageDecoder decoder; + private VitessConnectorConfig connectorConfig; + private VStreamOutputMessageDecoder decoder; - @BeforeClass - public static void beforeClass() throws Exception { + @Before + public void before() throws Exception { connectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().build()); schema = new VitessDatabaseSchema( connectorConfig, diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 33c21b83..1e0b68bd 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -16,6 +16,7 @@ import static io.debezium.connector.vitess.TestHelper.TEST_SHARD_TO_EPOCH; import static io.debezium.connector.vitess.TestHelper.TEST_UNSHARDED_KEYSPACE; import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE; +import static io.debezium.connector.vitess.TestHelper.getTopicPrefix; import static junit.framework.TestCase.assertEquals; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; @@ -73,6 +74,7 @@ import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; +import io.debezium.relational.history.MemorySchemaHistory; import io.debezium.util.Collect; import io.debezium.util.Testing; @@ -180,6 +182,112 @@ public void shouldReceiveHeartbeatEvents() throws Exception { assertThat(records.recordsForTopic(topic).size()).isEqualTo(expectedHeartbeatRecords); } + @Test + @FixFor("DBZ-7962") + public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception { + String keyspace = TEST_SHARDED_KEYSPACE; + String table = keyspace + ".ddl_table"; + TestHelper.executeDDL("vitess_create_tables.ddl", keyspace); + startConnector(config -> config + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true) + // .with(VitessConnectorConfig.KEYSPACE.name(), keyspace) + // .with(VitessConnectorConfig.SNAPSHOT_MODE_TABLES, "test_sharded_keyspace.ddl_table") + .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table) + .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA) + .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), + true); + assertConnectorIsRunning(); + + String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX); + + TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE); + TestHelper.execute("ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000));", TEST_SHARDED_KEYSPACE); + TestHelper.execute("TRUNCATE TABLE ddl_table;", TEST_SHARDED_KEYSPACE); + TestHelper.execute("DROP TABLE ddl_table;", TEST_SHARDED_KEYSPACE); + TestHelper.execute("CREATE TABLE ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id));", TEST_SHARDED_KEYSPACE); + + // 1 for the snapshot (create table ddls) + // 5 for the changes above + // 2 shards, so (5 + 1) * 2 = 12 + int expectedSchemaChangeRecords = 12; + Awaitility + .await() + .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> consumeRecordsByTopic(expectedSchemaChangeRecords).allRecordsInOrder().size() >= expectedSchemaChangeRecords); + + AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedSchemaChangeRecords, 1); + // assertThat(records.recordsForTopic(schemaChangeTopic).size()).isEqualTo(expectedSchemaChangeRecords); + } + + @Test + @FixFor("DBZ-7962") + public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl"); + startConnector(config -> config + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true) + .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.ddl_table") + // .with(VitessConnectorConfig.SNAPSHOT_MODE_TABLES "test_unsharded_keyspace.ddl_table") + .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA) + .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), + false); + assertConnectorIsRunning(); + + String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX); + + TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;"); + TestHelper.execute("ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000));"); + TestHelper.execute("TRUNCATE TABLE ddl_table;"); + TestHelper.execute("DROP TABLE ddl_table;"); + TestHelper.execute("CREATE TABLE ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id));"); + + // 1 is the snapshot + // 5 are the changes above + int expectedSchemaChangeRecords = 6; + Awaitility + .await() + .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> consumeRecordsByTopic(expectedSchemaChangeRecords).allRecordsInOrder().size() >= expectedSchemaChangeRecords); + + AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedSchemaChangeRecords, 1); + // assertThat(records.recordsForTopic(schemaChangeTopic).size()).isEqualTo(expectedSchemaChangeRecords); + } + + @Test + @FixFor("DBZ-7962") + public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl"); + // startConnector(); + startConnector(config -> config + .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES.name(), true) + .with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class), + false); + assertConnectorIsRunning(); + + String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX); + String dataChangeTopic = String.join(".", + TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX), + TEST_UNSHARDED_KEYSPACE, + "ddl_table"); + + TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);"); + TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;"); + + int expectedDataChangeRecords = 1; + int expectedSchemaChangeRecords = 1; + int expectedTotalRecords = expectedDataChangeRecords + expectedSchemaChangeRecords; + Awaitility + .await() + .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> consumeRecordsByTopic(expectedTotalRecords).allRecordsInOrder().size() >= expectedTotalRecords); + + AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedTotalRecords, 1); + // assertThat(records.recordsForTopic(schemaChangeTopic).size()).isEqualTo(expectedSchemaChangeRecords); + // assertThat(records.recordsForTopic(dataChangeTopic).size()).isEqualTo(expectedDataChangeRecords); + } + @Test @FixFor("DBZ-7962") public void shouldReceiveHeartbeatEventsShardedKeyspace() throws Exception { @@ -430,7 +538,7 @@ public void shouldSchemaUpdatedAfterOnlineDdl() throws Exception { startConnector(); assertConnectorIsRunning(); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount); + consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false)); assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD); // Add a column using online ddl and wait until it is finished String ddlId = TestHelper.applyOnlineDdl("ALTER TABLE numeric_table ADD COLUMN foo INT", TEST_UNSHARDED_KEYSPACE); @@ -686,7 +794,7 @@ public void shouldUseLocalVgtid() throws Exception { Vgtid baseVgtid = TestHelper.getCurrentVgtid(); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount + 2); + consumer = testConsumer(expectedRecordsCount + 2, getTopicPrefix(true), TEST_SERVER + ".transaction"); String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)"; String insertQuery = "INSERT INTO numeric_table (" @@ -1223,7 +1331,7 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti assertConnectorIsRunning(); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount); + consumer = testConsumer(expectedRecordsCount, getTopicPrefix(hasMultipleShards)); assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards); } @@ -1646,7 +1754,7 @@ public void testSnapshotForTableWithEnums() throws Exception { } // We should receive a record written before starting the connector. - consumer = testConsumer(totalRecordsCount); + consumer = testConsumer(totalRecordsCount, getTopicPrefix(false)); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= totalRecordsCount; i++) { SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_TYPE_STMT), TestHelper.PK_FIELD); @@ -1691,7 +1799,7 @@ public void testSnapshotForTableWithEnumsAmbiguous() throws Exception { } // We should receive a record written before starting the connector. - consumer = testConsumer(totalRecordsCount); + consumer = testConsumer(totalRecordsCount, getTopicPrefix(false)); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= totalRecordsCount; i++) { SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_ENUM_AMBIGUOUS_TYPE_STMT), TestHelper.PK_FIELD); @@ -1730,7 +1838,7 @@ public void testVgtidIncludesLastPkDuringTableCopy() throws Exception { -1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD); // We should receive a record written before starting the connector. - consumer = testConsumer(expectedSnapshotRecordsCount); + consumer = testConsumer(expectedSnapshotRecordsCount, getTopicPrefix(false)); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= expectedSnapshotRecordsCount; i++) { SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); @@ -1790,11 +1898,12 @@ public void testMidSnapshotRecoveryLargeTable() throws Exception { startConnector(Function.identity(), false, false, 1, -1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD); - consumer = testConsumer(1, tableInclude); + String topicPrefix = TEST_SERVER + "." + tableInclude; + consumer = testConsumer(1, topicPrefix, getTopicPrefix(false)); consumer.await(TestHelper.waitTimeForRecords(), 0, TimeUnit.SECONDS); stopConnector(); // Upper bound is the total size of the table so set that to prevent early termination - consumer = testConsumer(expectedSnapshotRecordsCount, tableInclude); + consumer = testConsumer(expectedSnapshotRecordsCount, topicPrefix); int recordCount = consumer.countRecords(5, TimeUnit.SECONDS); // Assert snapshot is partially complete assertThat(recordCount).isPositive(); @@ -1855,7 +1964,7 @@ public void testCopyNoRecordsAndReplicateTable() throws Exception { startConnector(Function.identity(), false, false, 1, -1, -1, tableInclude, null, null); int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount); + consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false)); // We should receive record from numeric_table assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD); @@ -1873,7 +1982,7 @@ public void testInitialSnapshotModeHaveMultiShard() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount); + consumer = testConsumer(expectedRecordsCount, getTopicPrefix(hasMultipleShards)); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT, TEST_SHARDED_KEYSPACE), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_SHARDED_KEYSPACE, "numeric_table"); @@ -1896,7 +2005,7 @@ public void testCopyTableAndRestart() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount); + consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false)); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table"); @@ -1920,7 +2029,7 @@ public void testCopyAndReplicatePerTaskOffsetStorage() throws Exception { // We should receive a record written before starting the connector. int expectedRecordsCount = 1; - consumer = testConsumer(expectedRecordsCount); + consumer = testConsumer(expectedRecordsCount, getTopicPrefix(false)); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(INSERT_NUMERIC_TYPES_STMT), TestHelper.PK_FIELD); assertSourceInfo(record, TEST_SERVER, TEST_UNSHARDED_KEYSPACE, "numeric_table"); diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java index 658f6282..7e4b6bae 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java @@ -32,7 +32,7 @@ import io.debezium.util.Collect; import io.debezium.util.Testing; -public class VitessConnectorTaskTest { +public class VitessConnectorTaskTest extends VitessTestCleanup { private static final LogInterceptor logInterceptor = new LogInterceptor(BaseSourceTask.class); private static final LogInterceptor vitessLogInterceptor = new LogInterceptor(VitessConnectorTask.class); @@ -45,7 +45,7 @@ public void shouldStartWithTaskOffsetStorageEnabledAndNoOffsets() { .with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1) .with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "0") .build(); - VitessConnectorTask task = new VitessConnectorTask(); + task = new VitessConnectorTask(); ContextHelper helper = new ContextHelper(); task.initialize(helper.getSourceTaskContext()); ChangeEventSourceCoordinator coordinator = task.start(config); @@ -58,7 +58,7 @@ public void shouldStartWithTaskOffsetStorageDisabledAndNoOffsets() { .with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1) .with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "0") .build(); - VitessConnectorTask task = new VitessConnectorTask(); + task = new VitessConnectorTask(); ContextHelper helper = new ContextHelper(); task.initialize(helper.getSourceTaskContext()); ChangeEventSourceCoordinator coordinator = task.start(config); @@ -71,7 +71,7 @@ public void shouldReadOffsetsWhenTaskOffsetStorageDisabled() { .with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1) .with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "0") .build(); - VitessConnectorTask task = new VitessConnectorTask(); + task = new VitessConnectorTask(); ContextHelper helper = new ContextHelper(); helper.storeOffsets(VGTID_JSON, null); task.initialize(helper.getSourceTaskContext()); @@ -98,7 +98,7 @@ public void shouldReadCurrentGenOffsets() { .with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1) .with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, "-80,80-") .build(); - VitessConnectorTask task = new VitessConnectorTask(); + task = new VitessConnectorTask(); ContextHelper helper = new ContextHelper(); helper.storeOffsets(null, Map.of(taskKey, VGTID_JSON)); task.initialize(helper.getSourceTaskContext()); @@ -128,7 +128,7 @@ public void shouldReadPreviousGenOffsets() { .with(VitessConnectorConfig.VITESS_TASK_SHARDS_CONFIG, shards) .with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 2) .build(); - VitessConnectorTask task = new VitessConnectorTask(); + task = new VitessConnectorTask(); task.initialize(helper.getSourceTaskContext()); task.start(config); String expectedMessage = "Using offsets from previous gen"; @@ -155,7 +155,7 @@ public void shouldReadConfiguredOffsets() { .with(VitessConnectorConfig.VITESS_TOTAL_TASKS_CONFIG, 1) .with(VitessConnectorConfig.VGTID, VGTID_JSON) .build(); - VitessConnectorTask task = new VitessConnectorTask(); + task = new VitessConnectorTask(); ContextHelper helper = new ContextHelper(); task.initialize(helper.getSourceTaskContext()); task.start(config); diff --git a/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java b/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java index 596a798f..34e5decb 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessSourceInfoStructMakerTest.java @@ -41,9 +41,9 @@ public void shouldGetCorrectSourceInfoSchema() { assertThat(structMaker.schema().field(SourceInfo.KEYSPACE_NAME_KEY).schema()) .isEqualTo(Schema.STRING_SCHEMA); assertThat(structMaker.schema().field(SourceInfo.SHARD_KEY).schema()) - .isEqualTo(Schema.STRING_SCHEMA); + .isEqualTo(Schema.OPTIONAL_STRING_SCHEMA); assertThat(structMaker.schema().field(SourceInfo.TABLE_NAME_KEY).schema()) - .isEqualTo(Schema.STRING_SCHEMA); + .isEqualTo(Schema.OPTIONAL_STRING_SCHEMA); assertThat(structMaker.schema().field(SourceInfo.VGTID_KEY).schema()) .isEqualTo(Schema.STRING_SCHEMA); assertThat(structMaker.schema()).isNotNull(); diff --git a/src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java b/src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java new file mode 100644 index 00000000..ff12bc96 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/VitessTestCleanup.java @@ -0,0 +1,39 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess; + +import org.junit.After; + +/** + * @author Thomas Thornton + */ +public class VitessTestCleanup { + + public VitessDatabaseSchema schema; + public VitessConnectorTask task; + + @After + public void afterEach() { + if (schema != null) { + try { + schema.close(); + } + finally { + schema = null; + } + } + if (task != null) { + try { + task.doStop(); + } + finally { + task = null; + } + } + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java b/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java index 308cdce2..954c92e4 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessValueConverterTest.java @@ -36,9 +36,8 @@ import binlogdata.Binlogdata; -public class VitessValueConverterTest { +public class VitessValueConverterTest extends VitessTestCleanup { - private VitessDatabaseSchema schema; private VitessConnectorConfig config; private VitessValueConverter converter; private VStreamOutputMessageDecoder decoder; diff --git a/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java b/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java new file mode 100644 index 00000000..86a05180 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java @@ -0,0 +1,44 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.connection; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; + +import org.junit.Test; + +/** + * @author Thomas Thornton + */ +public class DdlMessageTest { + + @Test + public void shouldSetQuery() { + String statement = "ALTER TABLE foo RENAME TO bar"; + ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, "0"); + assertThat(replicationMessage.getStatement()).isEqualTo(statement); + } + + @Test + public void shouldSetShard() { + String statement = "ALTER TABLE foo RENAME TO bar"; + String shard = "-80"; + ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, shard); + assertThat(replicationMessage.getShard()).isEqualTo(shard); + } + + @Test + public void shouldConvertToString() { + String statement = "ALTER TABLE foo RENAME TO bar"; + String shard = "-80"; + ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, shard); + assertThat(replicationMessage.toString()).isEqualTo( + "DdlMessage{transactionId='gtid', shard=-80, commitTime=1970-01-01T00:00:00Z, statement=ALTER TABLE foo RENAME TO bar, operation=DDL}"); + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java index 98cf3e7b..4301afb8 100644 --- a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java +++ b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java @@ -21,6 +21,7 @@ import io.debezium.connector.vitess.VgtidTest; import io.debezium.connector.vitess.VitessConnectorConfig; import io.debezium.connector.vitess.VitessDatabaseSchema; +import io.debezium.connector.vitess.VitessTestCleanup; import io.debezium.doc.FixFor; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -31,11 +32,10 @@ import binlogdata.Binlogdata; -public class VStreamOutputMessageDecoderTest { +public class VStreamOutputMessageDecoderTest extends VitessTestCleanup { private static final Logger LOGGER = LoggerFactory.getLogger(VStreamOutputMessageDecoderTest.class); private VitessConnectorConfig connectorConfig; - private VitessDatabaseSchema schema; private VStreamOutputMessageDecoder decoder; @Before diff --git a/src/test/resources/vitess_create_tables.ddl b/src/test/resources/vitess_create_tables.ddl index c79212e4..7411f043 100644 --- a/src/test/resources/vitess_create_tables.ddl +++ b/src/test/resources/vitess_create_tables.ddl @@ -20,6 +20,16 @@ CREATE TABLE numeric_table PRIMARY KEY (id) ); +DROP TABLE IF EXISTS ddl_table; +CREATE TABLE ddl_table +( + id BIGINT NOT NULL AUTO_INCREMENT, + PRIMARY KEY (id) +) +PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (1000) +); + DROP TABLE IF EXISTS string_table; CREATE TABLE string_table (