Skip to content

Commit

Permalink
DBZ-8154 Make order metadata epoch handle changes to task parallelism…
Browse files Browse the repository at this point in the history
… & shard set
  • Loading branch information
twthorn committed Aug 15, 2024
1 parent 3e629b8 commit 3b4ae8f
Show file tree
Hide file tree
Showing 23 changed files with 1,940 additions and 363 deletions.
5 changes: 0 additions & 5 deletions src/main/java/io/debezium/connector/vitess/Vgtid.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.DebeziumException;

import binlogdata.Binlogdata;

/** Vitess source position coordinates. */
Expand Down Expand Up @@ -112,9 +110,6 @@ public Vgtid getLocalVgtid(String shard) {

public ShardGtid getShardGtid(String shard) {
ShardGtid shardGtid = shardNameToShardGtid.get(shard);
if (shardGtid == null) {
throw new DebeziumException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString());
}
return shardGtid;
}

Expand Down
222 changes: 108 additions & 114 deletions src/main/java/io/debezium/connector/vitess/VitessConnector.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
Expand Down Expand Up @@ -248,6 +249,17 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ " If not configured, the connector streams changes from the latest position for the given shard(s)."
+ " If snapshot.mode is INITIAL (default), the connector starts copying the tables for the given shard(s) first regardless of gtid value.");

public static final Field SHARD_EPOCH_MAP = Field.create(VITESS_CONFIG_GROUP_PREFIX + "shard.epoch.map")
.withDisplayName("shard.epoch.map")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withDefault("")
.withImportance(ConfigDef.Importance.LOW)
.withValidation(VitessConnectorConfig::validateShardEpochMap)
.withDescription(
"ShardEpochMap to use for the initial epoch values for the given shards. If not configured the connector streams changes" +
"from a default value of 0.");

public static final Field GTID = Field.create(VITESS_CONFIG_GROUP_PREFIX + "gtid")
.withDisplayName("gtid")
.withType(Type.STRING)
Expand Down Expand Up @@ -492,11 +504,16 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
// VitessConnector will populate the value of this param and pass on to VitessConnectorTask
protected static final String VITESS_TASK_SHARDS_CONFIG = "vitess.task.shards";

// The vitess.task.shard.to.epoch config, the value is a JSON map with vitess shard names mapping
// to epoch values. The vitess connector will populate the value of this param and pass on to
// VitessConnectorTask when the TransactionContextFactory is set to VitessOrderedTransactionContext
public static final String VITESS_TASK_SHARD_EPOCH_MAP_CONFIG = "vitess.task.shard.epoch.map";

// The vgtid assigned to the given task in the json format, this is the same format as we would see
// in the Kafka offset storage.
// e.g. [{\"keyspace\":\"ks\",\"shard\":\"-80\",\"gtid\":\"MySQL56/0001:1-114\"},
// {\"keyspace\":\"ks\",\"shard\":\"80-\",\"gtid\":\"MySQL56/0002:1-122\"}]
protected static final String VITESS_TASK_VGTID_CONFIG = "vitess.task.vgtid";
public static final String VITESS_TASK_VGTID_CONFIG = "vitess.task.vgtid";

