Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8154 Make order metadata epoch handle changes to task parallelism & shard set #207

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/main/java/io/debezium/connector/vitess/Vgtid.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.DebeziumException;

import binlogdata.Binlogdata;

/** Vitess source position coordinates. */
Expand Down Expand Up @@ -112,9 +110,6 @@ public Vgtid getLocalVgtid(String shard) {

/**
 * Looks up the {@link ShardGtid} registered for the given shard name.
 *
 * @param shard the shard name used as the lookup key
 * @return the shard's {@link ShardGtid}; never {@code null}
 * @throws DebeziumException if no entry exists for {@code shard}
 */
public ShardGtid getShardGtid(String shard) {
    ShardGtid shardGtid = shardNameToShardGtid.get(shard);
    if (shardGtid == null) {
        // Plain concatenation is null-safe, so a missing raw vgtid cannot mask this error with an NPE
        throw new DebeziumException("Gtid for shard missing, shard: " + shard + ", vgtid: " + this.rawVgtid);
    }
    return shardGtid;
}

Expand Down
222 changes: 108 additions & 114 deletions src/main/java/io/debezium/connector/vitess/VitessConnector.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
Expand Down Expand Up @@ -248,6 +249,17 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ " If not configured, the connector streams changes from the latest position for the given shard(s)."
+ " If snapshot.mode is INITIAL (default), the connector starts copying the tables for the given shard(s) first regardless of gtid value.");

public static final Field SHARD_EPOCH_MAP = Field.create(VITESS_CONFIG_GROUP_PREFIX + "shard.epoch.map")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be a big map to pass in, and potentially error-prone. When would people pass in this epoch map on the command line? Is this config mainly used to carry over the current epoch values from a running system?

Copy link
Contributor Author

@twthorn twthorn Aug 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has two benefits:

  1. The logic for the config VGTID (which is similarly large and error-prone) and for this config can be the same, so our refactored functions can share the same logic.
  2. If someone is migrating to a new connector for whatever reason and does not want to lose all epoch values (i.e., they want to preserve consistency without re-bootstrapping everything), they can pass in the shard epoch map.

.withDisplayName("shard.epoch.map")
        .withType(Type.STRING)
        .withWidth(Width.LONG)
        .withDefault("")
        .withImportance(ConfigDef.Importance.LOW)
        .withValidation(VitessConnectorConfig::validateShardEpochMap)
        // Keep a trailing space before each line break so the concatenated sentence reads correctly
        .withDescription(
                "ShardEpochMap to use for the initial epoch values for the given shards. If not configured the connector streams changes " +
                        "from a default value of 0.");

public static final Field GTID = Field.create(VITESS_CONFIG_GROUP_PREFIX + "gtid")
.withDisplayName("gtid")
.withType(Type.STRING)
Expand Down Expand Up @@ -492,11 +504,16 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
// VitessConnector will populate the value of this param and pass on to VitessConnectorTask
protected static final String VITESS_TASK_SHARDS_CONFIG = "vitess.task.shards";

// The vitess.task.shard.epoch.map config, the value is a JSON map with vitess shard names mapping
// to epoch values. The vitess connector will populate the value of this param and pass on to
// VitessConnectorTask when the TransactionContextFactory is set to VitessOrderedTransactionContext
public static final String VITESS_TASK_SHARD_EPOCH_MAP_CONFIG = "vitess.task.shard.epoch.map";

// The vgtid assigned to the given task in the json format, this is the same format as we would see
// in the Kafka offset storage.
// e.g. [{\"keyspace\":\"ks\",\"shard\":\"-80\",\"gtid\":\"MySQL56/0001:1-114\"},
// {\"keyspace\":\"ks\",\"shard\":\"80-\",\"gtid\":\"MySQL56/0002:1-122\"}]
// Declared once, with public visibility, so it is also reachable from outside this package.
public static final String VITESS_TASK_VGTID_CONFIG = "vitess.task.vgtid";

/**
* The set of {@link Field}s defined as part of this configuration.
Expand Down Expand Up @@ -553,6 +570,10 @@ public String getVgtid() {
return (value != null && !VGTID.defaultValueAsString().equals(value)) ? value : Vgtid.CURRENT_GTID;
}

/**
 * Reads the string value configured for the {@code SHARD_EPOCH_MAP} field.
 *
 * @return the configured shard epoch map string, exactly as returned by the configuration
 */
public String getShardEpochMap() {
    final String configuredShardEpochMap = getConfig().getString(SHARD_EPOCH_MAP);
    return configuredShardEpochMap;
}

