Skip to content

Commit

Permalink
DBZ-7698 Refactor to single factory interface/class/config
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn authored and jpechane committed Apr 19, 2024
1 parent be3e0cf commit 31fbfef
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public VitessDatabaseSchema(
schemaNameAdjuster,
config.customConverterRegistry(),
config.getSourceInfoStructMaker().schema(),
config.getTransactionStructMaker().getTransactionBlockSchema(),
config.getTransactionMetadataFactory().getTransactionStructMaker().getTransactionBlockSchema(),
config.getFieldNamer(),
false),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static VitessOffsetContext initialContext(
LOGGER.info("No previous offset exists. Use default VGTID.");
final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig);
// use the other transaction context
TransactionContext transactionContext = connectorConfig.getTransactionContext();
TransactionContext transactionContext = connectorConfig.getTransactionMetadataFactory().getTransactionContext();
VitessOffsetContext context = new VitessOffsetContext(
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext);
return context;
Expand Down Expand Up @@ -148,7 +148,8 @@ public Loader(VitessConnectorConfig connectorConfig) {
public VitessOffsetContext load(Map<String, ?> offset) {
LOGGER.info("Previous offset exists, load from {}", offset);
final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY);
TransactionContext transactionContext = connectorConfig.getTransactionContext().newTransactionContextFromOffsets(offset);
TransactionContext transactionContext = connectorConfig.getTransactionMetadataFactory()
.getTransactionContext().newTransactionContextFromOffsets(offset);
return new VitessOffsetContext(
connectorConfig,
Vgtid.of(vgtid),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.pipeline.txmetadata;

import io.debezium.config.Configuration;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory;

public class VitessOrderedTransactionMetadataFactory implements TransactionMetadataFactory {

private final Configuration configuraiton;

public VitessOrderedTransactionMetadataFactory(Configuration configuration) {
this.configuraiton = configuration;
}

@Override
public TransactionContext getTransactionContext() {
return new VitessOrderedTransactionContext();
}

@Override
public TransactionStructMaker getTransactionStructMaker() {
return new VitessOrderedTransactionStructMaker(configuraiton);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import io.debezium.config.Field;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessRankProvider;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
Expand Down Expand Up @@ -481,8 +481,7 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(config -> config
.with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class)
.with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class)
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true),
true,
"-80,80-");
Expand Down Expand Up @@ -564,10 +563,9 @@ public void shouldIncrementEpochWhenFastForwardVgtidWithOrderedTransactionMetada
SourceInfo.VGTID_KEY, currentVgtid);
Map<Map<String, ?>, Map<String, ?>> offsets = Map.of(srcPartition, offsetId);
Configuration config = TestHelper.defaultConfig()
.with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class)
.with(CommonConnectorConfig.TOPIC_PREFIX, TEST_SERVER)
.with(VitessConnectorConfig.KEYSPACE, TEST_SHARDED_KEYSPACE)
.with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class)
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(VitessConnectorConfig.SHARD, "-80,80-")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void shouldResetToNewVGgtid() {
public void shouldLoadVitessOrderedTransactionContext() throws JsonProcessingException {
VitessConnectorConfig config = new VitessConnectorConfig(
TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.with(VitessConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class)
.build());
VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config);
ObjectMapper objectMapper = new ObjectMapper();
Expand Down Expand Up @@ -136,7 +137,7 @@ public void shouldLoadVitessOrderedTransactionContext() throws JsonProcessingExc
public void shouldGetInitialVitessOrderedTransactionContext() {
VitessConnectorConfig config = new VitessConnectorConfig(
TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.with(VitessConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class)
.build());
VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system());
TransactionContext transactionContext = context.getTransactionContext();
Expand Down

0 comments on commit 31fbfef

Please sign in to comment.