Skip to content

Commit

Permalink
DBZ-8154 Create separate OffsetValueType class to consolidate logic f…
Browse files Browse the repository at this point in the history
…or GTID & epoch
  • Loading branch information
twthorn committed Aug 16, 2024
1 parent 93d3e8c commit 8b6b95a
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 120 deletions.
116 changes: 116 additions & 0 deletions src/main/java/io/debezium/connector/vitess/OffsetValueType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;

public enum OffsetValueType {

GTID(SourceInfo.VGTID_KEY, OffsetValueType::parseGtid,
OffsetValueType::getVgtid, OffsetValueType::getConfigGtidsPerShard),
EPOCH(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, OffsetValueType::parseEpoch,
OffsetValueType::getShardEpochMap, OffsetValueType::getConfigShardEpochMapPerShard);

public final String name;
public final Function<String, Map<String, Object>> parserFunction;
public final BiFunction<Map<String, Object>, String, Object> conversionFunction;
public final BiFunction<VitessConnectorConfig, List<String>, Map<String, Object>> configValuesFunction;

OffsetValueType(String typeName, Function<String, Map<String, Object>> parserFunction,
BiFunction<Map<String, Object>, String, Object> conversionFunction,
BiFunction<VitessConnectorConfig, List<String>, Map<String, Object>> configValuesFunction) {
this.name = typeName;
this.parserFunction = parserFunction;
this.conversionFunction = conversionFunction;
this.configValuesFunction = configValuesFunction;
}

private static Map<String, Object> parseGtid(String vgtidStr) {
Map<String, Object> shardToGtid = new HashMap<>();
List<Vgtid.ShardGtid> shardGtids = Vgtid.of(vgtidStr).getShardGtids();
for (Vgtid.ShardGtid shardGtid : shardGtids) {
shardToGtid.put(shardGtid.getShard(), shardGtid.getGtid());
}
return shardToGtid;
}

private static Map<String, Object> parseEpoch(String epochString) {
ShardEpochMap shardToEpoch = ShardEpochMap.of(epochString);
return (Map) shardToEpoch.getMap();
}

/**
* Get the {@link ShardEpochMap} from this map of shards to epochs.
*
* @param epochMap Map of shards to epoch values
* @param keyspace Needed to match the function signature of getVgtid, ignored
* @return The {@link ShardEpochMap}
*/
static ShardEpochMap getShardEpochMap(Map<String, ?> epochMap, String keyspace) {
return new ShardEpochMap((Map<String, Long>) epochMap);
}

static Vgtid getVgtid(Map<String, ?> gtidsPerShard, String keyspace) {
List<Vgtid.ShardGtid> shardGtids = new ArrayList();
for (Map.Entry<String, ?> entry : gtidsPerShard.entrySet()) {
shardGtids.add(new Vgtid.ShardGtid(keyspace, entry.getKey(), (String) entry.getValue()));
}
return Vgtid.of(shardGtids);
}

static Map<String, Object> getConfigShardEpochMapPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
String shardEpochMapString = connectorConfig.getShardEpochMap();
Function<Integer, Long> initEpoch = x -> 0L;
Map<String, Long> shardEpochMap;
if (shardEpochMapString.isEmpty()) {
shardEpochMap = buildMap(shards, initEpoch);
}
else {
shardEpochMap = ShardEpochMap.of(shardEpochMapString).getMap();
}
return (Map) shardEpochMap;
}

static Map<String, Object> getConfigGtidsPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
String gtids = connectorConfig.getVgtid();
Map<String, String> configGtidsPerShard = null;
if (shards != null && gtids.equals(Vgtid.EMPTY_GTID)) {
Function<Integer, String> emptyGtid = x -> Vgtid.EMPTY_GTID;
configGtidsPerShard = buildMap(shards, emptyGtid);
}
else if (shards != null && gtids.equals(Vgtid.CURRENT_GTID)) {
Function<Integer, String> currentGtid = x -> Vgtid.CURRENT_GTID;
configGtidsPerShard = buildMap(shards, currentGtid);
}
else if (shards != null) {
List<Vgtid.ShardGtid> shardGtids = Vgtid.of(gtids).getShardGtids();
Map<String, String> shardsToGtid = new HashMap<>();
for (Vgtid.ShardGtid shardGtid : shardGtids) {
shardsToGtid.put(shardGtid.getShard(), shardGtid.getGtid());
}
Function<Integer, String> shardGtid = (i -> shardsToGtid.get(shards.get(i)));
configGtidsPerShard = buildMap(shards, shardGtid);
}
return (Map) configGtidsPerShard;
}

