Skip to content

Commit

Permalink
DBZ-8325 Support override schema change topic & default include schem…
Browse files Browse the repository at this point in the history
…a changes to false
  • Loading branch information
twthorn committed Nov 8, 2024
1 parent 8f8c30e commit ad8dccb
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,37 @@
import io.debezium.util.Strings;

/**
* <p>
* Topic naming strategy where only the table name is added. This is used to avoid including
* the shard which is now part of the catalog of the table ID and would be included if
* the DefaultTopicNamingStrategy is being used.
*</p>
* Additionally, supports some Vitess-specific configs:
* <ul>
* <li>
* overrideDataChangeTopicPrefix: in the case of multiple connectors for the same keyspace,
* a unique `topic.prefix` is required for proper metric reporting, so, to keep a consistent topic
* naming convention, the data change topic prefix can be set here (typically shared between connectors of the same
* keyspace)
* </li>
* <li>
* overrideSchemaChangeTopic: in the case of multiple connectors for the same keyspace and `include.schema.changes` being enabled,
* this is used to prevent the ddl events from being written to the topic named `topic.prefix`, which is the default behavior.
* The reason why this is necessary is that the `topic.prefix` may include the table name for uniqueness, so it may actually be the name
* of a data change topic.
* </li>
* </ul>
*/
public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy<TableId> {

private final String overrideDataChangeTopicPrefix;
private final String overrideSchemaChangeTopic;

/**
 * Builds the strategy from raw connector properties.
 * <p>
 * The properties are passed to the parent strategy first; the two override options
 * (data change topic prefix and schema change topic) are then read from the same properties.
 *
 * @param props the connector properties this strategy is configured from
 */
public TableTopicNamingStrategy(Properties props) {
    super(props);
    final Configuration strategyConfig = Configuration.from(props);
    final String dataChangePrefixOverride = strategyConfig.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX);
    final String schemaChangeTopicOverride = strategyConfig.getString(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC);
    this.overrideDataChangeTopicPrefix = dataChangePrefixOverride;
    this.overrideSchemaChangeTopic = schemaChangeTopicOverride;
}

public static TableTopicNamingStrategy create(CommonConnectorConfig config) {
Expand All @@ -44,4 +63,22 @@ public String dataChangeTopic(TableId id) {
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}

/**
 * Resolves the name of the schema change topic.
 * <p>
 * When an override schema change topic has been configured (neither null nor blank), that
 * value is returned as-is. Otherwise the decision is delegated to the parent strategy.
 *
 * @return the schema change topic name
 */
@Override
public String schemaChangeTopic() {
    final boolean overrideMissing = Strings.isNullOrBlank(overrideSchemaChangeTopic);
    return overrideMissing ? super.schemaChangeTopic() : overrideSchemaChangeTopic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,20 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'false' (the default) omits the fields; "
+ "'true' converts the field into an implementation dependent binary representation.");

// Connector-specific definition of `include.schema.changes` whose default is `false`.
// Needed for backward compatibility, otherwise all upgraded connectors will start publishing schema change events
// by default. The description below must state the same default as `withDefault(...)`.
public static final Field INCLUDE_SCHEMA_CHANGES = Field.create("include.schema.changes")
        .withDisplayName("Include database schema changes")
        .withType(Type.BOOLEAN)
        .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 0))
        .withWidth(Width.SHORT)
        .withImportance(ConfigDef.Importance.MEDIUM)
        .withDescription("Whether the connector should publish changes in the database schema to a Kafka topic with "
                + "the same name as the database server ID. Each schema change will be recorded using a key that "
                + "contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s). "
                + "The default is 'false'. This is independent of how the connector internally records database schema history.")
        .withDefault(false);

public static final Field OFFSET_STORAGE_PER_TASK = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.per.task")
.withDisplayName("Store offsets per task")
.withType(Type.BOOLEAN)
Expand All @@ -368,6 +382,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

