Skip to content

Commit

Permalink
DBZ-8354 Fix table filter logic for vstream
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Oct 31, 2024
1 parent cbbc6e8 commit 4fb9b61
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
20 changes: 8 additions & 12 deletions src/main/java/io/debezium/connector/vitess/VitessConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -28,7 +27,7 @@
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
import io.debezium.relational.Tables;
import io.grpc.StatusRuntimeException;

/** Vitess Connector entry point */
Expand Down Expand Up @@ -273,17 +272,15 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
}
}

public static List<String> getIncludedTables(String keyspace, String tableIncludeList, List<String> allTables) {
public static List<String> getIncludedTables(VitessConnectorConfig connectorConfig, List<String> allTables) {
// table.include.list are list of patterns, filter all the tables in the keyspace through those patterns
// to get the list of table names.
final List<Pattern> patterns = Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE);
List<String> includedTables = new ArrayList<>();
for (String ksTable : allTables) {
for (Pattern pattern : patterns) {
if (pattern.asPredicate().test(String.format("%s.%s", keyspace, ksTable))) {
includedTables.add(ksTable);
break;
}
String keyspace = connectorConfig.getKeyspace();
Tables.TableFilter filter = new Filters(connectorConfig).tableFilter();
for (String table : allTables) {
if (filter.isIncluded(new TableId("", keyspace, table))) {
includedTables.add(table);
}
}
return includedTables;
Expand Down Expand Up @@ -330,8 +327,7 @@ public List<TableId> getMatchingCollections(Configuration configuration) {
VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(configuration);
String keyspace = vitessConnectorConfig.getKeyspace();
List<String> allTables = vitessMetadata.getTables();
List<String> includedTables = getIncludedTables(keyspace,
vitessConnectorConfig.tableIncludeList(), allTables);
List<String> includedTables = getIncludedTables(vitessConnectorConfig, allTables);
return includedTables.stream()
.map(table -> new TableId(keyspace, null, table))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ private void setError(String msg) {
if (!Strings.isNullOrEmpty(config.tableIncludeList())) {
final String keyspace = config.getKeyspace();
final List<String> allTables = new VitessMetadata(config).getTables();
List<String> includedTables = VitessConnector.getIncludedTables(config.getKeyspace(),
config.tableIncludeList(), allTables);
List<String> includedTables = VitessConnector.getIncludedTables(config, allTables);
for (String table : includedTables) {
String sql = "select * from `" + table + "`";
// See rule in: https://github.com/vitessio/vitess/blob/release-14.0/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go#L316
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1646,11 +1646,29 @@ public void testTableIncludeList() {
String keyspace = "ks";
List<String> allTables = Arrays.asList("t1", "t22", "t3");
String tableIncludeList = new String("ks.t1,ks.t2.*");
List<String> includedTables = VitessConnector.getIncludedTables(keyspace, tableIncludeList, allTables);
Configuration config = Configuration.from(Map.of(
VitessConnectorConfig.TABLE_INCLUDE_LIST.name(), tableIncludeList,
VitessConnectorConfig.KEYSPACE.name(), keyspace));
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
List<String> includedTables = VitessConnector.getIncludedTables(connectorConfig, allTables);
List<String> expectedTables = Arrays.asList("t1", "t22");
assertEquals(expectedTables, includedTables);
}

@Test
public void testTableIncludeListShouldExcludeTablesWithSuffix() {
String keyspace = "ks";
List<String> allTables = Arrays.asList("t1", "t2", "t22", "t13");
String tableIncludeList = new String("ks.t1,ks.t2");
Configuration config = Configuration.from(Map.of(
VitessConnectorConfig.TABLE_INCLUDE_LIST.name(), tableIncludeList,
VitessConnectorConfig.KEYSPACE.name(), keyspace));
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
List<String> includedTables = VitessConnector.getIncludedTables(connectorConfig, allTables);
List<String> expectedTables = Arrays.asList("t1", "t2");
assertEquals(expectedTables, includedTables);
}

private boolean isEmptyOffsets(Map<String, ?> offsets) {
return offsets == null || offsets.isEmpty();
}
Expand Down

0 comments on commit 4fb9b61

Please sign in to comment.