private static <T> Map<String, T> buildMap(List<String> keys, Function<Integer, T> function) {
return IntStream.range(0, keys.size())
.boxed()
.collect(Collectors.toMap(keys::get, function));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ public List<Map<String, String>> taskConfigs(int maxTasks, List<String> currentS
connectorConfig, tasks, gen, false, context().offsetStorageReader());

Map<String, String> gtidsPerShard = currentGen.getGtidPerShard();
validateCurrentGen(currentGen, gtidsPerShard, currentShards, VitessOffsetRetriever.ValueType.GTID);
validateCurrentGen(currentGen, gtidsPerShard, currentShards, OffsetValueType.GTID);
List<String> shards = determineShards(prevGtidsPerShard, gtidsPerShard, currentShards);

if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) {
Map<String, Long> prevEpochsPerShard = previousGen.getEpochPerShard();
validateNoLostShardData(prevEpochsPerShard, currentShards, "epochs");
Map<String, Long> epochsPerShard = currentGen.getEpochPerShard();
validateCurrentGen(currentGen, epochsPerShard, currentShards, VitessOffsetRetriever.ValueType.EPOCH);
validateCurrentGen(currentGen, epochsPerShard, currentShards, OffsetValueType.EPOCH);
List<String> shardsFromEpoch = determineShards(prevEpochsPerShard, epochsPerShard, currentShards);
if (!shardsFromEpoch.equals(shards)) {
throw new IllegalArgumentException(String.format(
Expand Down Expand Up @@ -180,7 +180,7 @@ private static void validateGeneration(Map<String, String> prevGtidsPerShard, in
}

private Map<String, ?> validateCurrentGen(VitessOffsetRetriever retriever, Map<String, ?> valuePerShard, List<String> currentShards,
VitessOffsetRetriever.ValueType valueType) {
OffsetValueType valueType) {
if (valuePerShard != null && !hasSameShards(valuePerShard.keySet(), currentShards)) {
LOGGER.warn("Some shards {} for the current generation {} are not persisted. Expected shards: {}",
valueType.name(), valuePerShard.keySet(), currentShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,15 @@
*/
package io.debezium.connector.vitess;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
Expand All @@ -29,7 +24,6 @@
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -154,17 +148,17 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
public Configuration getConfigWithOffsets(Configuration config) {
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
if (connectorConfig.offsetStoragePerTask()) {
Object vgtid = getVitessTaskValuePerShard(connectorConfig, VitessOffsetRetriever.ValueType.GTID);
Object vgtid = getVitessTaskValuePerShard(connectorConfig, OffsetValueType.GTID);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, vgtid).build();
if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) {
Object shardEpochMap = getVitessTaskValuePerShard(connectorConfig, VitessOffsetRetriever.ValueType.EPOCH);
Object shardEpochMap = getVitessTaskValuePerShard(connectorConfig, OffsetValueType.EPOCH);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_SHARD_EPOCH_MAP_CONFIG, shardEpochMap).build();
}
}
return config;
}

private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig, VitessOffsetRetriever.ValueType valueType) {
private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig, OffsetValueType valueType) {
int gen = connectorConfig.getOffsetStorageTaskKeyGen();
int prevGen = gen - 1;
VitessOffsetRetriever prevGenRetriever = new VitessOffsetRetriever(
Expand All @@ -184,15 +178,7 @@ private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig,
Map<String, ?> curGenValuesPerShard = retriever.getValuePerShardFromStorage(valueType);
LOGGER.info("{} per shard {}", valueType.name(), curGenValuesPerShard);
List<String> shards = connectorConfig.getVitessTaskKeyShards();
Map<String, ?> configValuesPerShard = null;
switch (valueType) {
case GTID:
configValuesPerShard = getConfigGtidsPerShard(connectorConfig, shards);
break;
case EPOCH:
configValuesPerShard = getConfigShardEpochMapPerShard(connectorConfig, shards);
break;
}
Map<String, Object> configValuesPerShard = valueType.configValuesFunction.apply(connectorConfig, shards);
LOGGER.info("config {} per shard {}", valueType.name(), configValuesPerShard);
final String keyspace = connectorConfig.getKeyspace();

Expand All @@ -213,71 +199,7 @@ else if (prevGenValuesPerShard != null && prevGenValuesPerShard.containsKey(shar
}
valuesPerShard.put(shard, value);
}
switch (valueType) {
case GTID:
Map<String, String> gtidsPerShard = (Map) valuesPerShard;
return getVgtid(gtidsPerShard, keyspace);
case EPOCH:
Map<String, Long> epochsPerShard = (Map) valuesPerShard;
return getShardEpochMap(epochsPerShard);
default:
throw new DebeziumException(String.format("Unknown value type %s", valueType.name()));
}
}

private ShardEpochMap getShardEpochMap(Map<String, Long> epochMap) {
return new ShardEpochMap(epochMap);
}

private Vgtid getVgtid(Map<String, String> gtidsPerShard, String keyspace) {
List<Vgtid.ShardGtid> shardGtids = new ArrayList();
for (Map.Entry<String, String> entry : gtidsPerShard.entrySet()) {
shardGtids.add(new Vgtid.ShardGtid(keyspace, entry.getKey(), entry.getValue()));
}
return Vgtid.of(shardGtids);
}

private static Map<String, Long> getConfigShardEpochMapPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
String shardEpochMapString = connectorConfig.getShardEpochMap();
Function<Integer, Long> initEpoch = x -> 0L;
Map<String, Long> shardEpochMap;
if (shardEpochMapString.isEmpty()) {
shardEpochMap = buildMap(shards, initEpoch);
}
else {
shardEpochMap = ShardEpochMap.of(shardEpochMapString).getMap();
}
return shardEpochMap;
}

private static Map<String, String> getConfigGtidsPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
String gtids = connectorConfig.getVgtid();
Map<String, String> configGtidsPerShard = null;
if (shards != null && gtids.equals(Vgtid.EMPTY_GTID)) {
Function<Integer, String> emptyGtid = x -> Vgtid.EMPTY_GTID;
configGtidsPerShard = buildMap(shards, emptyGtid);
}
else if (shards != null && gtids.equals(Vgtid.CURRENT_GTID)) {
Function<Integer, String> currentGtid = x -> Vgtid.CURRENT_GTID;
configGtidsPerShard = buildMap(shards, currentGtid);
}
else if (shards != null) {
List<Vgtid.ShardGtid> shardGtids = Vgtid.of(gtids).getShardGtids();
Map<String, String> shardsToGtid = new HashMap<>();
for (Vgtid.ShardGtid shardGtid : shardGtids) {
shardsToGtid.put(shardGtid.getShard(), shardGtid.getGtid());
}
Function<Integer, String> shardGtid = (i -> shardsToGtid.get(shards.get(i)));
configGtidsPerShard = buildMap(shards, shardGtid);
}
LOGGER.info("Found GTIDs per shard in config {}", configGtidsPerShard);
return configGtidsPerShard;
}

private static <T> Map<String, T> buildMap(List<String> keys, Function<Integer, T> function) {
return IntStream.range(0, keys.size())
.boxed()
.collect(Collectors.toMap(keys::get, function));
return valueType.conversionFunction.apply(valuesPerShard, keyspace);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package io.debezium.connector.vitess;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
Expand All @@ -16,8 +15,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;

/**
Expand Down Expand Up @@ -50,43 +47,16 @@ public void setExpectsOffset(boolean expectsOffset) {
this.expectsOffset = expectsOffset;
}

public enum ValueType {
GTID(SourceInfo.VGTID_KEY, ValueType::parseGtid),
EPOCH(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, ValueType::parseEpoch);

private final String typeName;
private final Function<String, Map<String, Object>> parserFunction;

ValueType(String typeName, Function<String, Map<String, Object>> parserFunction) {
this.typeName = typeName;
this.parserFunction = parserFunction;
}

private static Map<String, Object> parseGtid(String vgtidStr) {
Map<String, Object> shardToGtid = new HashMap<>();
List<Vgtid.ShardGtid> shardGtids = Vgtid.of(vgtidStr).getShardGtids();
for (Vgtid.ShardGtid shardGtid : shardGtids) {
shardToGtid.put(shardGtid.getShard(), shardGtid.getGtid());
}
return shardToGtid;
}

private static Map<String, Object> parseEpoch(String epochString) {
ShardEpochMap shardToEpoch = ShardEpochMap.of(epochString);
return (Map) shardToEpoch.getMap();
}
}

public Map<String, String> getGtidPerShard() {
return (Map) getValuePerShardFromStorage(ValueType.GTID);
return (Map) getValuePerShardFromStorage(OffsetValueType.GTID);
}

public Map<String, Long> getEpochPerShard() {
return (Map) getValuePerShardFromStorage(ValueType.EPOCH);
return (Map) getValuePerShardFromStorage(OffsetValueType.EPOCH);
}

public Map<String, ?> getValuePerShardFromStorage(ValueType valueType) {
String key = valueType.typeName;
public Map<String, ?> getValuePerShardFromStorage(OffsetValueType valueType) {
String key = valueType.name;
Function<String, Map<String, Object>> valueReader = valueType.parserFunction;
return getValuePerShardFromStorage(
key,
Expand Down

0 comments on commit 8b6b95a

Please sign in to comment.