Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](cdc)fix the error mapping default values from RDBMS to Doris #308

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.util.StringUtils;

import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.catalog.DorisTypeMapper;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.connection.JdbcConnectionProvider;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
Expand Down Expand Up @@ -244,20 +245,31 @@ public static String buildCreateTableDDL(TableSchema schema) {
private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
fieldType =
String.format("%s(%s)", DorisType.VARCHAR, DorisTypeMapper.MAX_VARCHAR_SIZE);
}
sql.append(identifier(field.getName())).append(" ").append(fieldType);

if (field.getDefaultValue() != null) {
sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
String defaultValue = field.getDefaultValue();
if (defaultValue != null) {
sql.append(" DEFAULT ").append(quoteDefaultValue(defaultValue));
}
sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
}

public static String quoteDefaultValue(String defaultValue) {
// DEFAULT current_timestamp not need quote
if (defaultValue.equalsIgnoreCase("current_timestamp")) {
return defaultValue;
String colDefaultValue = defaultValue.toUpperCase();
// In Doris, CURRENT_TIMESTAMP can be used as a default value for DATETIME columns.
if (colDefaultValue.startsWith("CURRENT_TIMESTAMP")) {
long length = 0;
if (colDefaultValue.contains("(")) {
String substring =
colDefaultValue.substring(18, colDefaultValue.length() - 1).trim();
length = substring.isEmpty() ? 0 : Long.parseLong(substring);
if (length > DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION) {
length = DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;
}
}
return String.format("%s(%d)", "CURRENT_TIMESTAMP", length);
}
return "'" + defaultValue + "'";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;

public class TableSchema {
public static final String TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
private String database;
private String table;
private String tableComment;
Expand All @@ -31,7 +32,6 @@ public class TableSchema {
private DataModel model = DataModel.DUPLICATE;
private List<String> distributeKeys = new ArrayList<>();
private Map<String, String> properties = new HashMap<>();

private Integer tableBuckets;

public String getDatabase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.DorisTypeMapper;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
Expand Down Expand Up @@ -61,6 +62,8 @@
*/
public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class);
// In CDC, a column whose default is "the current time" is reported with the default value
// 1970-01-01 00:00:00
private static final String DEFAULT_TIMESTAMP = "1970-01-01 00:00:00";
private static final Pattern renameDDLPattern =
Pattern.compile(
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)",
Expand Down Expand Up @@ -310,8 +313,10 @@ public void fillOriginSchema(JsonNode columns) {
String fieldName = column.get("name").asText();
if (originFieldSchemaMap.containsKey(fieldName)) {
String dorisTypeName = buildDorisTypeName(column);
String defaultValue =
handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
int length = column.get("length") == null ? 0 : column.get("length").asInt();
String defaultValueExpression =
extractJsonNode(column, "defaultValueExpression");
String defaultValue = handleDefaultValue(defaultValueExpression, length);
String comment = extractJsonNode(column, "comment");
FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
fieldSchema.setName(fieldName);
Expand All @@ -333,7 +338,9 @@ public void fillOriginSchema(JsonNode columns) {
private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
String fieldName = column.get("name").asText();
String dorisTypeName = buildDorisTypeName(column);
String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
String defaultValueExpression = extractJsonNode(column, "defaultValueExpression");
int length = column.get("length") == null ? 0 : column.get("length").asInt();
String defaultValue = handleDefaultValue(defaultValueExpression, length);
String comment = extractJsonNode(column, "comment");
filedSchemaMap.put(
fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
Expand Down Expand Up @@ -365,13 +372,15 @@ public String buildDorisTypeName(JsonNode column) {
return dorisTypeName;
}

private String handleDefaultValue(String defaultValue) {
private String handleDefaultValue(String defaultValue, int length) {
if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
return null;
}
if (defaultValue.equals("1970-01-01 00:00:00")) {
// TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00
return "current_timestamp";
if (defaultValue.equals(DEFAULT_TIMESTAMP)) {
if (length > DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION) {
length = DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;
}
return String.format("%s(%d)", "CURRENT_TIMESTAMP", length);
}
return defaultValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.catalog.DorisTypeMapper;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;

Expand All @@ -33,6 +35,9 @@
import java.util.StringJoiner;

public abstract class SourceSchema {
public static final String DEFAULT_FUNCTION_REGEX = ".*\\(.*\\)";
public static final String DEFAULT_STRING_SINGLE_QUOTE = "(?<!')'";
public static final String DEFAULT_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
private final String databaseName;
private final String schemaName;
private final String tableName;
Expand All @@ -50,9 +55,8 @@ public SourceSchema(
throws Exception {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
this.tableComment = tableComment;

// Oracle support '/' in table name, we give it a special treatment
fields = new LinkedHashMap<>();
try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
while (rs.next()) {
Expand All @@ -70,8 +74,10 @@ public SourceSchema(
scale = null;
}
String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
String convertedDefault = convertToDoriDefaultValue(defaultValue, dorisTypeStr);
fields.put(
fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment));
fieldName,
new FieldSchema(fieldName, dorisTypeStr, convertedDefault, comment));
}
}

Expand All @@ -82,6 +88,101 @@ public SourceSchema(
primaryKeys.add(fieldName);
}
}
this.tableName = tableName.replace("%", "_");
}

/**
 * Converts a column default value read from the source database into a default value usable
 * for the corresponding Doris column.
 *
 * <p>The conversion is chosen from the Doris type: integer, DOUBLE and DECIMAL types go through
 * {@link #convertNoDateTimeDefault}, CHAR/VARCHAR/STRING/JSON/JSONB types go through {@link
 * #convertStringDefault}, and DATE/DATETIME types go through {@link #convertDateTimeDefault}.
 * Types are matched by prefix in the fallback branch, so a type string carrying a suffix (for
 * example a length or precision) is still recognised.
 *
 * @param defaultStr default value of the column in the source database, may be {@code null}
 * @param dorisType Doris type string of the column, matched case-insensitively
 * @return the converted default value, or {@code null} when there is no default, when the
 *     Doris type is {@code null} or not handled here, or when the selected converter cannot
 *     express the default
 */
public String convertToDoriDefaultValue(String defaultStr, String dorisType) {
    if (defaultStr == null || dorisType == null) {
        return null;
    }
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with a Turkish
    // default locale "int".toUpperCase() is not "INT" and no case below would match.
    String dorisTypeStrUpperCase = dorisType.toUpperCase(java.util.Locale.ROOT);
    switch (dorisTypeStrUpperCase) {
        case DorisType.TINYINT:
        case DorisType.SMALLINT:
        case DorisType.INT:
        case DorisType.BIGINT:
        case DorisType.LARGEINT:
        case DorisType.DOUBLE:
            return convertNoDateTimeDefault(defaultStr, dorisType);
        case DorisType.JSON:
        case DorisType.JSONB:
            return convertStringDefault(defaultStr, dorisType);
        default:
            if (dorisTypeStrUpperCase.startsWith(DorisType.CHAR)
                    || dorisTypeStrUpperCase.startsWith(DorisType.VARCHAR)
                    || dorisTypeStrUpperCase.startsWith(DorisType.STRING)) {
                return convertStringDefault(defaultStr, dorisType);
            } else if (dorisTypeStrUpperCase.startsWith(DorisType.DATETIME)
                    || dorisTypeStrUpperCase.startsWith(DorisType.DATETIME_V2)) {
                return convertDateTimeDefault(defaultStr, dorisType);
            } else if (dorisTypeStrUpperCase.startsWith(DorisType.DECIMAL)
                    || dorisTypeStrUpperCase.startsWith(DorisType.DECIMAL_V3)) {
                return convertNoDateTimeDefault(defaultStr, dorisType);
            } else if (dorisTypeStrUpperCase.startsWith(DorisType.DATE)
                    || dorisTypeStrUpperCase.startsWith(DorisType.DATE_V2)) {
                return convertDateTimeDefault(defaultStr, dorisType);
            }
            // Any other type: no default is carried over.
            return null;
    }
}

/**
 * Normalizes the default value of a non date/time (numeric) column for Doris.
 *
 * <p>A default that has the shape of a function call or parenthesized expression, i.e. matches
 * {@code DEFAULT_FUNCTION_REGEX}, cannot be converted and yields {@code null}. In MySQL 8.x
 * such defaults may look like {@code ((rand() * rand()))} or {@code ((curdate() + interval 1
 * year))}. Any other default has the single quotes matched by {@code
 * DEFAULT_STRING_SINGLE_QUOTE} and all space characters removed.
 *
 * @param defaultStr default value of source Database
 * @param dorisType the type of the field in Doris (not consulted by this method)
 * @return the cleaned default value, or {@code null} for function-style defaults
 */
public String convertNoDateTimeDefault(String defaultStr, String dorisType) {
    boolean looksLikeFunction = defaultStr.matches(DEFAULT_FUNCTION_REGEX);
    if (looksLikeFunction) {
        return null;
    }
    String withoutQuotes = defaultStr.replaceAll(DEFAULT_STRING_SINGLE_QUOTE, "");
    return withoutQuotes.replace(" ", "");
}

/**
 * Converts the default value of a date/time column into a Doris default value.
 *
 * <ul>
 *   <li>A default starting with {@code CURRENT_TIMESTAMP} (case-insensitive) becomes {@code
 *       CURRENT_TIMESTAMP(n)}, where {@code n} is the precision found between the first pair
 *       of parentheses, limited to the range {@code 0..
 *       DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION}. Missing or empty parentheses mean
 *       precision 0.
 *   <li>Any other default matching {@code DEFAULT_FUNCTION_REGEX} cannot be converted and
 *       yields {@code null}.
 *   <li>Everything else is treated as a literal and passed to {@link
 *       #handleTimestampNanoseconds}.
 * </ul>
 *
 * @param defaultStr default value of the column in the source database
 * @param dorisType the type of the field in Doris (not consulted by this method)
 * @return the converted default value, or {@code null} when {@code defaultStr} is {@code null}
 *     or cannot be converted
 */
public String convertDateTimeDefault(String defaultStr, String dorisType) {
    if (defaultStr == null) {
        return null;
    }
    // Locale.ROOT: with a Turkish default locale "current_timestamp" would not upper-case to
    // "CURRENT_TIMESTAMP" and the prefix check below would fail.
    String defaultStrUpperCase = defaultStr.toUpperCase(java.util.Locale.ROOT);
    if (defaultStrUpperCase.startsWith(DEFAULT_CURRENT_TIMESTAMP)) {
        long length = 0;
        int openIndex = defaultStrUpperCase.indexOf('(');
        // Only a ')' that follows the '(' closes the precision argument.
        int closeIndex = defaultStrUpperCase.indexOf(')', openIndex + 1);
        // In MariaDB, the CURRENT_TIMESTAMP() is the same as CURRENT_TIMESTAMP(0) in MySQL.
        if (openIndex >= 0 && closeIndex > openIndex) {
            String precisionText =
                    defaultStrUpperCase.substring(openIndex + 1, closeIndex).trim();
            if (!precisionText.isEmpty()) {
                try {
                    length = Long.parseLong(precisionText);
                } catch (NumberFormatException e) {
                    // Not a plain precision argument: treat it like any other default
                    // expression that cannot be converted instead of failing the whole sync.
                    return null;
                }
            }
            length =
                    Math.max(
                            0,
                            Math.min(length, DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION));
        }
        return String.format("CURRENT_TIMESTAMP(%d)", length);
    } else if (defaultStrUpperCase.matches(DEFAULT_FUNCTION_REGEX)) {
        return null;
    }
    return handleTimestampNanoseconds(defaultStr);
}

/**
 * Limits the fractional-seconds part of a timestamp literal to at most six digits.
 *
 * <p>Values shorter than 20 characters are returned unchanged. Longer values must consist of
 * exactly two parts when split on {@code '.'}; the second part is cut to its first six
 * characters and the two parts are joined again with a dot.
 *
 * @param defaultStr timestamp default value, may be {@code null}
 * @return the (possibly shortened) value, or {@code null} when the input is {@code null},
 *     empty, or is 20+ characters long without splitting into exactly two parts on {@code '.'}
 */
public String handleTimestampNanoseconds(String defaultStr) {
    if (defaultStr == null || defaultStr.isEmpty()) {
        return null;
    }

    if (defaultStr.length() >= 20) {
        String[] parts = defaultStr.split("\\.");
        if (parts.length == 2) {
            String fraction = parts[1];
            // Keep no more than six fractional digits.
            int keep = Math.min(fraction.length(), 6);
            return parts[0] + "." + fraction.substring(0, keep);
        }
        return null;
    }
    // Too short to carry a fractional part after "yyyy-MM-dd HH:mm:ss".
    return defaultStr;
}

/**
 * Converts the default value of a string-like column by removing every single-quote character
 * from it.
 *
 * @param defaultStr default value of the column in the source database
 * @param dorisType the type of the field in Doris (not consulted by this method)
 * @return {@code defaultStr} without any {@code '} characters
 */
public String convertStringDefault(String defaultStr, String dorisType) {
    final String singleQuote = "'";
    // Plain literal replacement; the quote needs no regular-expression handling.
    return defaultStr.replace(singleQuote, "");
}

public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
Expand Down Expand Up @@ -115,6 +116,13 @@ public List<SourceSchema> getSchemaList() throws Exception {
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
String tableComment = tables.getString("REMARKS");
if (!tableName.matches(TableSchema.TABLE_REGEX)) {
LOG.warn(
"table name {} is not valid in Doris, the regex of Doris table name is {}",
tableName,
TableSchema.TABLE_REGEX);
continue;
}
if (!isSyncNeeded(tableName)) {
continue;
}
Expand Down
Loading
Loading