From a88fcf6bcb383e7b9f547213c5599040a11fc68f Mon Sep 17 00:00:00 2001 From: Roman Kudryashov Date: Sun, 7 Apr 2024 09:43:41 +0300 Subject: [PATCH] DBZ-7755 Refactor exporting to CloudEvents: remove RecordParser --- .../converters/VitessCloudEventsMaker.java | 21 +++++++--- .../converters/VitessCloudEventsProvider.java | 10 +---- .../vitess/converters/VitessRecordParser.java | 42 ------------------- 3 files changed, 18 insertions(+), 55 deletions(-) delete mode 100644 src/main/java/io/debezium/connector/vitess/converters/VitessRecordParser.java diff --git a/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsMaker.java b/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsMaker.java index a8512367..39aeea59 100644 --- a/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsMaker.java +++ b/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsMaker.java @@ -5,11 +5,15 @@ */ package io.debezium.connector.vitess.converters; +import java.util.Set; + import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.vitess.SourceInfo; +import io.debezium.converters.recordandmetadata.RecordAndMetadata; import io.debezium.converters.spi.CloudEventsMaker; -import io.debezium.converters.spi.RecordParser; import io.debezium.converters.spi.SerializerType; +import io.debezium.data.Envelope; +import io.debezium.util.Collect; /** * CloudEvents maker for records produced by the Vitess connector. @@ -18,13 +22,20 @@ */ public class VitessCloudEventsMaker extends CloudEventsMaker { - public VitessCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) { - super(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName); + private static final Set VITESS_SOURCE_FIELDS = Collect.unmodifiableSet(SourceInfo.VGTID_KEY, SourceInfo.KEYSPACE_NAME_KEY); + + public VitessCloudEventsMaker(RecordAndMetadata recordAndMetadata, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) { + super(recordAndMetadata, contentType, dataSchemaUriBase, cloudEventsSchemaName, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); } @Override public String ceId() { - return "name:" + recordParser.getMetadata(AbstractSourceInfo.SERVER_NAME_KEY) - + ";vgtid:" + recordParser.getMetadata(SourceInfo.VGTID_KEY); + return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY) + + ";vgtid:" + sourceField(SourceInfo.VGTID_KEY); + } + + @Override + public Set connectorSpecificSourceFields() { + return VITESS_SOURCE_FIELDS; } } diff --git a/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsProvider.java b/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsProvider.java index 3996fef9..ca338b7b 100644 --- a/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsProvider.java +++ b/src/main/java/io/debezium/connector/vitess/converters/VitessCloudEventsProvider.java @@ -9,7 +9,6 @@ import io.debezium.converters.recordandmetadata.RecordAndMetadata; import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.converters.spi.CloudEventsProvider; -import io.debezium.converters.spi.RecordParser; import io.debezium.converters.spi.SerializerType; /** @@ -24,12 +23,7 @@ public String getName() { } @Override - public RecordParser createParser(RecordAndMetadata recordAndMetadata) { - return new VitessRecordParser(recordAndMetadata); - } - - @Override - public CloudEventsMaker createMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) { - return new VitessCloudEventsMaker(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName); + public CloudEventsMaker createMaker(RecordAndMetadata recordAndMetadata, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) { + return new VitessCloudEventsMaker(recordAndMetadata, contentType, dataSchemaUriBase, cloudEventsSchemaName); } } diff --git a/src/main/java/io/debezium/connector/vitess/converters/VitessRecordParser.java b/src/main/java/io/debezium/connector/vitess/converters/VitessRecordParser.java deleted file mode 100644 index 8d16318f..00000000 --- a/src/main/java/io/debezium/connector/vitess/converters/VitessRecordParser.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.vitess.converters; - -import java.util.Set; - -import org.apache.kafka.connect.errors.DataException; - -import io.debezium.connector.vitess.SourceInfo; -import io.debezium.converters.recordandmetadata.RecordAndMetadata; -import io.debezium.converters.spi.RecordParser; -import io.debezium.data.Envelope; -import io.debezium.util.Collect; - -/** - * Parser for records produced by the Vitess connector. - * - * @author Chris Cranford - */ -public class VitessRecordParser extends RecordParser { - - private static final Set VITESS_SOURCE_FIELD = Collect.unmodifiableSet(SourceInfo.VGTID_KEY, SourceInfo.KEYSPACE_NAME_KEY); - - public VitessRecordParser(RecordAndMetadata recordAndMetadata) { - super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); - } - - @Override - public Object getMetadata(String name) { - if (SOURCE_FIELDS.contains(name)) { - return source().get(name); - } - if (VITESS_SOURCE_FIELD.contains(name)) { - return source().get(name); - } - - throw new DataException("No such field \"" + name + "\" in the \"source\" field of events from Vitess connector"); - } -}