Skip to content

Commit

Permalink
DBZ-8325 Support disabling schema change events
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 23, 2024
1 parent e7ca2f5 commit f4935f9
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 5 deletions.
9 changes: 4 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@
<artifactId>debezium-connector-binlog</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>


<!-- Testing -->
Expand Down Expand Up @@ -263,11 +267,6 @@
<artifactId>debezium-embedded</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public enum SnapshotMode implements EnumeratedValue {

NO_DATA("no_data"),

CONFIGURATION_BASED("configuration_based"),

/**
* Never perform an initial snapshot and only receive new data changes.
*/
Expand Down
120 changes: 120 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,126 @@ public void shouldOnlySnapshotSchemaOnce() throws Exception {
assertThat(consumer.isEmpty());
}

@Test
@FixFor("DBZ-8325")
public void shouldMigrateToEnabledSchemaChanges() throws Exception {
String keyspace = TEST_SHARDED_KEYSPACE;
String table = keyspace + ".ddl_table";
TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
TestHelper.applyVSchema("vitess_vschema.json");

startConnector(config -> config
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);
assertConnectorIsRunning();

TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);", TEST_SHARDED_KEYSPACE);
String dataChangeTopic = String.join(".",
TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX),
keyspace,
"ddl_table");

int expectedDataChangeRecords = 1;
consumer = testConsumer(expectedDataChangeRecords);
consumer.expects(expectedDataChangeRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedDataChangeRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(dataChangeTopic);
}
assertThat(consumer.isEmpty());

stopConnector();

startConnector(config -> config
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
// Enable schema change
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.CONFIGURATION_BASED)
.with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, true)
// Other configs from before
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(VitessConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class), true);

// After startup, we need to make sure we recover the schema history (ie with snapshot disabled)
// so that we can properly parse & emit a schema change event for this alter table statement.
TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);

String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);

int expectedTotalRecords = 4;
consumer = testConsumer(expectedTotalRecords);
consumer.expects(expectedTotalRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedTotalRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(schemaChangeTopic);
}
assertThat(consumer.isEmpty());
}

@Test
@FixFor("DBZ-8325")
public void canDisableSchemaHistory() throws Exception {
String keyspace = TEST_SHARDED_KEYSPACE;
String table = keyspace + ".ddl_table";
TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
TestHelper.applyVSchema("vitess_vschema.json");

startConnector(config -> config
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
true);
assertConnectorIsRunning();

TestHelper.execute("INSERT INTO ddl_table (id) VALUES (1);", TEST_SHARDED_KEYSPACE);
String dataChangeTopic = String.join(".",
TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX),
keyspace,
"ddl_table");

int expectedDataChangeRecords = 1;
consumer = testConsumer(expectedDataChangeRecords);
consumer.expects(expectedDataChangeRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedDataChangeRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(dataChangeTopic);
}
assertThat(consumer.isEmpty());

stopConnector();

startConnector(config -> config
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NEVER),
true);

// After startup, we need to make sure we recover the schema history (ie with snapshot disabled)
// so that we can properly parse & emit a schema change event for this alter table statement.
TestHelper.execute("INSERT INTO ddl_table (id) VALUES (2);", TEST_SHARDED_KEYSPACE);
TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);

// We should not receive any DDLs
String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);

int expectedTotalRecords = 1;
consumer = testConsumer(expectedTotalRecords);
consumer.expects(expectedTotalRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedTotalRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(dataChangeTopic);
}
assertThat(consumer.isEmpty());
}

@Test
@FixFor("DBZ-8325")
public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception {
Expand Down

0 comments on commit f4935f9

Please sign in to comment.