From c7f2f2bbc7590019e6557d2fa2a54c04e807ac50 Mon Sep 17 00:00:00 2001
From: wudongliang <46414265+DongLiang-0@users.noreply.github.com>
Date: Wed, 3 Jan 2024 17:11:04 +0800
Subject: [PATCH] [Improve]Separate the record and schema-change in
JsonDebeziumSchemaSerializer (#279)
---
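Reviewer note: after this patch, serialize() only routes each upstream record and
delegates the real work to two new handlers. A minimal sketch of the resulting
dispatch, paraphrasing the new serialize() body in the diff below (handler
internals elided):

    // recordRoot is the Debezium record parsed to a JsonNode.
    String op = extractJsonNode(recordRoot, "op");
    if (Objects.isNull(op)) {
        // No "op" field: a schema change (DDL) event.
        schemaChange.schemaChange(recordRoot);
        return null;
    }
    if (firstLoad) {
        // Let the schema handler capture the origin schema exactly once.
        schemaChange.init(recordRoot);
        firstLoad = false;
    }
    // op is r/c/u/d: a data change event.
    return dataChange.serialize(record, recordRoot, op);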
.../doris/flink/sink/writer/ChangeEvent.java | 25 +
.../JsonDebeziumSchemaSerializer.java | 624 ++----------------
.../JsonDebeziumChangeContext.java | 98 +++
.../jsondebezium/JsonDebeziumDataChange.java | 167 +++++
.../JsonDebeziumSchemaChange.java | 143 ++++
.../JsonDebeziumSchemaChangeImpl.java | 145 ++++
.../JsonDebeziumSchemaChangeImplV2.java | 389 +++++++++++
.../TestJsonDebeziumSchemaSerializer.java | 473 ++-----------
.../TestJsonDebeziumChangeBase.java | 55 ++
.../TestJsonDebeziumDataChange.java | 190 ++++++
.../TestJsonDebeziumSchemaChangeImpl.java | 143 ++++
.../TestJsonDebeziumSchemaChangeImplV2.java | 268 ++++++++
.../flink/tools/cdc/MySQLDorisE2ECase.java | 153 ++++-
13 files changed, 1871 insertions(+), 1002 deletions(-)
create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImpl.java
create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumChangeBase.java
create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
new file mode 100644
index 000000000..8d7c5cc1b
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/ChangeEvent.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import java.io.Serializable;
+
+/**
+ * Represents the change events of external systems, including data change and schema change events.
+ */
+public interface ChangeEvent extends Serializable {}
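As an illustration of the contract, a hypothetical implementor (not part of this
patch; the actual implementors added here are the jsondebezium handler classes):

    import org.apache.doris.flink.sink.writer.ChangeEvent;

    // Hypothetical implementor: because ChangeEvent extends Serializable,
    // such handlers can be carried in Flink state and shipped with the job graph.
    class ExampleSchemaChangeEvent implements ChangeEvent {
        private final String ddl;

        ExampleSchemaChangeEvent(String ddl) {
            this.ddl = ddl;
        }

        String ddl() {
            return ddl;
        }
    }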
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index cdbd48240..370bca7b2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -18,88 +18,56 @@
package org.apache.doris.flink.sink.writer.serializer;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.doris.flink.catalog.doris.FieldSchema;
-import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.exception.IllegalArgumentException;
-import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
-import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
-import org.apache.doris.flink.sink.schema.SchemaChangeManager;
-import org.apache.doris.flink.sink.writer.EventType;
-import org.apache.doris.flink.tools.cdc.SourceConnector;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
-import org.apache.doris.flink.tools.cdc.oracle.OracleType;
-import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
-import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+/**
+ * Serialize the records of the upstream data source into a data form that can be recognized by
+ * the downstream Doris.
+ *
+ * <p>There are two serialization paths here:
+ *
+ * <p>1. data change records, handled by {@link JsonDebeziumDataChange}.
+ *
+ * <p>2. schema change records, handled by {@link JsonDebeziumSchemaChange}.
+ */
public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
- private static final String OP_READ = "r"; // snapshot read
- private static final String OP_CREATE = "c"; // insert
- private static final String OP_UPDATE = "u"; // update
- private static final String OP_DELETE = "d"; // delete
- public static final String EXECUTE_DDL =
- "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int
- private static final String addDropDDLRegex =
- "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
- private static final Pattern renameDDLPattern =
- Pattern.compile(
- "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
- Pattern.CASE_INSENSITIVE);
- private final Pattern addDropDDLPattern;
- private DorisOptions dorisOptions;
- private ObjectMapper objectMapper = new ObjectMapper();
- private String database;
- private String table;
+ private final Pattern pattern;
+ private final DorisOptions dorisOptions;
+ private final ObjectMapper objectMapper = new ObjectMapper();
// table name of the cdc upstream, format is db.tbl
- private String sourceTableName;
+ private final String sourceTableName;
private boolean firstLoad;
- private boolean firstSchemaChange;
- private Map<String, FieldSchema> originFieldSchemaMap;
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
- private SourceConnector sourceConnector;
- private SchemaChangeManager schemaChangeManager;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
private Map<String, String> tableProperties;
private String targetDatabase;
+ private JsonDebeziumDataChange dataChange;
+ private JsonDebeziumSchemaChange schemaChange;
public JsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
@@ -107,15 +75,7 @@ public JsonDebeziumSchemaSerializer(
String sourceTableName,
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
- this.addDropDDLPattern =
- pattern == null
- ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE)
- : pattern;
- if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
- String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
- this.database = tableInfo[0];
- this.table = tableInfo[1];
- }
+ this.pattern = pattern;
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -123,8 +83,6 @@ public JsonDebeziumSchemaSerializer(
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.newSchemaChange = newSchemaChange;
this.firstLoad = true;
- this.firstSchemaChange = true;
- this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
}
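The USE_BIG_DECIMAL_FOR_FLOATS flag enabled in this constructor is what the
"Prevent loss of decimal data precision" comment refers to; a self-contained
Jackson sketch of the effect:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BigDecimalDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Without this feature, Jackson binds JSON floating-point numbers
            // to Double, silently truncating long decimals.
            mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
            JsonNode node = mapper.readTree("{\"amount\": 0.12345678901234567890}");
            // With the feature on, the node is backed by BigDecimal,
            // so the full literal is preserved.
            System.out.println(node.get("amount").decimalValue());
        }
    }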
public JsonDebeziumSchemaSerializer(
@@ -156,6 +114,27 @@ public JsonDebeziumSchemaSerializer(
this.tableMapping = tableMapping;
this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
+ init();
+ }
+
+ private void init() {
+ JsonDebeziumChangeContext changeContext =
+ new JsonDebeziumChangeContext(
+ dorisOptions,
+ tableMapping,
+ sourceTableName,
+ targetDatabase,
+ tableProperties,
+ objectMapper,
+ pattern,
+ lineDelimiter,
+ ignoreUpdateBefore);
+
+ this.schemaChange =
+ newSchemaChange
+ ? new JsonDebeziumSchemaChangeImplV2(changeContext)
+ : new JsonDebeziumSchemaChangeImpl(changeContext);
+ this.dataChange = new JsonDebeziumDataChange(changeContext);
}
@Override
@@ -165,360 +144,15 @@ public DorisRecord serialize(String record) throws IOException {
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
- if (newSchemaChange) {
- schemaChangeV2(recordRoot);
- } else {
- schemaChange(recordRoot);
- }
- return null;
- }
-
- if (newSchemaChange && firstLoad) {
- initOriginFieldSchema(recordRoot);
- }
-
- // Filter out table records that are not in tableMapping
- String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
- String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
- if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
- LOG.warn(
- "filter table {}, because it is not listened, record detail is {}",
- cdcTableIdentifier,
- record);
- return null;
- }
-
- Map<String, Object> valueMap;
- switch (op) {
- case OP_READ:
- case OP_CREATE:
- valueMap = extractAfterRow(recordRoot);
- addDeleteSign(valueMap, false);
- break;
- case OP_UPDATE:
- return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
- case OP_DELETE:
- valueMap = extractBeforeRow(recordRoot);
- addDeleteSign(valueMap, true);
- break;
- default:
- LOG.error("parse record fail, unknown op {} in {}", op, record);
- return null;
- }
-
- return DorisRecord.of(
- dorisTableIdentifier,
- objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
- }
-
- /**
- * Change the update event into two.
- *
- * @param recordRoot
- * @return
- */
- private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
- StringBuilder updateRow = new StringBuilder();
- if (!ignoreUpdateBefore) {
- // convert delete
- Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
- addDeleteSign(beforeRow, true);
- updateRow.append(objectMapper.writeValueAsString(beforeRow)).append(this.lineDelimiter);
- }
-
- // convert insert
- Map<String, Object> afterRow = extractAfterRow(recordRoot);
- addDeleteSign(afterRow, false);
- updateRow.append(objectMapper.writeValueAsString(afterRow));
- return updateRow.toString().getBytes(StandardCharsets.UTF_8);
- }
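Concretely, an upstream UPDATE of c1 from 1 to 2 becomes two rows joined by
lineDelimiter: the before-image flagged for delete, the after-image as an
insert. A standalone sketch, assuming Doris's standard __DORIS_DELETE_SIGN__
hidden column (which is what addDeleteSign is expected to populate):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class UpdateSplitDemo {
        // Assumed name of Doris's hidden delete column.
        private static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";

        public static void main(String[] args) {
            Map<String, Object> before = new LinkedHashMap<>();
            before.put("id", 1);
            before.put("c1", 1);
            before.put(DELETE_SIGN, "1"); // before-image becomes a delete

            Map<String, Object> after = new LinkedHashMap<>();
            after.put("id", 1);
            after.put("c1", 2);
            after.put(DELETE_SIGN, "0"); // after-image becomes an insert

            // Joined with the line delimiter, this is the payload
            // stream-loaded into Doris for one upstream UPDATE.
            System.out.println(before + "\n" + after);
        }
    }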
-
- public boolean schemaChangeV2(JsonNode recordRoot) {
- boolean status = false;
- try {
- if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
- return false;
- }
-
- EventType eventType = extractEventType(recordRoot);
- if (eventType == null) {
- return false;
- }
- if (eventType.equals(EventType.CREATE)) {
- TableSchema tableSchema = extractCreateTableSchema(recordRoot);
- status = schemaChangeManager.createTable(tableSchema);
- if (status) {
- String cdcTbl = getCdcTableIdentifier(recordRoot);
- String dorisTbl = getCreateTableIdentifier(recordRoot);
- tableMapping.put(cdcTbl, dorisTbl);
- LOG.info("create table ddl status: {}", status);
- }
- } else if (eventType.equals(EventType.ALTER)) {
- // db,table
- Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
- if (tuple == null) {
- return false;
- }
- List<String> ddlSqlList = extractDDLList(recordRoot);
- if (CollectionUtils.isEmpty(ddlSqlList)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
- List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
- for (int i = 0; i < ddlSqlList.size(); i++) {
- DDLSchema ddlSchema = ddlSchemas.get(i);
- String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
- status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
- LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
- }
- } else {
- LOG.info("Unsupported event type {}", eventType);
- }
- } catch (Exception ex) {
- LOG.warn("schema change error :", ex);
- }
- return status;
- }
-
- protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
- JsonNode historyRecord = extractHistoryRecord(record);
- JsonNode tableChanges = historyRecord.get("tableChanges");
- if (!Objects.isNull(tableChanges)) {
- JsonNode tableChange = tableChanges.get(0);
- return tableChange;
- }
- return null;
- }
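For orientation, the extractTableChange helper above navigates a Debezium
schema-history event; the relevant slice looks roughly like this (abridged,
field names taken from the code; with MySQL CDC the historyRecord value often
arrives as an embedded JSON string):

    {
      "source": { "connector": "mysql", "db": "test", "table": "t1" },
      "historyRecord": {
        "ddl": "ALTER TABLE t1 ADD COLUMN c2 INT",
        "tableChanges": [
          {
            "type": "ALTER",
            "table": {
              "primaryKeyColumnNames": ["id"],
              "columns": [ { "name": "id", "typeName": "INT" } ]
            }
          }
        ]
      }
    }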
-
- /** Parse Alter Event. */
- @VisibleForTesting
- public List<String> extractDDLList(JsonNode record) throws IOException {
- String dorisTable = getDorisTableIdentifier(record);
- JsonNode historyRecord = extractHistoryRecord(record);
- String ddl = extractJsonNode(historyRecord, "ddl");
- JsonNode tableChange = extractTableChange(record);
- if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
- return null;
- }
-
- JsonNode columns = tableChange.get("table").get("columns");
- if (firstSchemaChange) {
- sourceConnector =
- SourceConnector.valueOf(
- record.get("source").get("connector").asText().toUpperCase());
- fillOriginSchema(columns);
- }
-
- // rename ddl
- Matcher renameMatcher = renameDDLPattern.matcher(ddl);
- if (renameMatcher.find()) {
- String oldColumnName = renameMatcher.group(2);
- String newColumnName = renameMatcher.group(3);
- return SchemaChangeHelper.generateRenameDDLSql(
- dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
- }
-
- // add/drop ddl
- Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
- for (JsonNode column : columns) {
- buildFieldSchema(updateFiledSchema, column);
- }
- SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
- // In order to avoid other source table column change operations other than add/drop/rename,
- // which may lead to the accidental deletion of the doris column.
- Matcher matcher = addDropDDLPattern.matcher(ddl);
- if (!matcher.find()) {
+ schemaChange.schemaChange(recordRoot);
return null;
}
- return SchemaChangeHelper.generateDDLSql(dorisTable);
- }
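The rename branch in extractDDLList above is driven entirely by
renameDDLPattern, so what it accepts can be checked offline; a standalone
sketch using the same expression:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RenameDdlDemo {
        public static void main(String[] args) {
            // Same expression as renameDDLPattern above.
            Pattern p =
                    Pattern.compile(
                            "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
                            Pattern.CASE_INSENSITIVE);
            Matcher m = p.matcher("alter table t1 rename column c_old to c_new");
            if (m.find()) {
                // Prints "c_old -> c_new": group(2)/group(3) are the old
                // and new column names.
                System.out.println(m.group(2) + " -> " + m.group(3));
            }
        }
    }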
-
- @VisibleForTesting
- public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
- if (sourceConnector == null) {
- sourceConnector =
- SourceConnector.valueOf(
- record.get("source").get("connector").asText().toUpperCase());
- }
-
- String dorisTable = getCreateTableIdentifier(record);
- JsonNode tableChange = extractTableChange(record);
- JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
- JsonNode columns = tableChange.get("table").get("columns");
- JsonNode comment = tableChange.get("table").get("comment");
- String tblComment = comment == null ? "" : comment.asText();
- Map<String, FieldSchema> field = new LinkedHashMap<>();
- for (JsonNode column : columns) {
- buildFieldSchema(field, column);
- }
- List<String> pkList = new ArrayList<>();
- for (JsonNode column : pkColumns) {
- String fieldName = column.asText();
- pkList.add(fieldName);
- }
-
- TableSchema tableSchema = new TableSchema();
- tableSchema.setFields(field);
- tableSchema.setKeys(pkList);
- tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
- tableSchema.setTableComment(tblComment);
- tableSchema.setProperties(tableProperties);
-
- String[] split = dorisTable.split("\\.");
- Preconditions.checkArgument(split.length == 2);
- tableSchema.setDatabase(split[0]);
- tableSchema.setTable(split[1]);
- return tableSchema;
- }
-
- private List<String> buildDistributeKeys(
- List<String> primaryKeys, Map<String, FieldSchema> fields) {
- if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
- return primaryKeys;
- }
- if (!fields.isEmpty()) {
- Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
- return Collections.singletonList(firstField.getKey());
- }
- return new ArrayList<>();
- }
-
- @VisibleForTesting
- public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
- this.originFieldSchemaMap = originFieldSchemaMap;
- }
-
- @VisibleForTesting
- public boolean schemaChange(JsonNode recordRoot) {
- boolean status = false;
- try {
- if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
- return false;
- }
- // db,table
- Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
- if (tuple == null) {
- return false;
- }
-
- String ddl = extractDDL(recordRoot);
- if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
-
- boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddl);
- status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
- LOG.info("schema change status:{}", status);
- } catch (Exception ex) {
- LOG.warn("schema change error :", ex);
- }
- return status;
- }
- /** When cdc synchronizes multiple tables, it will capture multiple table schema changes. */
- protected boolean checkTable(JsonNode recordRoot) {
- String db = extractDatabase(recordRoot);
- String tbl = extractTable(recordRoot);
- String dbTbl = db + "." + tbl;
- return sourceTableName.equals(dbTbl);
- }
-
- public String getCdcTableIdentifier(JsonNode record) {
- String db = extractJsonNode(record.get("source"), "db");
- String schema = extractJsonNode(record.get("source"), "schema");
- String table = extractJsonNode(record.get("source"), "table");
- return SourceSchema.getString(db, schema, table);
- }
-
- public String getCreateTableIdentifier(JsonNode record) {
- String table = extractJsonNode(record.get("source"), "table");
- return targetDatabase + "." + table;
- }
-
- public String getDorisTableIdentifier(String cdcTableIdentifier) {
- if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
- return dorisOptions.getTableIdentifier();
- }
- if (!CollectionUtil.isNullOrEmpty(tableMapping)
- && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
- && tableMapping.get(cdcTableIdentifier) != null) {
- return tableMapping.get(cdcTableIdentifier);
+ if (firstLoad) {
+ schemaChange.init(recordRoot);
+ firstLoad = false;
}
- return null;
- }
-
- protected String getDorisTableIdentifier(JsonNode record) {
- String identifier = getCdcTableIdentifier(record);
- return getDorisTableIdentifier(identifier);
- }
-
- protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
- String identifier = getDorisTableIdentifier(record);
- if (StringUtils.isNullOrWhitespaceOnly(identifier)) {
- return null;
- }
- String[] tableInfo = identifier.split("\\.");
- if (tableInfo.length != 2) {
- return null;
- }
- return Tuple2.of(tableInfo[0], tableInfo[1]);
- }
-
- private boolean checkSchemaChange(String database, String table, String ddl)
- throws IOException, IllegalArgumentException {
- Map<String, Object> param = buildRequestParam(ddl);
- return schemaChangeManager.checkSchemaChange(database, table, param);
- }
-
- private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema)
- throws IOException, IllegalArgumentException {
- Map<String, Object> param =
- SchemaChangeManager.buildRequestParam(
- ddlSchema.isDropColumn(), ddlSchema.getColumnName());
- return schemaChangeManager.checkSchemaChange(database, table, param);
- }
-
- /** Build param { "isDropColumn": true, "columnName" : "column" }. */
- protected Map<String, Object> buildRequestParam(String ddl) {
- Map<String, Object> params = new HashMap<>();
- Matcher matcher = addDropDDLPattern.matcher(ddl);
- if (matcher.find()) {
- String op = matcher.group(1);
- String col = matcher.group(3);
- params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
- params.put("columnName", col);
- }
- return params;
- }
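The add/drop branch can be checked the same way: run addDropDDLRegex offline
and inspect the groups that buildRequestParam above turns into the request
body.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class AddDropParamDemo {
        public static void main(String[] args) {
            // Same expression as addDropDDLRegex above.
            Pattern p =
                    Pattern.compile(
                            "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*",
                            Pattern.CASE_INSENSITIVE);
            Matcher m = p.matcher("ALTER TABLE test.t1 DROP COLUMN c2");
            Map<String, Object> params = new HashMap<>();
            if (m.find()) {
                params.put("isDropColumn", "DROP".equalsIgnoreCase(m.group(1)));
                params.put("columnName", m.group(3));
            }
            // -> isDropColumn=true, columnName=c2 (map order may vary)
            System.out.println(params);
        }
    }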
-
- protected String extractDatabase(JsonNode record) {
- if (record.get("source").has("schema")) {
- // compatible with schema
- return extractJsonNode(record.get("source"), "schema");
- } else {
- return extractJsonNode(record.get("source"), "db");
- }
- }
-
- protected String extractTable(JsonNode record) {
- return extractJsonNode(record.get("source"), "table");
- }
-
- /** Parse event type. */
- protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
- JsonNode tableChange = extractTableChange(record);
- if (tableChange == null || tableChange.get("type") == null) {
- return null;
- }
- String type = tableChange.get("type").asText();
- if (EventType.ALTER.toString().equalsIgnoreCase(type)) {
- return EventType.ALTER;
- } else if (EventType.CREATE.toString().equalsIgnoreCase(type)) {
- return EventType.CREATE;
- }
- return null;
+ return dataChange.serialize(record, recordRoot, op);
}
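Taken together, a hedged usage sketch (construction elided; assume a serializer
fully wired through the constructor that calls init() above), fed a typical
Debezium insert envelope:

    String insertEvent =
            "{\"before\":null,"
                    + "\"after\":{\"id\":1,\"name\":\"doris\"},"
                    + "\"source\":{\"db\":\"test\",\"table\":\"t1\"},"
                    + "\"op\":\"c\",\"ts_ms\":0}";

    // op = "c" routes to JsonDebeziumDataChange and yields a loadable row;
    // a DDL event (no "op" field) goes to the schema-change handler and
    // serialize() returns null instead.
    DorisRecord loadable = serializer.serialize(insertEvent);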
private String extractJsonNode(JsonNode record, String key) {
@@ -527,154 +161,9 @@ private String extractJsonNode(JsonNode record, String key) {
: null;
}
- private Map<String, Object> extractBeforeRow(JsonNode record) {
- return extractRow(record.get("before"));
- }
-
- private Map<String, Object> extractAfterRow(JsonNode record) {
- return extractRow(record.get("after"));
- }
-
- private Map<String, Object> extractRow(JsonNode recordRow) {
- Map<String, Object> recordMap =
- objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});