Skip to content

Commit

Permalink
DBZ-8541 Handle partially overridden transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Dec 19, 2024
1 parent a8130d9 commit fea0cbc
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
LOGGER.info("No previous offset found");
}
else {
LOGGER.info("Found previous offset {}", previousOffset);
LOGGER.info("Found task {} previous offset {}", config.getString(VitessConnectorConfig.TASK_ID), previousOffset);
}

replicationConnection = new VitessReplicationConnection(connectorConfig, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.debezium.connector.vitess.pipeline.txmetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -77,15 +78,7 @@ private void parseGtid(String transactionId) {
}
}

public boolean isHostSetEqual(Gtid hosts) {
return this.hosts.equals(hosts.hosts);
}

public boolean isHostSetSupersetOf(Gtid previousHosts) {
return this.hosts.containsAll(previousHosts.hosts);
}

public boolean isHostSetSubsetOf(Gtid previousHosts) {
return previousHosts.hosts.containsAll(this.hosts);
public boolean isHostSetSupersetOf(Gtid otherHosts) {
return this.hosts.containsAll(otherHosts.hosts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class VitessEpochProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class);
private ShardEpochMap shardEpochMap;
private boolean isFirstTransaction = true;
private boolean isInheritEpochEnabled = false;

public VitessEpochProvider() {
Expand All @@ -33,34 +32,48 @@ public VitessEpochProvider(ShardEpochMap shardToEpoch, boolean isInheritEpochEna
this.isInheritEpochEnabled = isInheritEpochEnabled;
}

private static boolean isInvalidGtid(String gtid) {
private static boolean isGtidOverridden(String gtid) {
return gtid.equals(Vgtid.CURRENT_GTID) || gtid.equals(Vgtid.EMPTY_GTID);
}

public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString, boolean isFirstTransaction) {
if (isFirstTransaction && isInvalidGtid(previousGtidString)) {
private static boolean isStandardGtid(String gtid) {
return !isGtidOverridden(gtid);
}

public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) {
if (isGtidOverridden(previousGtidString) && isGtidOverridden(gtidString)) {
// GTID was overridden, and the current GTID is an overridden value, still waiting for first transaction
return previousEpoch;
} else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) {
// GTID was overridden, received first transaction, increment epoch
LOGGER.info("Incrementing epoch: {}", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString));
return previousEpoch + 1;
} else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) {
// previous GTID is standard, current GTID is overridden, should not be possible, raise exception
String message = String.format("Current GTID cannot be override value if previous is standard: %s",
getLogMessageForGtid(previousEpoch, previousGtidString, gtidString));
LOGGER.error(message);
throw new DebeziumException(message);
} else {
// Both GTIDs are standard so parse them
return getEpochForStandardGtid(previousEpoch, previousGtidString, gtidString);
}
else if (isInvalidGtid(previousGtidString)) {
throw new DebeziumException("Invalid GTID: The previous GTID cannot be one of current or empty after the first transaction " + gtidString);
}
if (isInvalidGtid(gtidString)) {
throw new DebeziumException("Invalid GTID: The current GTID cannot be one of current or empty " + gtidString);
}
}

private static Long getEpochForStandardGtid(Long previousEpoch, String previousGtidString, String gtidString) {
Gtid previousGtid = new Gtid(previousGtidString);
Gtid gtid = new Gtid(gtidString);
if (previousGtid.isHostSetEqual(gtid) || gtid.isHostSetSupersetOf(previousGtid)) {
if (gtid.isHostSetSupersetOf(previousGtid)) {
return previousEpoch;
}
else if (gtid.isHostSetSubsetOf(previousGtid)) {
} else {
// Any other case (disjoint set, previous is a superset), VStream has interpreted the previous GTID correctly and sent some new GTID
// in a continuous stream, so simply increment the epoch
return previousEpoch + 1;
}
else {
LOGGER.error(
"Error determining epoch, previous host set: {}, host set: {}",
previousGtid, gtid);
throw new RuntimeException("Can't determine epoch");
}
}

private static String getLogMessageForGtid(Long previousEpoch, String previousGtidString, String gtidString) {
return String.format("GTID: %s, previous GTID: %s, previous Epoch: %s", gtidString, previousGtidString, previousEpoch);
}

public ShardEpochMap getShardEpochMap() {
Expand Down Expand Up @@ -105,16 +118,19 @@ public Long getEpoch(String shard, String previousVgtidString, String vgtidStrin
if (previousVgtidString == null) {
throw new DebeziumException(String.format("Previous vgtid string cannot be null shard %s current %s", shard, vgtidString));
}
Vgtid vgtid = Vgtid.of(vgtidString);
Vgtid previousVgtid = Vgtid.of(previousVgtidString);
this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid);
if (isFirstTransaction) {
isFirstTransaction = false;
try {
Vgtid vgtid = Vgtid.of(vgtidString);
Vgtid previousVgtid = Vgtid.of(previousVgtidString);
this.shardEpochMap = getNewShardEpochMap(previousVgtid, vgtid, shard);
return shardEpochMap.get(shard);
}
catch (Exception e) {
LOGGER.error("Error providing epoch with shard {}, previousVgtid {}, vgtid {}", shard, previousVgtidString, vgtidString, e);
throw e;
}
return shardEpochMap.get(shard);
}

private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) {
private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid, String transactionShard) {
ShardEpochMap newShardEpochMap = new ShardEpochMap();
for (Vgtid.ShardGtid shardGtid : vgtid.getShardGtids()) {
String shard = shardGtid.getShard();
Expand All @@ -129,7 +145,7 @@ private ShardEpochMap getNewShardEpochMap(Vgtid previousVgtid, Vgtid vgtid) {
"Previous epoch cannot be null for shard %s when shard present in previous vgtid %s",
shard, previousVgtid));
}
Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid, isFirstTransaction);
Long epoch = getEpochForGtid(previousEpoch, previousGtid, gtid);
newShardEpochMap.put(shard, epoch);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,53 @@

public class GtidTest {

private static final String EXPECTED_VERSION = "MySQL56";
private static final String HOST_SET1 = "/host1:1,host2:2-10";
private static final String GTID1 = EXPECTED_VERSION + HOST_SET1;

@Test
public void shouldInit() {
String expectedVersion = "MySQL56";
Gtid gtid = new Gtid(expectedVersion + "/host1:1-4,host2:2-10");
assertThat(gtid.getVersion()).isEqualTo(expectedVersion);
Gtid gtid = new Gtid(EXPECTED_VERSION + "/host1:1-4,host2:2-10");
assertThat(gtid.getVersion()).isEqualTo(EXPECTED_VERSION);
assertThat(gtid.getSequenceValues()).isEqualTo(List.of("4", "10"));
assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2"));
}

@Test
public void shouldHandleSingleValue() {
String expectedVersion = "MySQL56";
Gtid gtid = new Gtid(expectedVersion + "/host1:1,host2:2-10");
assertThat(gtid.getVersion()).isEqualTo(expectedVersion);
Gtid gtid = new Gtid(GTID1);
assertThat(gtid.getVersion()).isEqualTo(EXPECTED_VERSION);
assertThat(gtid.getSequenceValues()).isEqualTo(List.of("1", "10"));
assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2"));
}

@Test
public void testHostSupersetWithLargerSet() {
Gtid gtid = new Gtid(GTID1);
Gtid gtidSuperset = new Gtid(EXPECTED_VERSION + "/host1:1,host2:2-10,host3:1-5");
assertThat(gtidSuperset.isHostSetSupersetOf(gtid)).isTrue();
assertThat(gtid.isHostSetSupersetOf(gtidSuperset)).isFalse();
}

@Test
public void testHostSupersetWithEqualSet() {
Gtid gtid = new Gtid(GTID1);
Gtid gtid2 = new Gtid(GTID1);
assertThat(gtid.isHostSetSupersetOf(gtid2)).isTrue();
assertThat(gtid2.isHostSetSupersetOf(gtid)).isTrue();
}

@Test
public void shouldThrowExceptionOnEmptyStringWithPrefix() {
String expectedVersion = "MySQL56";
assertThatThrownBy(() -> {
Gtid gtid = new Gtid(expectedVersion + "/");
Gtid gtid = new Gtid(EXPECTED_VERSION + "/");
}).isInstanceOf(DebeziumException.class);
}

@Test
public void shouldThrowExceptionOnVersionOnly() {
String expectedVersion = "MySQL56";
assertThatThrownBy(() -> {
Gtid gtid = new Gtid(expectedVersion);
Gtid gtid = new Gtid(EXPECTED_VERSION);
}).isInstanceOf(DebeziumException.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VgtidTest;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.junit.logging.LogInterceptor;

public class VitessEpochProviderTest {

Expand All @@ -44,6 +45,7 @@ public class VitessEpochProviderTest {
private String txIdVersion8 = "MySQL82/" + String.join(",", host1Tx2);

private List<String> shards = List.of(VgtidTest.TEST_SHARD, VgtidTest.TEST_SHARD2);
private String errorOnCurrentOverrideValue = "Current GTID cannot be override value if previous is standard";

String vgtidJsonCurrent = String.format(
VGTID_JSON_TEMPLATE,
Expand All @@ -56,7 +58,7 @@ public class VitessEpochProviderTest {

@Test
public void testGetEpochSameHostSet() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId);
assertThat(epoch).isEqualTo(0);
}

Expand Down Expand Up @@ -136,7 +138,9 @@ public void fastForwardVgtidIncrementsEpoch() {
public void currentVgtidIncrementsEpochForAllShards() {
VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false);
Long epochShard1 = provider.getEpoch(VgtidTest.TEST_SHARD, vgtidJsonCurrent, VGTID_JSON);
assertThat(provider.getShardEpochMap()).isEqualTo(new ShardEpochMap(Map.of(VgtidTest.TEST_SHARD, 1L, VgtidTest.TEST_SHARD2, 1L)));
Long epochShard2 = provider.getEpoch(VgtidTest.TEST_SHARD2, VGTID_JSON, VGTID_JSON);
assertThat(provider.getShardEpochMap()).isEqualTo(new ShardEpochMap(Map.of(VgtidTest.TEST_SHARD, 1L, VgtidTest.TEST_SHARD2, 1L)));
assertThat(epochShard1).isEqualTo(1L);
assertThat(epochShard2).isEqualTo(1L);
}
Expand Down Expand Up @@ -218,6 +222,57 @@ public void missingEpochWithPreviousVgtidShouldThrowException() {
}).isInstanceOf(DebeziumException.class).hasMessageContaining("Previous epoch cannot be null");
}

@Test
public void testGtidPartialCurrent() {
VitessEpochProvider provider = new VitessEpochProvider();
VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty());
provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config);
String shard = "f0-f8";
String vgtidAllCurrent = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"current\",\"table_p_ks\":[]}]";
String vgtidOneShardWithGtid = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525090\",\"table_p_ks\":[]}]";
String vgtidOneShardCurrent = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588997\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]";
String vgtidOneShardCurrentNewGtid = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588998\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"current\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]";
String vgtidNoShardCurrent = "[" +
"{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"MySQL56/host1:1-450588997\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"70-78\",\"gtid\":\"MySQL56/host2:1-368122129\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"b0-b8\",\"gtid\":\"host4:1-3\",\"table_p_ks\":[]}," +
"{\"keyspace\":\"keyspace1\",\"shard\":\"f0-f8\",\"gtid\":\"MySQL56/host3:1-144525093\",\"table_p_ks\":[]}]";
// The first transaction will have at least one shard with an actual GTID
provider.getEpoch(shard, vgtidAllCurrent, vgtidOneShardWithGtid);
// Eventually all but one shard will have a GTID
provider.getEpoch(shard, vgtidOneShardWithGtid, vgtidOneShardCurrent);
// We have received a legit GTID for all shards except one, that one can still be current
provider.getEpoch(shard, vgtidOneShardCurrent, vgtidOneShardCurrentNewGtid);
// We can continue to receive current for that GTID indefinitely
provider.getEpoch(shard, vgtidOneShardCurrentNewGtid, vgtidOneShardCurrentNewGtid);
// Eventually, we receive a GTID for that shard
provider.getEpoch("b0-b8", vgtidOneShardCurrentNewGtid, vgtidNoShardCurrent);
// After that if we received current again that would be an error
assertThatThrownBy(() -> {
provider.getEpoch("b0-b8", vgtidNoShardCurrent, vgtidOneShardCurrent);
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
// Assert that if we received current again even for a non-transaction shard, that would be an error
assertThatThrownBy(() -> {
provider.getEpoch(shard, vgtidNoShardCurrent, vgtidOneShardCurrent);
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
}

@Test
public void matchingGtidReturnsInitialEpoch() {
VitessEpochProvider provider = new VitessEpochProvider(ShardEpochMap.init(shards), false);
Expand All @@ -242,7 +297,7 @@ public void testInvalidCurrentGtid() {
Vgtid.EMPTY_GTID);
assertThatThrownBy(() -> {
provider.getEpoch("-80", VGTID_JSON, vgtidJsonCurrent);
}).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid");
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
}

@Test
Expand All @@ -261,31 +316,31 @@ public void testInvalidEmptyGtid() {
Vgtid.EMPTY_GTID);
assertThatThrownBy(() -> {
provider.getEpoch("-80", VGTID_JSON, vgtidJsonEmpty);
}).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid");
}).isInstanceOf(DebeziumException.class).hasMessageContaining(errorOnCurrentOverrideValue);
}

@Test
public void testGetEpochShrunkHostSet() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk);
assertThat(epoch).isEqualTo(1);
}

@Test
public void testGetEpochExpandHostSet() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId);
assertThat(epoch).isEqualTo(0);
}

@Test
public void testGetEpochDisjointThrowsException() {
Assertions.assertThatThrownBy(() -> {
VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4", false);
}).isInstanceOf(RuntimeException.class);
public void testGetEpochDisjointIncrementsEpoch() {
long previousEpoch = 0L;
long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4");
assertThat(epoch).isEqualTo(previousEpoch + 1);
}

@Test
public void testVersionUpgradeDoesNotAffectEpoch() {
Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8, false);
Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8);
assertThat(epoch).isEqualTo(0L);
}
}

0 comments on commit fea0cbc

Please sign in to comment.