DBZ-6721 & DBZ-6722 Add transforms: filter transaction topic, remove field, add itests
twthorn authored and jpechane committed Jun 4, 2024
1 parent fddc721 commit f7eb7f8
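
The two transforms are intended to be chained, with FilterTransactionTopicRecords running before RemoveField. Below is a minimal sketch of the wiring; the property keys come straight from this commit's integration tests, while the Configuration builder entry point is an assumption based on Debezium's io.debezium.config.Configuration API:

// Sketch only: chain both transforms so transaction-topic records are dropped
// first and the transaction.id field is stripped from the remaining records.
Configuration config = Configuration.create()
        .with("transforms", "filterTransactionTopicRecords,removeField")
        .with("transforms.filterTransactionTopicRecords.type",
                "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords")
        .with("transforms.removeField.type",
                "io.debezium.connector.vitess.transforms.RemoveField")
        .with("transforms.removeField.field_names", "transaction.id")
        .build();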
Showing 6 changed files with 634 additions and 0 deletions.
64 changes: 64 additions & 0 deletions src/main/java/io/debezium/connector/vitess/transforms/FilterTransactionTopicRecords.java
@@ -0,0 +1,64 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.transforms;

import static io.debezium.config.CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import io.debezium.Module;
import io.debezium.config.Configuration;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;

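/**
 * An SMT that drops records destined for the transaction metadata topic, identified by comparing
 * the record's key and value schemas against Debezium's transaction key/value schemas.
 */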
public class FilterTransactionTopicRecords<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {

    private SchemaNameAdjuster schemaNameAdjuster;

    @Override
    public String version() {
        return Module.version();
    }

    @Override
    public R apply(R record) {
        // Returning null drops the record from the transform chain entirely
        if (isTransactionTopicMessage(record)) {
            return null;
        }
        return record;
    }

    private boolean isTransactionTopicMessage(R record) {
        // Invoke equals on the expected transaction schemas so the check stays
        // null-safe for records that carry no key or value schema
        return SchemaFactory.get().transactionKeySchema(schemaNameAdjuster).equals(record.keySchema())
                && SchemaFactory.get().transactionValueSchema(schemaNameAdjuster).equals(record.valueSchema());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> props) {
        final Configuration config = Configuration.from(props);
        schemaNameAdjuster = VitessConnectorConfig.SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE))
                .createAdjuster();
    }
}
154 changes: 154 additions & 0 deletions src/main/java/io/debezium/connector/vitess/transforms/RemoveField.java
@@ -0,0 +1,154 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.transforms;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import io.debezium.Module;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.transforms.SmtManager;

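/**
 * An SMT that removes one or more fields, addressed by their full dotted path (e.g.
 * {@code transaction.id} or {@code source.database}), from a record's value and rebuilds
 * the value schema without them.
 */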
public class RemoveField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
    private static final String FIELD_DELIMITER = ",";

    public static final String FIELD_NAMES_CONF = "field_names";

    public static final Field FIELD_NAMES_FIELD = Field.create(FIELD_NAMES_CONF)
            .withDisplayName("List of field names to remove, using the full path, e.g., source.database or transaction.id")
            .withType(ConfigDef.Type.LIST)
            .withImportance(ConfigDef.Importance.LOW)
            .withValidation(RemoveField::validateRemoveFieldNames)
            .withDescription(
                    "The comma-separated list of fields to remove, e.g., 'source.id', 'transaction.data_collection_order'");

    protected List<String> fieldNames;

    private static int validateRemoveFieldNames(Configuration configuration, Field field, Field.ValidationOutput problems) {
        // Ensure the value is non-empty and that no field name starts or ends with a period
        String fieldNames = configuration.getString(field);
        if (fieldNames == null || fieldNames.isEmpty()) {
            problems.accept(field, fieldNames, "Field names cannot be empty or null, must specify field names to drop");
            return 1;
        }
        for (String fieldName : fieldNames.split(FIELD_DELIMITER)) {
            if (fieldName.startsWith(".") || fieldName.endsWith(".")) {
                problems.accept(field, fieldNames, "Field names cannot start or end with '.', must specify correct field name");
                return 1;
            }
        }
        return 0;
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            // Pass tombstone records (null value) through unchanged
            return record;
        }
        Struct value = (Struct) record.value();
        Schema schema = record.valueSchema();
        Schema updatedSchema = updateSchema("", schema);
        Struct newValue = updateStruct("", updatedSchema, value);
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                updatedSchema, newValue, record.timestamp());
    }

    private Schema updateSchema(String fullName, Schema schema) {
        // Create a schema builder that mirrors the original schema's name, version, and optionality
        SchemaBuilder schemaBuilder = SchemaBuilder.struct().version(schema.version()).name(schema.name());
        if (schema.isOptional()) {
            schemaBuilder = schemaBuilder.optional();
        }

        // Iterate over fields in the original schema and add each retained field to the builder
        for (org.apache.kafka.connect.data.Field field : schema.fields()) {
            String currentFullName = !fullName.isEmpty() ? fullName + "." + field.name() : field.name();

            if (field.schema().type() == Schema.Type.STRUCT) {
                // If the field is a nested struct, recursively rebuild its schema
                Schema updatedNestedSchema = updateSchema(currentFullName, field.schema());
                schemaBuilder.field(field.name(), updatedNestedSchema);
            }
            else {
                if (!shouldExcludeField(currentFullName)) {
                    schemaBuilder.field(field.name(), field.schema());
                }
            }
        }
        return schemaBuilder.build();
    }

    private boolean shouldExcludeField(String fullFieldName) {
        return fieldNames.contains(fullFieldName);
    }

    private Struct updateStruct(String fullName, Schema updatedSchema, Struct struct) {
        // Create an updated struct containing only the retained fields
        Struct updatedStruct = new Struct(updatedSchema);
        for (org.apache.kafka.connect.data.Field field : updatedSchema.fields()) {
            // Use isEmpty() rather than reference comparison (!=) to detect the root level
            String currentFullName = !fullName.isEmpty() ? fullName + "." + field.name() : field.name();
            Object fieldValue = struct.get(field.name());
            if (fieldValue instanceof Struct) {
                // If a field is a struct, recursively rebuild its nested structs
                Struct nestedStruct = (Struct) fieldValue;
                Struct updatedNestedStruct = updateStruct(currentFullName, field.schema(), nestedStruct);
                updatedStruct.put(field, updatedNestedStruct);
            }
            else {
                if (!shouldExcludeField(currentFullName)) {
                    updatedStruct.put(field, fieldValue);
                }
            }
        }
        return updatedStruct;
    }

    @Override
    public String version() {
        return Module.version();
    }

    @Override
    public ConfigDef config() {
        final ConfigDef config = new ConfigDef();
        Field.group(config, null, FIELD_NAMES_FIELD);
        return config;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> props) {
        final Configuration config = Configuration.from(props);
        SmtManager<R> smtManager = new SmtManager<>(config);
        smtManager.validate(config, Field.setOf(FIELD_NAMES_FIELD));
        fieldNames = determineRemoveFields(config);
    }

    private static List<String> determineRemoveFields(Configuration config) {
        String fieldNamesString = config.getString(FIELD_NAMES_FIELD);
        List<String> fieldNames = new ArrayList<>();
        for (String fieldName : fieldNamesString.split(FIELD_DELIMITER)) {
            fieldNames.add(fieldName);
        }
        return fieldNames;
    }
}
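
For illustration, a minimal standalone sketch of the transform in action; the topic, schema, and class names here are hypothetical and not part of this commit:

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.connector.vitess.transforms.RemoveField;

public class RemoveFieldSketch {
    public static void main(String[] args) {
        // Configure the transform to drop the nested transaction.id field
        RemoveField<SourceRecord> transform = new RemoveField<>();
        transform.configure(Map.of(RemoveField.FIELD_NAMES_CONF, "transaction.id"));

        // Hypothetical schemas standing in for a Debezium envelope
        Schema txnSchema = SchemaBuilder.struct().name("txn")
                .field("id", Schema.STRING_SCHEMA)
                .field("transaction_epoch", Schema.INT64_SCHEMA)
                .build();
        Schema valueSchema = SchemaBuilder.struct().name("envelope")
                .field("transaction", txnSchema)
                .build();
        Struct value = new Struct(valueSchema)
                .put("transaction", new Struct(txnSchema)
                        .put("id", "gtid-1")
                        .put("transaction_epoch", 0L));

        SourceRecord record = new SourceRecord(null, null, "topic", 0, valueSchema, value);
        SourceRecord transformed = transform.apply(record);

        // The rebuilt schema keeps transaction.transaction_epoch but no longer has transaction.id
        assert transformed.valueSchema().field("transaction").schema().field("id") == null;
    }
}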
133 changes: 133 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
@@ -52,6 +52,7 @@
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessRankProvider;
import io.debezium.connector.vitess.transforms.RemoveField;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
@@ -644,6 +645,138 @@ public void shouldUseLocalVgtid() throws Exception {
        assertRecordEnd(expectedTxId1, expectedRecordsCount);
    }

    @Test
    public void shouldProvideTransactionMetadataWithoutIdOrTransactionTopic() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(config -> config
                .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .with("transforms", "filterTransactionTopicRecords,removeField")
                .with("transforms.filterTransactionTopicRecords.type",
                        "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords")
                .with("transforms.removeField.type", "io.debezium.connector.vitess.transforms.RemoveField")
                .with("transforms.removeField." + RemoveField.FIELD_NAMES_CONF, "transaction.id")
                .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class),
                true,
                "-80,80-");
        assertConnectorIsRunning();

        Vgtid baseVgtid = TestHelper.getCurrentVgtid();
        int expectedRecordsCount = 1;
        consumer = testConsumer(expectedRecordsCount);

        String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";
        String insertQuery = "INSERT INTO numeric_table ("
                + "tinyint_col,"
                + "tinyint_unsigned_col,"
                + "smallint_col,"
                + "smallint_unsigned_col,"
                + "mediumint_col,"
                + "mediumint_unsigned_col,"
                + "int_col,"
                + "int_unsigned_col,"
                + "bigint_col,"
                + "bigint_unsigned_col,"
                + "bigint_unsigned_overflow_col,"
                + "float_col,"
                + "double_col,"
                + "decimal_col,"
                + "boolean_col)"
                + " VALUES " + rowValue;
        StringBuilder insertRows = new StringBuilder().append(insertQuery);
        for (int i = 1; i < expectedRecordsCount; i++) {
            insertRows.append(", ").append(rowValue);
        }

        String insertRowsStatement = insertRows.toString();

        // exercise SUT
        executeAndWait(insertRowsStatement, TEST_SHARDED_KEYSPACE);
        // First transaction.
        Long expectedEpoch = 0L;
        for (int i = 1; i <= expectedRecordsCount; i++) {
            SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD);
            Struct source = (Struct) ((Struct) record.value()).get("source");
            String shard = source.getString("shard");
            String vgtid = source.getString("vgtid");
            Vgtid actualVgtid = Vgtid.of(vgtid);
            final Struct txn = ((Struct) record.value()).getStruct("transaction");
            // The id field was removed by the RemoveField transform
            assertThat(txn.schema().field("id")).isNull();
            assertThat(txn.get("transaction_epoch")).isEqualTo(expectedEpoch);
            BigDecimal expectedRank = VitessRankProvider.getRank(actualVgtid.getShardGtid(shard).getGtid());
            assertThat(txn.get("transaction_rank")).isEqualTo(expectedRank);
        }
    }

    @Test
    public void shouldProvideTransactionMetadataWithoutIdOrTransactionTopicAndUseLocalVgtid() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
        TestHelper.applyVSchema("vitess_vschema.json");
        startConnector(config -> config
                .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .with("transforms", "filterTransactionTopicRecords,removeField,useLocalVgtid")
                .with("transforms.filterTransactionTopicRecords.type",
                        "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords")
                .with("transforms.removeField.type", "io.debezium.connector.vitess.transforms.RemoveField")
                .with("transforms.removeField." + RemoveField.FIELD_NAMES_CONF, "transaction.id")
                .with("transforms.useLocalVgtid.type", "io.debezium.connector.vitess.transforms.UseLocalVgtid")
                .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class),
                true,
                "-80,80-");
        assertConnectorIsRunning();

        Vgtid baseVgtid = TestHelper.getCurrentVgtid();
        int expectedRecordsCount = 1;
        consumer = testConsumer(expectedRecordsCount);

        String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)";
        String insertQuery = "INSERT INTO numeric_table ("
                + "tinyint_col,"
                + "tinyint_unsigned_col,"
                + "smallint_col,"
                + "smallint_unsigned_col,"
                + "mediumint_col,"
                + "mediumint_unsigned_col,"
                + "int_col,"
                + "int_unsigned_col,"
                + "bigint_col,"
                + "bigint_unsigned_col,"
                + "bigint_unsigned_overflow_col,"
                + "float_col,"
                + "double_col,"
                + "decimal_col,"
                + "boolean_col)"
                + " VALUES " + rowValue;
        StringBuilder insertRows = new StringBuilder().append(insertQuery);
        for (int i = 1; i < expectedRecordsCount; i++) {
            insertRows.append(", ").append(rowValue);
        }

        String insertRowsStatement = insertRows.toString();

        // exercise SUT
        executeAndWait(insertRowsStatement, TEST_SHARDED_KEYSPACE);
        // First transaction.
        Long expectedEpoch = 0L;
        for (int i = 1; i <= expectedRecordsCount; i++) {
            SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD);
            Struct source = (Struct) ((Struct) record.value()).get("source");
            String shard = source.getString("shard");
            Vgtid sourceVgtid = Vgtid.of(source.getString("vgtid"));
            final Struct txn = ((Struct) record.value()).getStruct("transaction");
            assertThat(txn.schema().field("id")).isNull();
            assertThat(txn.get("transaction_epoch")).isEqualTo(expectedEpoch);
            BigDecimal expectedRank = VitessRankProvider.getRank(sourceVgtid.getShardGtid(shard).getGtid());
            assertThat(txn.get("transaction_rank")).isEqualTo(expectedRank);
            assertThat(txn.get("total_order")).isEqualTo(1L);
            // We have two shards for the multi-shard keyspace; a local vgtid should contain only one shard
            assertThat(sourceVgtid.getShardGtids().size()).isEqualTo(1);
            assertThat(sourceVgtid.getShardGtids().get(0).getShard()).isEqualTo(source.getString("shard"));
        }
    }

    @Test
    public void shouldIncrementEpochWhenFastForwardVgtidWithOrderedTransactionMetadata() throws Exception {
        TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);