/**
 * Reads the boolean value configured for the {@code EXCLUDE_EMPTY_SHARDS} field.
 *
 * @return the configured flag
 */
public boolean excludeEmptyShards() {
    final boolean excludeEmpty = getConfig().getBoolean(EXCLUDE_EMPTY_SHARDS);
    return excludeEmpty;
}
Expand Down Expand Up @@ -585,6 +606,22 @@ private static int validateVgtids(Configuration config, Field field, ValidationO
return 0;
}

/**
 * Validates the shard epoch map field by checking that its value can be parsed by
 * {@link ShardEpochMap#of(String)}. A missing or empty value is treated as "not configured" and accepted.
 *
 * @param config the configuration holding the value to validate
 * @param field the shard epoch map field
 * @param problems sink that receives a message when the value cannot be parsed
 * @return {@code 0} when the value is acceptable, {@code 1} when a problem was reported
 */
private static int validateShardEpochMap(Configuration config, Field field, ValidationOutput problems) {
    // Read the shard epoch map as a string so that the field's default value is used if it is not set
    String shardEpochMapString = config.getString(field);
    if (shardEpochMapString == null || shardEpochMapString.isEmpty()) {
        return 0;
    }
    try {
        // Parsed only to check the format; the parsed map itself is not needed here
        ShardEpochMap.of(shardEpochMapString);
    }
    catch (IllegalStateException e) {
        problems.accept(field, shardEpochMapString, "Shard epoch map string improperly formatted");
        return 1;
    }
    return 0;
}

/**
 * Reads the string value configured for the {@code VTGATE_HOST} field.
 *
 * @return the configured host value
 */
public String getVtgateHost() {
    final String vtgateHost = getConfig().getString(VTGATE_HOST);
    return vtgateHost;
}
Expand Down Expand Up @@ -667,6 +704,10 @@ public List<String> getVitessTaskKeyShards() {
return getConfig().getStrings(VITESS_TASK_SHARDS_CONFIG, CSV_DELIMITER);
}

/**
 * Parses the value stored under {@code VITESS_TASK_SHARD_EPOCH_MAP_CONFIG} into a {@link ShardEpochMap}.
 * The raw string is handed to {@link ShardEpochMap#of(String)} without any null check.
 *
 * @return the shard epoch map parsed from the task configuration
 */
public ShardEpochMap getVitessTaskShardEpochMap() {
    final String serializedShardEpochMap = getConfig().getString(VITESS_TASK_SHARD_EPOCH_MAP_CONFIG);
    return ShardEpochMap.of(serializedShardEpochMap);
}

public Vgtid getVitessTaskVgtid() {
String vgtidStr = getConfig().getString(VITESS_TASK_VGTID_CONFIG);
return vgtidStr == null ? null : Vgtid.of(vgtidStr);
Expand Down
136 changes: 95 additions & 41 deletions src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -27,6 +28,7 @@
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
Expand Down Expand Up @@ -148,52 +150,104 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
}

@VisibleForTesting
protected Configuration getConfigWithOffsets(Configuration config) {
public Configuration getConfigWithOffsets(Configuration config) {
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
if (connectorConfig.offsetStoragePerTask()) {
int gen = connectorConfig.getOffsetStorageTaskKeyGen();
int prevGen = gen - 1;
Map<String, String> prevGtidsPerShard = VitessConnector.getGtidPerShardFromStorage(
context.offsetStorageReader(),
connectorConfig,
connectorConfig.getPrevNumTasks(),
prevGen,
true);
LOGGER.info("prevGtidsPerShard {}", prevGtidsPerShard);
Map<String, String> gtidsPerShard = VitessConnector.getGtidPerShardFromStorage(
context.offsetStorageReader(),
connectorConfig,
connectorConfig.getVitessTotalTasksConfig(),
gen,
false);
LOGGER.info("gtidsPerShard {}", gtidsPerShard);
List<String> shards = connectorConfig.getVitessTaskKeyShards();
Map<String, String> configGtidsPerShard = getConfigGtidsPerShard(connectorConfig, shards);
LOGGER.info("configGtidsPerShard {}", configGtidsPerShard);
final String keyspace = connectorConfig.getKeyspace();

List<Vgtid.ShardGtid> shardGtids = new ArrayList<>();
for (String shard : shards) {
String gtidStr;
if (gtidsPerShard.containsKey(shard)) {
gtidStr = gtidsPerShard.get(shard);
LOGGER.info("Using offsets from current gen: shard {}, gen {}, gtid {}", shard, gen, gtidStr);
}
else if (prevGtidsPerShard.containsKey(shard)) {
gtidStr = prevGtidsPerShard.get(shard);
LOGGER.warn("Using offsets from previous gen: shard {}, gen {}, gtid {}", shard, gen, gtidStr);
}
else {
gtidStr = configGtidsPerShard.getOrDefault(shard, null);
LOGGER.warn("Using offsets from config: shard {}, gtid {}", shard, gtidStr);
}
shardGtids.add(new Vgtid.ShardGtid(keyspace, shard, gtidStr));
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.GTID);
if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it should be an if ... else structure; otherwise line 156 is executed in vain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getVitessTaskValuePerShard actually modifies the map (since it is passed in as an argument). However, that is not clear (the method is not well named), and it is better not to have hidden side effects on the config. I updated it so that it simply returns the value, and the config editing is then done in this function.

config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.EPOCH);
}
}
return config;
}

