Skip to content

Commit

Permalink
DBZ-8325 Support store only capture tables setting
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 22, 2024
1 parent af9dbc5 commit e7ca2f5
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public List<SchemaChangeEvent> parseDdl(VitessPartition partition, VitessOffsetC
Instant timestsamp = ddlMessage.getCommitTime();
DdlChanges ddlChanges = ddlParser.getDdlChanges();
ddlChanges.reset();
// Include the shard in the database to ensure it is parsed separately for each shard
ddlParser.setCurrentDatabase(getDatabaseWithShard(shard, databaseName));
ddlParser.parse(ddlStatement, tables());
if (!ddlChanges.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class VitessSnapshotChangeEventSource extends RelationalSnapshotChangeEve
private final VitessConnectorConfig connectorConfig;
private final VitessDatabaseSchema schema;
private final VitessConnection connection;
private List<String> shards;

public VitessSnapshotChangeEventSource(
VitessConnectorConfig connectorConfig,
Expand All @@ -58,11 +59,11 @@ public VitessSnapshotChangeEventSource(
this.connectorConfig = connectorConfig;
this.connection = connectionFactory.mainConnection();
this.schema = schema;
this.shards = new VitessMetadata(connectorConfig).getShards();
}

@Override
protected Set<TableId> getAllTableIds(RelationalSnapshotContext<VitessPartition, VitessOffsetContext> snapshotContext) {
List<String> shards = new VitessMetadata(connectorConfig).getShards();
Set<TableId> tableIds = new HashSet<>();
try {
connection.query("SHOW TABLES", rs -> {
Expand Down Expand Up @@ -97,7 +98,6 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
VitessOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws Exception {
Set<TableId> capturedSchemaTables = snapshotContext.capturedSchemaTables;
List<String> shards = new VitessMetadata(connectorConfig).getShards();

for (TableId tableId : capturedSchemaTables) {
String sql = "SHOW CREATE TABLE " + quote(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
connectorConfig.getKeyspace());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id();
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, (receiver) -> {
TableId tableIdWithShard = VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), connectorConfig.getKeyspace(), tableId.table());
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableIdWithShard, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ public void shouldImproperOverrideSchemaTopicPrefixFailValidation() {
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldUseSchemaTopicPrefix() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC,
"__debezium-ddl.dev.msgdata.precomputed_channel_summary_partitioned").build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer);
assertThat(inputs.size()).isEqualTo(0);
}

@Test
public void shouldBlankOverrideSchemaTopicPrefixFailValidation() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "").build();
Expand Down
35 changes: 35 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,41 @@ public void shouldSnapshotSchemaAndReceiveSchemaEventsSharded() throws Exception
assertThat(consumer.isEmpty());
}

@Test
@FixFor("DBZ-8325")
public void shouldSnapshotSchemaAndIgnoreOtherTables() throws Exception {
String keyspace = TEST_SHARDED_KEYSPACE;
String table = keyspace + ".ddl_table";
TestHelper.executeDDL("vitess_create_tables.ddl", keyspace);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(config -> config
.with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(VitessConnectorConfig.TABLE_INCLUDE_LIST, table)
.with(VitessConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with(VitessConnectorConfig.SNAPSHOT_MODE, VitessConnectorConfig.SnapshotMode.NO_DATA)
.with(VitessConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class),
true);
assertConnectorIsRunning();

String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX);

TestHelper.execute("ALTER TABLE ddl_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);
TestHelper.execute("ALTER TABLE numeric_table ADD COLUMN new_column_name INT;", TEST_SHARDED_KEYSPACE);

// 1 for the snapshot (create table ddls)
// 1 for the change above (should ignore ddl of other table)
// 2 shards, so (1 + 1) * 2 = 4
int expectedSchemaChangeRecords = 4;
consumer = testConsumer(expectedSchemaChangeRecords);
consumer.expects(expectedSchemaChangeRecords);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
for (int i = 0; i < expectedSchemaChangeRecords; i++) {
SourceRecord record = consumer.remove();
assertThat(record.topic()).isEqualTo(schemaChangeTopic);
}
assertThat(consumer.isEmpty());
}

@Test
@FixFor("DBZ-8325")
public void shouldReceiveSnapshotAndSchemaChangeEvents() throws Exception {
Expand Down

0 comments on commit e7ca2f5

Please sign in to comment.