From 6d290f6f78be90ae84cf0b7e6e1536dd691f4399 Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 30 May 2024 15:49:52 -0400 Subject: [PATCH] DBZ-7910 Fix column exclude filter type, add itest --- .../vitess/VitessConnectorConfig.java | 2 +- .../vitess/AbstractVitessConnectorTest.java | 16 ++++++++++++++ .../connector/vitess/VitessConnectorIT.java | 22 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index b5628eb4..32d68314 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -489,7 +489,7 @@ public VitessConnectorConfig(Configuration config) { config, null, x -> x.schema() + "." + x.table(), -1, - ColumnFilterMode.CATALOG, + ColumnFilterMode.SCHEMA, true); } diff --git a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java index c1695ac1..91e64398 100644 --- a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java +++ b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java @@ -209,6 +209,22 @@ protected List schemasAndValuesForStringTypesTruncated() { return fields; } + protected List schemasAndValuesForStringTypesExcludedColumn() { + final List fields = new ArrayList<>(); + fields.addAll( + Arrays.asList( + new SchemaAndValueField("char_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "a"), + new SchemaAndValueField("varchar_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "bc"), + new SchemaAndValueField("varchar_ko_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "상품 명1"), + new SchemaAndValueField("varchar_ja_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "リンゴ"), + new SchemaAndValueField("tinytext_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "gh"), + new SchemaAndValueField("text_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "ij"), + new SchemaAndValueField("longtext_col", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "mn"), + new SchemaAndValueField("json_col", Json.builder().optional().build(), + "{\"key1\":\"value1\",\"key2\":{\"key21\":\"value21\",\"key22\":\"value22\"}}"))); + return fields; + } + protected List schemasAndValuesForBytesTypesAsBytes() { final List fields = new ArrayList<>(); fields.addAll( diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index b18ed3fc..1f27a47a 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -249,6 +249,28 @@ public void shouldConsumeEventsWithTruncatedColumn() throws Exception { assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypesTruncated(), TestHelper.PK_FIELD); } + @Test + public void shouldConsumeEventsWithExcludedColumn() throws Exception { + String columnToExlude = "mediumtext_col"; + String someColumnIncluded = "varchar_col"; + TestHelper.executeDDL("vitess_create_tables.ddl"); + startConnector(builder -> builder.with("column.exclude.list", + TEST_UNSHARDED_KEYSPACE + ".string_table." + columnToExlude), false, + false, 1, -1, -1, null, + VitessConnectorConfig.SnapshotMode.NEVER, ""); + assertConnectorIsRunning(); + + int expectedRecordsCount = 1; + consumer = testConsumer(expectedRecordsCount); + + consumer.expects(expectedRecordsCount); + SourceRecord record = assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypesExcludedColumn(), TestHelper.PK_FIELD); + Struct value = (Struct) record.value(); + Struct after = (Struct) value.get("after"); + assertThat(after.schema().field(columnToExlude)).isNull(); + assertThat(after.schema().field(someColumnIncluded)).isNotNull(); + } + @Test public void shouldReceiveBytesAsBytes() throws Exception { TestHelper.executeDDL("vitess_create_tables.ddl");