Skip to content

Commit

Permalink
DBZ-7755 Refactor exporting to CloudEvents: remove RecordParser
Browse files Browse the repository at this point in the history
  • Loading branch information
rkudryashov authored and jpechane committed Apr 8, 2024
1 parent 62ca8ef commit a88fcf6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
*/
package io.debezium.connector.vitess.converters;

import java.util.Set;

import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.vitess.SourceInfo;
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.RecordParser;
import io.debezium.converters.spi.SerializerType;
import io.debezium.data.Envelope;
import io.debezium.util.Collect;

/**
* CloudEvents maker for records produced by the Vitess connector.
Expand All @@ -18,13 +22,20 @@
*/
public class VitessCloudEventsMaker extends CloudEventsMaker {

public VitessCloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
super(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName);
private static final Set<String> VITESS_SOURCE_FIELDS = Collect.unmodifiableSet(SourceInfo.VGTID_KEY, SourceInfo.KEYSPACE_NAME_KEY);

public VitessCloudEventsMaker(RecordAndMetadata recordAndMetadata, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
super(recordAndMetadata, contentType, dataSchemaUriBase, cloudEventsSchemaName, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
}

@Override
public String ceId() {
return "name:" + recordParser.getMetadata(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";vgtid:" + recordParser.getMetadata(SourceInfo.VGTID_KEY);
return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";vgtid:" + sourceField(SourceInfo.VGTID_KEY);
}

@Override
public Set<String> connectorSpecificSourceFields() {
return VITESS_SOURCE_FIELDS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.CloudEventsProvider;
import io.debezium.converters.spi.RecordParser;
import io.debezium.converters.spi.SerializerType;

/**
Expand All @@ -24,12 +23,7 @@ public String getName() {
}

@Override
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
return new VitessRecordParser(recordAndMetadata);
}

@Override
public CloudEventsMaker createMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
return new VitessCloudEventsMaker(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName);
public CloudEventsMaker createMaker(RecordAndMetadata recordAndMetadata, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
return new VitessCloudEventsMaker(recordAndMetadata, contentType, dataSchemaUriBase, cloudEventsSchemaName);
}
}

This file was deleted.

0 comments on commit a88fcf6

Please sign in to comment.