/**
* The set of {@link Field}s defined as part of this configuration.
Expand Down Expand Up @@ -553,6 +570,10 @@ public String getVgtid() {
return (value != null && !VGTID.defaultValueAsString().equals(value)) ? value : Vgtid.CURRENT_GTID;
}

public String getShardEpochMap() {
return getConfig().getString(SHARD_EPOCH_MAP);
}

public boolean excludeEmptyShards() {
return getConfig().getBoolean(EXCLUDE_EMPTY_SHARDS);
}
Expand Down Expand Up @@ -585,6 +606,22 @@ private static int validateVgtids(Configuration config, Field field, ValidationO
return 0;
}

private static int validateShardEpochMap(Configuration config, Field field, ValidationOutput problems) {
// Get the GTID as a string so that the default value is used if GTID is not set
String shardEpochMapString = config.getString(field);
if (shardEpochMapString.isEmpty()) {
return 0;
}
try {
ShardEpochMap shardEpochMap = ShardEpochMap.of(shardEpochMapString);
}
catch (IllegalStateException e) {
problems.accept(field, shardEpochMapString, "Shard epoch map string improperly formatted");
return 1;
}
return 0;
}

public String getVtgateHost() {
return getConfig().getString(VTGATE_HOST);
}
Expand Down Expand Up @@ -667,6 +704,10 @@ public List<String> getVitessTaskKeyShards() {
return getConfig().getStrings(VITESS_TASK_SHARDS_CONFIG, CSV_DELIMITER);
}

public ShardEpochMap getVitessTaskShardEpochMap() {
return ShardEpochMap.of(getConfig().getString(VITESS_TASK_SHARD_EPOCH_MAP_CONFIG));
}

public Vgtid getVitessTaskVgtid() {
String vgtidStr = getConfig().getString(VITESS_TASK_VGTID_CONFIG);
return vgtidStr == null ? null : Vgtid.of(vgtidStr);
Expand Down
136 changes: 95 additions & 41 deletions src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -27,6 +28,7 @@
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -148,52 +150,104 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
}

@VisibleForTesting
protected Configuration getConfigWithOffsets(Configuration config) {
public Configuration getConfigWithOffsets(Configuration config) {
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
if (connectorConfig.offsetStoragePerTask()) {
int gen = connectorConfig.getOffsetStorageTaskKeyGen();
int prevGen = gen - 1;
Map<String, String> prevGtidsPerShard = VitessConnector.getGtidPerShardFromStorage(
context.offsetStorageReader(),
connectorConfig,
connectorConfig.getPrevNumTasks(),
prevGen,
true);
LOGGER.info("prevGtidsPerShard {}", prevGtidsPerShard);
Map<String, String> gtidsPerShard = VitessConnector.getGtidPerShardFromStorage(
context.offsetStorageReader(),
connectorConfig,
connectorConfig.getVitessTotalTasksConfig(),
gen,
false);
LOGGER.info("gtidsPerShard {}", gtidsPerShard);
List<String> shards = connectorConfig.getVitessTaskKeyShards();
Map<String, String> configGtidsPerShard = getConfigGtidsPerShard(connectorConfig, shards);
LOGGER.info("configGtidsPerShard {}", configGtidsPerShard);
final String keyspace = connectorConfig.getKeyspace();

List<Vgtid.ShardGtid> shardGtids = new ArrayList<>();
for (String shard : shards) {
String gtidStr;
if (gtidsPerShard.containsKey(shard)) {
gtidStr = gtidsPerShard.get(shard);
LOGGER.info("Using offsets from current gen: shard {}, gen {}, gtid {}", shard, gen, gtidStr);
}
else if (prevGtidsPerShard.containsKey(shard)) {
gtidStr = prevGtidsPerShard.get(shard);
LOGGER.warn("Using offsets from previous gen: shard {}, gen {}, gtid {}", shard, gen, gtidStr);
}
else {
gtidStr = configGtidsPerShard.getOrDefault(shard, null);
LOGGER.warn("Using offsets from config: shard {}, gtid {}", shard, gtidStr);
}
shardGtids.add(new Vgtid.ShardGtid(keyspace, shard, gtidStr));
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.GTID);
if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) {
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.EPOCH);
}
}
return config;
}

private Configuration getVitessTaskValuePerShard(Configuration config, VitessConnectorConfig connectorConfig, VitessOffsetRetriever.ValueType valueType) {
int gen = connectorConfig.getOffsetStorageTaskKeyGen();
int prevGen = gen - 1;
VitessOffsetRetriever prevGenRetriever = new VitessOffsetRetriever(
connectorConfig,
connectorConfig.getPrevNumTasks(),
prevGen,
true,
context.offsetStorageReader());
Map<String, ?> prevGenValuesPerShard = prevGenRetriever.getValuePerShardFromStorage(valueType);
LOGGER.info("{} per shard: {}", valueType.name(), prevGenValuesPerShard);
VitessOffsetRetriever retriever = new VitessOffsetRetriever(
connectorConfig,
connectorConfig.getVitessTotalTasksConfig(),
gen,
false,
context.offsetStorageReader());
Map<String, ?> curGenValuesPerShard = retriever.getValuePerShardFromStorage(valueType);
LOGGER.info("{} per shard {}", valueType.name(), curGenValuesPerShard);
List<String> shards = connectorConfig.getVitessTaskKeyShards();
Map<String, ?> configValuesPerShard = null;
switch (valueType) {
case GTID:
configValuesPerShard = getConfigGtidsPerShard(connectorConfig, shards);
break;
case EPOCH:
configValuesPerShard = getConfigShardEpochMapPerShard(connectorConfig, shards);
break;
}
LOGGER.info("config {} per shard {}", valueType.name(), configValuesPerShard);
final String keyspace = connectorConfig.getKeyspace();

Map<String, Object> valuesPerShard = new TreeMap();
for (String shard : shards) {
Object value;
if (curGenValuesPerShard != null && curGenValuesPerShard.containsKey(shard)) {
value = curGenValuesPerShard.get(shard);
LOGGER.info("Using offsets from current gen: shard {}, gen {}, {} {}", shard, gen, valueType.name(), value);
}
else if (prevGenValuesPerShard != null && prevGenValuesPerShard.containsKey(shard)) {
value = prevGenValuesPerShard.get(shard);
LOGGER.warn("Using offsets from previous gen: shard {}, gen {}, {} {}", shard, gen, valueType.name(), value);
}
else {
value = configValuesPerShard.getOrDefault(shard, null);
LOGGER.warn("Using offsets from config: shard {}, {} {}", shard, valueType.name(), valueType.name(), value);
}
return config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, Vgtid.of(shardGtids)).build();
valuesPerShard.put(shard, value);
}
switch (valueType) {
case GTID:
Map<String, String> gtidsPerShard = (Map) valuesPerShard;
Vgtid vgtid = getVgtid(gtidsPerShard, keyspace);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, vgtid).build();
break;
case EPOCH:
Map<String, Long> epochsPerShard = (Map) valuesPerShard;
ShardEpochMap shardEpochMap = getShardEpochMap(epochsPerShard);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_SHARD_EPOCH_MAP_CONFIG, shardEpochMap).build();
break;
}
return config;
}

private ShardEpochMap getShardEpochMap(Map<String, Long> epochMap) {
return new ShardEpochMap(epochMap);
}

private Vgtid getVgtid(Map<String, String> gtidsPerShard, String keyspace) {
List<Vgtid.ShardGtid> shardGtids = new ArrayList();
for (Map.Entry<String, String> entry : gtidsPerShard.entrySet()) {
shardGtids.add(new Vgtid.ShardGtid(keyspace, entry.getKey(), entry.getValue()));
}
return Vgtid.of(shardGtids);
}

private static Map<String, Long> getConfigShardEpochMapPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
String shardEpochMapString = connectorConfig.getShardEpochMap();
Function<Integer, Long> initEpoch = x -> 0L;
Map<String, Long> shardEpochMap;
if (shardEpochMapString.isEmpty()) {
shardEpochMap = buildMap(shards, initEpoch);
}
else {
return config;
shardEpochMap = ShardEpochMap.of(shardEpochMapString).getMap();
}
return shardEpochMap;
}

private static Map<String, String> getConfigGtidsPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
Expand All @@ -220,7 +274,7 @@ else if (shards != null) {
return configGtidsPerShard;
}

private static Map<String, String> buildMap(List<String> keys, Function<Integer, String> function) {
private static <T> Map<String, T> buildMap(List<String> keys, Function<Integer, T> function) {
return IntStream.range(0, keys.size())
.boxed()
.collect(Collectors.toMap(keys::get, function));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ public VitessOffsetContext(
this.transactionContext = transactionContext;
}

/** Initialize VitessOffsetContext if no previous offset exists */
/**
* Initialize VitessOffsetContext if no previous offset exists. This happens if either
* 1. This is a new connector
* 2. The generation has changed
*
* @param connectorConfig
* @param clock
* @return
*/
public static VitessOffsetContext initialContext(
VitessConnectorConfig connectorConfig, Clock clock) {
LOGGER.info("No previous offset exists. Use default VGTID.");
Expand Down Expand Up @@ -144,6 +152,14 @@ public Loader(VitessConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}

/**
* Loads the previously stored offsets for vgtid & transaction context (e.g., epoch).
* If offset storage per task mode is enabled, this is called only if there is no generation change.
* If offset storage per task mode is disabled, this is called only if previous offsets exist.
*
* @param offset
* @return
*/
@Override
public VitessOffsetContext load(Map<String, ?> offset) {
LOGGER.info("Previous offset exists, load from {}", offset);
Expand Down
Loading

0 comments on commit 3b4ae8f

Please sign in to comment.