diff --git a/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java b/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java
index b6a437d4..75124a10 100644
--- a/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java
+++ b/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java
@@ -15,18 +15,37 @@
import io.debezium.util.Strings;
/**
+ *
* Topic naming strategy where only the table name is added. This is used to avoid including
* the shard which is now part of the catalog of the table ID and would be included if
* the DefaultTopicNamingStrategy is being used.
+ *
+ * Additionally, supports some Vitess-specific configs:
+ *
+ * -
+ * overrideDataChangeTopicPrefix: in the case of mulitple connectors for the same keyspace,
+ * a unique `topic.prefix` is required for proper metric reporting, so in order for a consistent topic
+ * naming convention, the data change topic prefix can be set here (typically shared between connectors of the same
+ * keyspace
+ *
+ * -
+ * overrideSchemaChangeTopic: in the case of multiple connectors for the same keyspace and `include.schema.changes` being enabled,
+ * this is used to prevent the ddl events from being written to the topic named `topic.prefix`, which is the default behavior.
+ * The reason why this is necessary is that the `topic.prefix` may include the table name for uniqueness, so it may actually be the name
+ * of a data change topic.
+ *
+ *
*/
public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy {
private final String overrideDataChangeTopicPrefix;
+ private final String overrideSchemaChangeTopic;
public TableTopicNamingStrategy(Properties props) {
super(props);
Configuration config = Configuration.from(props);
this.overrideDataChangeTopicPrefix = config.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX);
+ this.overrideSchemaChangeTopic = config.getString(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC);
}
public static TableTopicNamingStrategy create(CommonConnectorConfig config) {
@@ -44,4 +63,22 @@ public String dataChangeTopic(TableId id) {
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}
+
+ /**
+ * Return the schema change topic. There are two cases:
+ * 1. If override schema change topic is specified - use this as the topic name
+ * 2. If override schema change topic is not specified - call the super method to get the typical
+ * schema change topic name.
+ *
+ * @return String representing the schema change topic name.
+ */
+ @Override
+ public String schemaChangeTopic() {
+ if (!Strings.isNullOrBlank(overrideSchemaChangeTopic)) {
+ return overrideSchemaChangeTopic;
+ }
+ else {
+ return super.schemaChangeTopic();
+ }
+ }
}
diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
index 6437cee9..22817fc3 100644
--- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
+++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java
@@ -348,6 +348,20 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'false' (the default) omits the fields; "
+ "'true' converts the field into an implementation dependent binary representation.");
+ // Needed for backward compatibility, otherwise all upgraded connectors will start publishing schema change events
+ // by default.
+ public static final Field INCLUDE_SCHEMA_CHANGES = Field.create("include.schema.changes")
+ .withDisplayName("Include database schema changes")
+ .withType(Type.BOOLEAN)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 0))
+ .withWidth(Width.SHORT)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription("Whether the connector should publish changes in the database schema to a Kafka topic with "
+ + "the same name as the database server ID. Each schema change will be recorded using a key that "
+ + "contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s). "
+ + "The default is 'true'. This is independent of how the connector internally records database schema history.")
+ .withDefault(false);
+
public static final Field OFFSET_STORAGE_PER_TASK = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.per.task")
.withDisplayName("Store offsets per task")
.withType(Type.BOOLEAN)
@@ -368,6 +382,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");
+ public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic")
+ .withDisplayName("Override schema change topic name")
+ .withType(Type.STRING)
+ .withWidth(Width.MEDIUM)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withValidation(CommonConnectorConfig::validateTopicName)
+ .withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx).");
+
public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen")
.withDisplayName("Offset storage task key generation number")
.withType(Type.INT)
@@ -479,6 +501,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
+ OVERRIDE_SCHEMA_CHANGE_TOPIC,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
@@ -553,6 +576,11 @@ public String getConnectorName() {
return Module.name();
}
+ @Override
+ public boolean isSchemaChangesHistoryEnabled() {
+ return getConfig().getBoolean(INCLUDE_SCHEMA_CHANGES);
+ }
+
@Override
public TemporalPrecisionMode getTemporalPrecisionMode() {
return TemporalPrecisionMode.parse(getConfig().getString(TIME_PRECISION_MODE));
diff --git a/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java b/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java
index b1656036..0b02eb5f 100644
--- a/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java
+++ b/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java
@@ -52,4 +52,26 @@ public void shouldUseTopicPrefixIfOverrideIsBlank() {
assertThat(topicName).isEqualTo("prefix.table");
}
+ @Test
+ public void shouldGetOverrideSchemaChangeTopic() {
+ TableId tableId = new TableId("shard", "keyspace", "table");
+ final Properties props = new Properties();
+ props.put("topic.prefix", "prefix");
+ props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "override-prefix");
+ TopicNamingStrategy strategy = new TableTopicNamingStrategy(props);
+ String topicName = strategy.schemaChangeTopic();
+ assertThat(topicName).isEqualTo("override-prefix");
+ }
+
+ @Test
+ public void shouldUseTopicPrefixIfOverrideSchemaIsBlank() {
+ TableId tableId = new TableId("shard", "keyspace", "table");
+ final Properties props = new Properties();
+ props.put("topic.prefix", "prefix");
+ props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "");
+ TopicNamingStrategy strategy = new TableTopicNamingStrategy(props);
+ String topicName = strategy.schemaChangeTopic();
+ assertThat(topicName).isEqualTo("prefix");
+ }
+
}
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java
index ab583331..b9b8d7a5 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java
@@ -75,6 +75,43 @@ public void shouldBlankOverrideTopicPrefixFailValidation() {
assertThat(inputs.size()).isEqualTo(1);
}
+ @Test
+ public void shouldImproperOverrideSchemaTopicPrefixFailValidation() {
+ Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "hello@world").build();
+ VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
+ List inputs = new ArrayList<>();
+ Consumer printConsumer = (input) -> {
+ inputs.add(input);
+ };
+ connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer);
+ assertThat(inputs.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void shouldUseSchemaTopicPrefix() {
+ Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC,
+ "__debezium-ddl.dev.msgdata.precomputed_channel_summary_partitioned").build();
+ VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
+ List inputs = new ArrayList<>();
+ Consumer printConsumer = (input) -> {
+ inputs.add(input);
+ };
+ connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer);
+ assertThat(inputs.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void shouldBlankOverrideSchemaTopicPrefixFailValidation() {
+ Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "").build();
+ VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
+ List inputs = new ArrayList<>();
+ Consumer printConsumer = (input) -> {
+ inputs.add(input);
+ };
+ connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer);
+ assertThat(inputs.size()).isEqualTo(1);
+ }
+
@Test
public void shouldExcludeEmptyShards() {
Configuration configuration = TestHelper.defaultConfig().with(
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
index 21b4d5b5..0403cbea 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
@@ -215,7 +215,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
TestHelper.executeDDL("vitess_create_tables.ddl");
// startConnector();
startConnector(config -> config
- .with(CommonConnectorConfig.INCLUDE_SCHEMA_CHANGES, true),
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true),
false);
assertConnectorIsRunning();
@@ -252,6 +252,50 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
assertThat(consumer.isEmpty());
}
+ @Test
+ @FixFor("DBZ-8325")
+ public void shouldSupportOverrideDataChangeAndSchemaChangeTopics() throws Exception {
+ TestHelper.executeDDL("vitess_create_tables.ddl");
+ // startConnector();
+ String overrideDataChangeTopicPrefix = "data.alternate.prefix";
+ String overrideSchemaChangeTopic = "schema.alternate.topic";
+ startConnector(config -> config
+ .with(VitessConnectorConfig.TOPIC_NAMING_STRATEGY, TableTopicNamingStrategy.class)
+ .with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, overrideDataChangeTopicPrefix)
+ .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, overrideSchemaChangeTopic)
+ .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true),
+ false);
+ assertConnectorIsRunning();
+
+ String dataChangeTopic = String.join(".", overrideDataChangeTopicPrefix, "ddl_table");
+
+ String ddl = "ALTER TABLE ddl_table ADD COLUMN new_column_name INT";
+ TestHelper.execute("INSERT INTO ddl_table (id, int_unsigned_col, json_col) VALUES (1, 2, '{\"1\":2}');");
+ TestHelper.execute(ddl);
+
+ int expectedDataChangeRecords = 1;
+ int expectedSchemaChangeRecords = 1;
+ int expectedTotalRecords = expectedDataChangeRecords + expectedSchemaChangeRecords;
+ consumer = testConsumer(expectedTotalRecords);
+ consumer.expects(expectedTotalRecords);
+ consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+ for (int i = 0; i < expectedTotalRecords; i++) {
+ SourceRecord record = consumer.remove();
+ Struct value = (Struct) record.value();
+ Struct source = (Struct) value.get("source");
+ assertThat(source.getString("table")).isEqualTo("ddl_table");
+ assertThat(source.getString("shard")).isEqualTo(TEST_SHARD);
+ if (i == 1) {
+ assertThat(record.topic()).isEqualTo(overrideSchemaChangeTopic);
+ assertThat(value.getString("ddl")).isEqualToIgnoringCase(ddl);
+ }
+ else {
+ assertThat(record.topic()).isEqualTo(dataChangeTopic);
+ }
+ }
+ assertThat(consumer.isEmpty());
+ }
+
@Test
@FixFor("DBZ-8325")
public void shouldReceiveSchemaEventsShardedBeforeAnyDataEvents() throws Exception {