Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8163 Add inherit epoch feature #208

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
Expand Down Expand Up @@ -301,6 +302,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withDescription("Control StopOnReshard VStream flag."
+ " If set true, the old VStream will be stopped after a reshard operation.");

public static final Field INHERIT_EPOCH = Field.create(VITESS_CONFIG_GROUP_PREFIX + "inherit_epoch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final Field INHERIT_EPOCH = Field.create(VITESS_CONFIG_GROUP_PREFIX + "inherit_epoch")
public static final Field INHERIT_EPOCH = Field.create(VITESS_CONFIG_GROUP_PREFIX + "inherit.epoch")

.withDisplayName("Inherit epoch")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(VitessConnectorConfig::validateInheritEpoch)
.withDescription("Controls whether the epochs of a new shard after a re-shard operation inherits epochs from its parent shards");

public static final Field KEEPALIVE_INTERVAL_MS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "keepalive.interval.ms")
.withDisplayName("VStream gRPC keepalive interval (ms)")
.withType(Type.LONG)
Expand Down Expand Up @@ -622,6 +632,15 @@ private static int validateShardEpochMap(Configuration config, Field field, Vali
return 0;
}

private static int validateInheritEpoch(Configuration config, Field field, ValidationOutput problems) {
Boolean inheritEpoch = config.getBoolean(field);
String factory = config.getString(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY);
if (inheritEpoch && !factory.equals(VitessOrderedTransactionMetadataFactory.class.getName())) {
problems.accept(field, inheritEpoch, "Inherit epoch cannot be enabled without VitessOrderedTransactionMetadataFactory");
}
return 0;
}

public String getVtgateHost() {
return getConfig().getString(VTGATE_HOST);
}
Expand All @@ -646,6 +665,10 @@ public boolean getStopOnReshard() {
return getConfig().getBoolean(STOP_ON_RESHARD_FLAG);
}

public boolean getInheritEpoch() {
return getConfig().getBoolean(INHERIT_EPOCH);
}