/**
 * Resolves one value per shard (a GTID or an epoch, selected by {@code valueType}) for the shards
 * assigned to this task and stores the result in the returned configuration.
 * <p>
 * For each shard the value comes from the first source that contains it, in this order:
 * values read from offset storage for the current generation, values read from offset storage for the
 * previous generation, and finally the value derived from the connector configuration.
 *
 * @param config the configuration the resolved value is added to
 * @param connectorConfig the connector configuration used to read generations, task counts and shards
 * @param valueType the kind of value to resolve
 * @return the configuration built from {@code config} with the resolved value set under
 *         {@code VITESS_TASK_VGTID_CONFIG} (GTID) or {@code VITESS_TASK_SHARD_EPOCH_MAP_CONFIG} (EPOCH)
 */
private Configuration getVitessTaskValuePerShard(Configuration config, VitessConnectorConfig connectorConfig, VitessOffsetRetriever.ValueType valueType) {
    int gen = connectorConfig.getOffsetStorageTaskKeyGen();
    int prevGen = gen - 1;
    VitessOffsetRetriever prevGenRetriever = new VitessOffsetRetriever(
            connectorConfig,
            connectorConfig.getPrevNumTasks(),
            prevGen,
            true,
            context.offsetStorageReader());
    Map<String, ?> prevGenValuesPerShard = prevGenRetriever.getValuePerShardFromStorage(valueType);
    LOGGER.info("{} per shard: {}", valueType.name(), prevGenValuesPerShard);
    VitessOffsetRetriever retriever = new VitessOffsetRetriever(
            connectorConfig,
            connectorConfig.getVitessTotalTasksConfig(),
            gen,
            false,
            context.offsetStorageReader());
    Map<String, ?> curGenValuesPerShard = retriever.getValuePerShardFromStorage(valueType);
    LOGGER.info("{} per shard {}", valueType.name(), curGenValuesPerShard);
    List<String> shards = connectorConfig.getVitessTaskKeyShards();
    Map<String, ?> configValuesPerShard = null;
    switch (valueType) {
        case GTID:
            configValuesPerShard = getConfigGtidsPerShard(connectorConfig, shards);
            break;
        case EPOCH:
            configValuesPerShard = getConfigShardEpochMapPerShard(connectorConfig, shards);
            break;
    }
    LOGGER.info("config {} per shard {}", valueType.name(), configValuesPerShard);
    final String keyspace = connectorConfig.getKeyspace();

    // TreeMap keeps the shards in sorted key order for the values built below
    Map<String, Object> valuesPerShard = new TreeMap<>();
    for (String shard : shards) {
        Object value;
        if (curGenValuesPerShard != null && curGenValuesPerShard.containsKey(shard)) {
            value = curGenValuesPerShard.get(shard);
            LOGGER.info("Using offsets from current gen: shard {}, gen {}, {} {}", shard, gen, valueType.name(), value);
        }
        else if (prevGenValuesPerShard != null && prevGenValuesPerShard.containsKey(shard)) {
            value = prevGenValuesPerShard.get(shard);
            LOGGER.warn("Using offsets from previous gen: shard {}, gen {}, {} {}", shard, gen, valueType.name(), value);
        }
        else {
            value = configValuesPerShard.getOrDefault(shard, null);
            // Three placeholders, three arguments: shard, value type, value
            LOGGER.warn("Using offsets from config: shard {}, {} {}", shard, valueType.name(), value);
        }
        valuesPerShard.put(shard, value);
    }
    switch (valueType) {
        case GTID:
            // Values resolved for GTID are expected to be strings; the cast only narrows the static type
            @SuppressWarnings("unchecked")
            Map<String, String> gtidsPerShard = (Map<String, String>) (Map<String, ?>) valuesPerShard;
            Vgtid vgtid = getVgtid(gtidsPerShard, keyspace);
            config = config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, vgtid).build();
            break;
        case EPOCH:
            // Values resolved for EPOCH are expected to be longs; the cast only narrows the static type
            @SuppressWarnings("unchecked")
            Map<String, Long> epochsPerShard = (Map<String, Long>) (Map<String, ?>) valuesPerShard;
            ShardEpochMap shardEpochMap = getShardEpochMap(epochsPerShard);
            config = config.edit().with(VitessConnectorConfig.VITESS_TASK_SHARD_EPOCH_MAP_CONFIG, shardEpochMap).build();
            break;
    }
    return config;
}