// Optional explicit name for the schema change topic. The value is checked with the common
// topic-name validation; TableTopicNamingStrategy reads this option to decide which topic
// schema change events are written to.
public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic")
        .withDisplayName("Override schema change topic name")
        .withType(Type.STRING)
        .withWidth(Width.MEDIUM)
        .withImportance(ConfigDef.Importance.LOW)
        .withValidation(CommonConnectorConfig::validateTopicName)
        .withDescription("Overrides the name of the schema change topic (if not set uses topic.prefix).");

public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen")
.withDisplayName("Offset storage task key generation number")
.withType(Type.INT)
Expand Down Expand Up @@ -479,6 +501,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
OVERRIDE_SCHEMA_CHANGE_TOPIC,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
Expand Down Expand Up @@ -553,6 +576,11 @@ public String getConnectorName() {
return Module.name();
}

/**
 * Returns the value of the connector's {@code include.schema.changes} option, read from
 * {@link #INCLUDE_SCHEMA_CHANGES} as declared in this class.
 *
 * @return the configured boolean value of {@code include.schema.changes}
 */
@Override
public boolean isSchemaChangesHistoryEnabled() {
    final Configuration connectorConfiguration = getConfig();
    return connectorConfiguration.getBoolean(INCLUDE_SCHEMA_CHANGES);
}

@Override
public TemporalPrecisionMode getTemporalPrecisionMode() {
return TemporalPrecisionMode.parse(getConfig().getString(TIME_PRECISION_MODE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,26 @@ public void shouldUseTopicPrefixIfOverrideIsBlank() {
assertThat(topicName).isEqualTo("prefix.table");
}

/**
 * When an override schema change topic is configured, the strategy must return it verbatim
 * instead of the name derived from {@code topic.prefix}.
 */
@Test
public void shouldGetOverrideSchemaChangeTopic() {
    final Properties props = new Properties();
    props.put("topic.prefix", "prefix");
    props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "override-prefix");
    // Parameterized type instead of the raw TopicNamingStrategy; no TableId instance is needed
    // because schemaChangeTopic() takes no arguments.
    TopicNamingStrategy<TableId> strategy = new TableTopicNamingStrategy(props);
    String topicName = strategy.schemaChangeTopic();
    assertThat(topicName).isEqualTo("override-prefix");
}

/**
 * A blank override schema change topic must be ignored, so the strategy falls back to the
 * parent behaviour; with {@code topic.prefix=prefix} the expected topic name is "prefix".
 */
@Test
public void shouldUseTopicPrefixIfOverrideSchemaIsBlank() {
    final Properties props = new Properties();
    props.put("topic.prefix", "prefix");
    props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "");
    // Parameterized type instead of the raw TopicNamingStrategy; no TableId instance is needed
    // because schemaChangeTopic() takes no arguments.
    TopicNamingStrategy<TableId> strategy = new TableTopicNamingStrategy(props);
    String topicName = strategy.schemaChangeTopic();
    assertThat(topicName).isEqualTo("prefix");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,43 @@ public void shouldBlankOverrideTopicPrefixFailValidation() {
assertThat(inputs.size()).isEqualTo(1);
}

/**
 * An override schema change topic containing a character rejected by topic-name validation
 * ("@" here) must produce exactly one recorded validation problem.
 */
@Test
public void shouldImproperOverrideSchemaTopicPrefixFailValidation() {
    final Configuration invalidTopicConfig = TestHelper.defaultConfig()
            .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "hello@world")
            .build();
    final VitessConnectorConfig connectorConfig = new VitessConnectorConfig(invalidTopicConfig);

    // Collect every problem message reported during validation.
    final List<String> reportedProblems = new ArrayList<>();
    final Consumer<String> problemRecorder = reportedProblems::add;
    connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), problemRecorder);

    assertThat(reportedProblems).hasSize(1);
}

/**
 * A well-formed override schema change topic must pass validation without recording any problem.
 */
@Test
public void shouldUseSchemaTopicPrefix() {
    final Configuration validTopicConfig = TestHelper.defaultConfig()
            .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC,
                    "__debezium-ddl.dev.msgdata.precomputed_channel_summary_partitioned")
            .build();
    final VitessConnectorConfig connectorConfig = new VitessConnectorConfig(validTopicConfig);

    // Collect every problem message reported during validation.
    final List<String> reportedProblems = new ArrayList<>();
    final Consumer<String> problemRecorder = reportedProblems::add;
    connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), problemRecorder);

    assertThat(reportedProblems).hasSize(0);
}