public Duration getKeepaliveInterval() {
return getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.pipeline.txmetadata;

import java.util.Map;

/**
* Class used to determine which parents a shard range descended from. Used to set the epoch to the succeediing
* epoch of its parents.
*/
public class ShardLineage {

/**
* Return the epoch value of the shard, based on its parents epochs.
* If there are parents present, return the max of the parent epochs plus one.
* If there are no parents present, it returns zero.
*
* @param shardString The descendant shard to find parents of
* @param shardEpochMap The map to search for parents
* @return The epoch value of the descendant shard
*/
public static Long getInheritedEpoch(String shardString, ShardEpochMap shardEpochMap) {
Shard shard = new Shard(shardString);

Long maxParentEpoch = -1L;
for (Map.Entry<String, Long> shardEpoch : shardEpochMap.getMap().entrySet()) {
String currentShardString = shardEpoch.getKey();
Long currentEpoch = shardEpoch.getValue();
Shard currentShard = new Shard(currentShardString);
if (shard.overlaps(currentShard)) {
maxParentEpoch = Math.max(maxParentEpoch, currentEpoch);
}
}

return maxParentEpoch + 1;
}

private static class Shard {

// A string lexicographically less than all other strings
public static final String NEGATIVE_INFINITY = "";
// A string lexicographically greater than all other strings
public static final String POSITIVE_INFINITY = "\uFFFF";
Copy link
Contributor

@HenryCaiHaiying HenryCaiHaiying Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4F long/big enough, do we need 8F? or have an assert if we sees the upper bound is more than 4 characters long.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this \uFFFF is actually a single special char that is greater than all other chars. I used the Character package to make this clear (it is the MAX_VALUE). And it turns out there is MIN_VALUE which is another special char (more clear than empty string). I updated the comments to describe this too.


private final String lowerBound;
private final String upperBound;

Shard(String shard) {
String[] shardInterval = getShardInterval(shard.toLowerCase());
this.lowerBound = getLowerBound(shardInterval);
this.upperBound = getUpperBound(shardInterval);
validateBounds();
}

private void validateBounds() {
if (this.lowerBound.compareTo(this.upperBound) >= 0) {
throw new IllegalArgumentException("Invalid shard range " + this);
}
}

public boolean overlaps(Shard shard) {
return this.lowerBound.compareTo(shard.upperBound) < 0 && this.upperBound.compareTo(shard.lowerBound) > 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the situation when the two ranges meet on the boundary ( == 0 case)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vitess ranges are have inclusive lower bounds and exclusive upper bounds so this should be correct (docs). We can think about each part:

  1. this.lowerBound.compareTo(shard.upperBound) == 0 the lower bound is the same as the upper bound we are comparing to. Since the upper bound is exclusive, this interval does not overlap.
  2. this.upperBound.compareTo(shard.lowerBound) == 0 the upper bound is the same as the lower bound we are comparing to. Since the upper bound is exclusive, these two intervals do not overlap.

I added a test to demonstrate this case.

}

private static String getLowerBound(String[] shardInterval) {
if (shardInterval.length < 1 || shardInterval[0].isEmpty()) {
return NEGATIVE_INFINITY;
}
return shardInterval[0];
}

private static String getUpperBound(String[] shardInterval) {
if (shardInterval.length != 2 || shardInterval[1].isEmpty()) {
return POSITIVE_INFINITY;
}
return shardInterval[1];
}

private static String[] getShardInterval(String shard) {
return shard.split("-");
}

@Override
public String toString() {
return "Shard{" +
"lowerBound=" + lowerBound +
", upperBound=" + upperBound +
"}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ public class VitessEpochProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class);
private ShardEpochMap shardEpochMap;
private boolean isFirstTransaction = true;
private boolean isInheritEpochEnabled = false;

public VitessEpochProvider() {
shardEpochMap = new ShardEpochMap();
}

public VitessEpochProvider(ShardEpochMap shardToEpoch) {
public VitessEpochProvider(ShardEpochMap shardToEpoch, boolean isInheritEpochEnabled) {
this.shardEpochMap = shardToEpoch;
this.isInheritEpochEnabled = isInheritEpochEnabled;
}

private static boolean isInvalidGtid(String gtid) {
Expand Down Expand Up @@ -76,7 +78,8 @@ public ShardEpochMap getShardEpochMap() {
*/
public static VitessEpochProvider initialize(VitessConnectorConfig config) {
ShardEpochMap shardEpochMap = VitessReplicationConnection.defaultShardEpochMap(config);
return new VitessEpochProvider(shardEpochMap);
boolean isInheritEpochEnabled = config.getInheritEpoch();
return new VitessEpochProvider(shardEpochMap, isInheritEpochEnabled);
}

public Map<String, Object> store(Map<String, Object> offset) {
Expand All @@ -90,11 +93,12 @@ public Map<String, Object> store(Map<String, Object> offset) {
*
* @param offsets Offsets to load
*/
public void load(Map<String, ?> offsets) {
public void load(Map<String, ?> offsets, VitessConnectorConfig config) {
String shardToEpochString = (String) offsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH);
if (!Strings.isNullOrEmpty(shardToEpochString)) {
shardEpochMap = ShardEpochMap.of(shardToEpochString);
}
isInheritEpochEnabled = config.getInheritEpoch();
}

public Long getEpoch(String shard, String previousVgtidString, String vgtidString) {
Expand All @@ -103,14 +107,15 @@ public Long getEpoch(String shard, String previousVgtidString, String vgtidStrin
}
Vgtid vgtid = Vgtid.of(vgtidString);
Vgtid previousVgtid = Vgtid.of(previousVgtidString);
processVgtid(previousVgtid, vgtid);
this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid);
if (isFirstTransaction) {
isFirstTransaction = false;
}
return shardEpochMap.get(shard);
}

private void processVgtid(Vgtid previousVgtid, Vgtid vgtid) {
private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) {
ShardEpochMap newShardEpochMap = new ShardEpochMap();
for (Vgtid.ShardGtid shardGtid : vgtid.getShardGtids()) {
String shard = shardGtid.getShard();
String gtid = shardGtid.getGtid();
Expand All @@ -125,17 +130,24 @@ private void processVgtid(Vgtid previousVgtid, Vgtid vgtid) {
shard, previousVgtid));
}
Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid, isFirstTransaction);
shardEpochMap.put(shard, epoch);
newShardEpochMap.put(shard, epoch);
}
else {
// A re-shard happened while we are streaming set the new value to zero
// TODO: Add support to inherit epoch from ancestor shard
shardEpochMap.put(shard, 0L);
// A re-shard happened while we are streaming
Long epoch;
if (isInheritEpochEnabled) {
epoch = ShardLineage.getInheritedEpoch(shard, shardEpochMap);
}
else {
epoch = 0L;
}
newShardEpochMap.put(shard, epoch);
}
}
// Note: we could purge all shards from the shard epoch map that are not present in the current vgtid.
// However, this poses some risk of losing epoch values, so we leave them as is. There may be dormant shards
// that we still have epoch values for, but that should be fine. Once we allow for epochs to be inherited from other shards
// we could reconsider purging them to ensure the epoch shard map does not grow too large.
return newShardEpochMap;
}

public boolean isInheritEpochEnabled() {
return isInheritEpochEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.math.BigDecimal;
import java.util.Map;

import io.debezium.annotation.VisibleForTesting;
import io.debezium.connector.vitess.SourceInfo;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnectorConfig;
Expand All @@ -22,6 +23,7 @@ public class VitessOrderedTransactionContext extends TransactionContext {
protected Long transactionEpoch = 0L;
protected BigDecimal transactionRank = null;
private VitessEpochProvider epochProvider = new VitessEpochProvider();
private VitessConnectorConfig config = null;

public VitessOrderedTransactionContext() {
}
Expand Down Expand Up @@ -76,14 +78,14 @@ public Map<String, Object> store(Map<String, Object> offset) {
*/
@Override
public TransactionContext newTransactionContextFromOffsets(Map<String, ?> offsets) {
return VitessOrderedTransactionContext.load(offsets);
return VitessOrderedTransactionContext.load(offsets, this.config);
}

public static VitessOrderedTransactionContext load(Map<String, ?> offsets) {
public static VitessOrderedTransactionContext load(Map<String, ?> offsets, VitessConnectorConfig config) {
TransactionContext transactionContext = TransactionContext.load(offsets);
VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext(transactionContext);
vitessOrderedTransactionContext.previousVgtid = (String) offsets.get(SourceInfo.VGTID_KEY);
vitessOrderedTransactionContext.epochProvider.load(offsets);
vitessOrderedTransactionContext.epochProvider.load(offsets, config);
return vitessOrderedTransactionContext;
}

Expand All @@ -99,6 +101,7 @@ public static VitessOrderedTransactionContext initialize(VitessConnectorConfig c
VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext();
vitessOrderedTransactionContext.epochProvider = VitessEpochProvider.initialize(config);
vitessOrderedTransactionContext.previousVgtid = VitessReplicationConnection.defaultVgtid(config).toString();
vitessOrderedTransactionContext.config = config;
return vitessOrderedTransactionContext;
}

Expand Down Expand Up @@ -141,4 +144,9 @@ public BigDecimal getTransactionRank() {
return transactionRank;
}

@VisibleForTesting
public VitessEpochProvider getEpochProvider() {
return epochProvider;
}

}
4 changes: 4 additions & 0 deletions src/test/java/io/debezium/connector/vitess/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public class TestHelper {
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\"}" +
"]";

public static final String VGTID_SINGLE_SHARD_JSON_TEMPLATE = "[" +
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" +
"]";

public static final String VGTID_JSON_TEMPLATE = "[" +
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import org.junit.Test;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaNameAdjuster;
Expand Down Expand Up @@ -116,4 +118,38 @@ public void shouldImproperShardEpochMapFailValidation() {
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldEnableInheritEpoch() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.INHERIT_EPOCH, true).build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
assertThat(connectorConfig.getInheritEpoch()).isTrue();
}

@Test
public void shouldValidateInheritEpochWithoutOrderedTransactionMetadata() {
Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.INHERIT_EPOCH, true).build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.INHERIT_EPOCH), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldValidateInheritEpochWithOrderedTransactionMetadata() {
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.INHERIT_EPOCH, true)
.with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.INHERIT_EPOCH), printConsumer);
assertThat(inputs.size()).isEqualTo(0);
}

}
Loading