/**
 * Wraps a shard-name-to-epoch map in a {@link ShardEpochMap}.
 *
 * @param epochMap epoch value per shard name
 * @return a new {@link ShardEpochMap} constructed from {@code epochMap}
 */
private ShardEpochMap getShardEpochMap(Map<String, Long> epochMap) {
    final ShardEpochMap wrapped = new ShardEpochMap(epochMap);
    return wrapped;
}

/**
 * Builds a {@link Vgtid} from a shard-name-to-GTID map, creating one {@link Vgtid.ShardGtid} per entry
 * in the map's iteration order.
 *
 * @param gtidsPerShard GTID string per shard name
 * @param keyspace the keyspace set on every created {@link Vgtid.ShardGtid}
 * @return the {@link Vgtid} built from all entries
 */
private Vgtid getVgtid(Map<String, String> gtidsPerShard, String keyspace) {
    List<Vgtid.ShardGtid> shardGtids = new ArrayList<>(gtidsPerShard.size());
    for (Map.Entry<String, String> entry : gtidsPerShard.entrySet()) {
        shardGtids.add(new Vgtid.ShardGtid(keyspace, entry.getKey(), entry.getValue()));
    }
    return Vgtid.of(shardGtids);
}

/**
 * Derives the epoch per shard from the connector configuration.
 * <p>
 * When no shard epoch map string is configured, every shard in {@code shards} is mapped to epoch 0.
 * Otherwise the configured string is parsed with {@link ShardEpochMap#of(String)} and its map is
 * returned as-is (it is not filtered by {@code shards}).
 *
 * @param connectorConfig the connector configuration providing the shard epoch map string
 * @param shards the shards assigned to this task
 * @return epoch value per shard name
 */
private static Map<String, Long> getConfigShardEpochMapPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
    String shardEpochMapString = connectorConfig.getShardEpochMap();
    if (shardEpochMapString == null || shardEpochMapString.isEmpty()) {
        // Nothing configured: start every shard of this task at epoch 0
        Function<Integer, Long> initEpoch = x -> 0L;
        return buildMap(shards, initEpoch);
    }
    return ShardEpochMap.of(shardEpochMapString).getMap();
}

private static Map<String, String> getConfigGtidsPerShard(VitessConnectorConfig connectorConfig, List<String> shards) {
Expand All @@ -220,7 +274,7 @@ else if (shards != null) {
return configGtidsPerShard;
}

private static Map<String, String> buildMap(List<String> keys, Function<Integer, String> function) {
private static <T> Map<String, T> buildMap(List<String> keys, Function<Integer, T> function) {
return IntStream.range(0, keys.size())
.boxed()
.collect(Collectors.toMap(keys::get, function));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ public VitessOffsetContext(
this.transactionContext = transactionContext;
}

/** Initialize VitessOffsetContext if no previous offset exists */
/**
* Initialize VitessOffsetContext if no previous offset exists. This happens if either
* 1. This is a new connector
* 2. The generation has changed
*
* @param connectorConfig
* @param clock
* @return
*/
public static VitessOffsetContext initialContext(
VitessConnectorConfig connectorConfig, Clock clock) {
LOGGER.info("No previous offset exists. Use default VGTID.");
Expand Down Expand Up @@ -144,6 +152,14 @@ public Loader(VitessConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}

/**
* Loads the previously stored offsets for vgtid & transaction context (e.g., epoch).
* If offset storage per task mode is enabled, this is called only if there is no generation change.
* If offset storage per task mode is disabled, this is called only if previous offsets exist.
*
* @param offset
* @return
*/
@Override
public VitessOffsetContext load(Map<String, ?> offset) {
LOGGER.info("Previous offset exists, load from {}", offset);
Expand Down
Loading