/**
 * An explicitly configured but empty override schema change topic must produce exactly one
 * recorded validation problem.
 */
@Test
public void shouldBlankOverrideSchemaTopicPrefixFailValidation() {
    final Configuration blankTopicConfig = TestHelper.defaultConfig()
            .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "")
            .build();
    final VitessConnectorConfig connectorConfig = new VitessConnectorConfig(blankTopicConfig);

    // Collect every problem message reported during validation.
    final List<String> reportedProblems = new ArrayList<>();
    final Consumer<String> problemRecorder = reportedProblems::add;
    connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), problemRecorder);

    assertThat(reportedProblems).hasSize(1);
}

@Test
public void shouldExcludeEmptyShards() {
Configuration configuration = TestHelper.defaultConfig().with(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
TestHelper.executeDDL("vitess_create_tables.ddl");
// startConnector();
startConnector(config -> config
.with(CommonConnectorConfig.INCLUDE_SCHEMA_CHANGES, true),
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true),
false);
assertConnectorIsRunning();

Expand Down Expand Up @@ -252,6 +252,50 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio
assertThat(consumer.isEmpty());
}

/**
 * Runs the connector with {@link TableTopicNamingStrategy}, both topic overrides and schema
 * changes enabled, then checks that the data change event lands on
 * {@code <override data prefix>.ddl_table} and the schema change event lands on the override
 * schema change topic.
 */
@Test
@FixFor("DBZ-8325")
public void shouldSupportOverrideDataChangeAndSchemaChangeTopics() throws Exception {
    TestHelper.executeDDL("vitess_create_tables.ddl");
    String overrideDataChangeTopicPrefix = "data.alternate.prefix";
    String overrideSchemaChangeTopic = "schema.alternate.topic";
    startConnector(config -> config
            .with(VitessConnectorConfig.TOPIC_NAMING_STRATEGY, TableTopicNamingStrategy.class)
            .with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, overrideDataChangeTopicPrefix)
            .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, overrideSchemaChangeTopic)
            .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true),
            false);
    assertConnectorIsRunning();

    String dataChangeTopic = String.join(".", overrideDataChangeTopicPrefix, "ddl_table");

    // The insert is issued before the DDL; the loop below relies on the data change event
    // arriving first (index 0) and the schema change event second (index 1).
    String ddl = "ALTER TABLE ddl_table ADD COLUMN new_column_name INT";
    TestHelper.execute("INSERT INTO ddl_table (id, int_unsigned_col, json_col) VALUES (1, 2, '{\"1\":2}');");
    TestHelper.execute(ddl);

    int expectedDataChangeRecords = 1;
    int expectedSchemaChangeRecords = 1;
    int expectedTotalRecords = expectedDataChangeRecords + expectedSchemaChangeRecords;
    consumer = testConsumer(expectedTotalRecords);
    consumer.expects(expectedTotalRecords);
    consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
    for (int i = 0; i < expectedTotalRecords; i++) {
        SourceRecord record = consumer.remove();
        Struct value = (Struct) record.value();
        Struct source = (Struct) value.get("source");
        assertThat(source.getString("table")).isEqualTo("ddl_table");
        assertThat(source.getString("shard")).isEqualTo(TEST_SHARD);
        if (i == 1) {
            assertThat(record.topic()).isEqualTo(overrideSchemaChangeTopic);
            assertThat(value.getString("ddl")).isEqualToIgnoringCase(ddl);
        }
        else {
            assertThat(record.topic()).isEqualTo(dataChangeTopic);
        }
    }
    // assertThat(...) on its own verifies nothing; the assertion call is what performs the check.
    assertThat(consumer.isEmpty()).isTrue();
}

@Test
@FixFor("DBZ-8325")
public void shouldReceiveSchemaEventsShardedBeforeAnyDataEvents() throws Exception {
Expand Down

0 comments on commit ad8dccb

Please sign in to comment.