diff --git a/pom.xml b/pom.xml
index afc87e8f..51c14455 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,6 +228,10 @@
debezium-connector-binlog
${version.debezium}
+
+ com.mysql
+ mysql-connector-j
+
@@ -263,11 +267,6 @@
debezium-embedded
test
-
- com.mysql
- mysql-connector-j
- test
-
io.confluent
kafka-connect-avro-converter
diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
index 7631c00c..27fef4a6 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
@@ -98,6 +98,8 @@ public enum SnapshotMode implements EnumeratedValue {
NO_DATA("no_data"),
+ CONFIGURATION_BASED("configuration_based"),
+
/**
* Never perform an initial snapshot and only receive new data changes.
*/
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
index fde0407c..e8235aee 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
@@ -241,6 +241,126 @@ public void shouldOnlySnapshotSchemaOnce() throws Exception {
assertThat(consumer.isEmpty());
}
+ @Test
+ @FixFor("DBZ-8325")
+ public void shouldMigrateToEnabledSchemaChanges() throws Exception {
+ String keyspace = TEST_SHARDED_KEYSPACE;
+ String table = keyspace + ".ddl_table";
+ TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
+ TestHelper.applyVSchema("vitess_vschema.json");
+
+ startConnector(config -> config
+ .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
+ .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER)
+ .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
+ .with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);
+ assertConnectorIsRunning();
+
+ TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);", TEST_SHARDED_KEYSPACE);
+ String dataChangeTopic = String.join(".",
+ TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX),
+ keyspace,
+ "ddl_table");
+
+ int expectedDataChangeRecords = 1;
+ consumer = testConsumer(expectedDataChangeRecords);
+ consumer.expects(expectedDataChangeRecords);
+ consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+ for (int i = 0; i < expectedDataChangeRecords; i++) {
+ SourceRecord record = consumer.remove();
+ assertThat(record.topic()).isEqualTo(dataChangeTopic);
+ }
+ assertThat(consumer.isEmpty());
+
+ stopConnector();
+
+ startConnector(config -> config
+ .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
+ // Enable schema change
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
+ .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.CONFIGURATION_BASED)
+ .with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR, true)
+ .with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, true)
+ .with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, true)
+ // Other configs from before
+ .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
+ .with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);
+
+ // After startup, we need to make sure we recover the schema history (ie with snapshot disabled)
+ // so that we can properly parse & emit a schema change event for this alter table statement.
+ TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);
+
+ String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);
+
+ int expectedTotalRecords = 4;
+ consumer = testConsumer(expectedTotalRecords);
+ consumer.expects(expectedTotalRecords);
+ consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+ for (int i = 0; i < expectedTotalRecords; i++) {
+ SourceRecord record = consumer.remove();
+ assertThat(record.topic()).isEqualTo(schemaChangeTopic);
+ }
+ assertThat(consumer.isEmpty());
+ }
+
+ @Test
+ @FixFor("DBZ-8325")
+ public void canDisableSchemaHistory() throws Exception {
+ String keyspace = TEST_SHARDED_KEYSPACE;
+ String table = keyspace + ".ddl_table";
+ TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
+ TestHelper.applyVSchema("vitess_vschema.json");
+
+ startConnector(config -> config
+ .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
+ .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
+ true);
+ assertConnectorIsRunning();
+
+ TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);", TEST_SHARDED_KEYSPACE);
+ String dataChangeTopic = String.join(".",
+ TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX),
+ keyspace,
+ "ddl_table");
+
+ int expectedDataChangeRecords = 1;
+ consumer = testConsumer(expectedDataChangeRecords);
+ consumer.expects(expectedDataChangeRecords);
+ consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+ for (int i = 0; i < expectedDataChangeRecords; i++) {
+ SourceRecord record = consumer.remove();
+ assertThat(record.topic()).isEqualTo(dataChangeTopic);
+ }
+ assertThat(consumer.isEmpty());
+
+ stopConnector();
+
+ startConnector(config -> config
+ .with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
+ .with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
+ true);
+
+ // After startup, we need to make sure we recover the schema history (ie with snapshot disabled)
+ // so that we can properly parse & emit a schema change event for this alter table statement.
+ TestHelper.execute("INSERT INTO ddl_table (id) VALUES (2);", TEST_SHARDED_KEYSPACE);
+ TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);
+
+ // We should not receive any DDLs
+ String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);
+
+ int expectedTotalRecords = 1;
+ consumer = testConsumer(expectedTotalRecords);
+ consumer.expects(expectedTotalRecords);
+ consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+ for (int i = 0; i < expectedTotalRecords; i++) {
+ SourceRecord record = consumer.remove();
+ assertThat(record.topic()).isEqualTo(dataChangeTopic);
+ }
+ assertThat(consumer.isEmpty());
+ }
+
@Test
@FixFor("DBZ-8325")
public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception {