diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index cc5fe4b8a..e125a3065 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -70,6 +70,8 @@ public class DorisTypeMapper { /** Max size of varchar type of Doris. */ public static final int MAX_VARCHAR_SIZE = 65533; + /* Max precision of datetime type of Doris. */ + public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6; public static DataType toFlinkType( String columnName, String columnType, int precision, int scale) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java index e220364e8..60a5eda25 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java @@ -17,11 +17,20 @@ package org.apache.doris.flink.tools.cdc.mysql; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.util.Preconditions; import org.apache.doris.flink.catalog.doris.DorisType; +import static org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION; + public class MysqlType { + + // MySQL driver returns width of timestamp types instead of precision. + // 19 characters are used for zero-precision timestamps while others + // require 19 + precision + 1 characters with the additional character + // required for the decimal separator. + private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19; private static final String BIT = "BIT"; private static final String BOOLEAN = "BOOLEAN"; private static final String BOOL = "BOOL"; @@ -146,14 +155,28 @@ public static String toDorisType(String type, Integer length, Integer scale) { case DATETIME: case TIMESTAMP: // default precision is 0 - if (length == null || length <= 0 || length == 19) { - return DorisType.DATETIME_V2; - // In JsonDebeziumSchemaSerializer record,the length of timestamp/datetime is 0 - // to 6. - } else if (length > 20) { - return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length - 20, 6)); - } else if (length <= 9) { - return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length, 6)); + // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html + if (length == null + || length <= 0 + || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) { + return String.format("%s(%s)", DorisType.DATETIME_V2, 0); + } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) { + // Timestamp with a fraction of seconds. + // For example, 2024-01-01 01:01:01.1 + // The decimal point will occupy 1 character. + // Thus,the length of the timestamp is 21. + return String.format( + "%s(%s)", + DorisType.DATETIME_V2, + Math.min( + length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1, + MAX_SUPPORTED_DATE_TIME_PRECISION)); + } else if (length <= TimestampType.MAX_PRECISION) { + // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9. + return String.format( + "%s(%s)", + DorisType.DATETIME_V2, + Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION)); } else { throw new UnsupportedOperationException( "Unsupported length: " diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 0ce60d31f..eeeb307d9 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; @@ -267,4 +268,51 @@ public void testAutoCreateTable() throws Exception { Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName()); schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName); } + + @Test + public void testDateTimeFullOrigin() throws JsonProcessingException { + Map srcFiledSchemaMap = new LinkedHashMap<>(); + srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null)); + srcFiledSchemaMap.put( + "test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)", null, null)); + srcFiledSchemaMap.put( + "test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)", null, null)); + srcFiledSchemaMap.put( + "test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)", null, null)); + srcFiledSchemaMap.put( + "test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)", null, null)); + srcFiledSchemaMap.put( + "test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)", null, null)); + srcFiledSchemaMap.put( + "test_ts_1", + new FieldSchema("test_ts_1", "DATETIMEV2(1)", "current_timestamp", null)); + srcFiledSchemaMap.put( + "test_ts_3", + new FieldSchema("test_ts_3", "DATETIMEV2(3)", "current_timestamp", null)); + srcFiledSchemaMap.put( + "test_ts_6", + new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null)); + + schemaChange.setSourceConnector("mysql"); + String columnsString = + "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_1\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":1,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_3\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":3,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_6\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":6,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_0\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_1\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":1,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_3\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":3,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_6\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":6,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}]},\"comment\":null}]}"; + JsonNode columns = objectMapper.readTree(columnsString); + schemaChange.fillOriginSchema(columns); + Map originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap(); + + Iterator> originFieldSchemaIterator = + originFieldSchemaMap.entrySet().iterator(); + for (Entry entry : srcFiledSchemaMap.entrySet()) { + FieldSchema srcFiledSchema = entry.getValue(); + Entry originField = originFieldSchemaIterator.next(); + + Assert.assertEquals(entry.getKey(), originField.getKey()); + Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName()); + Assert.assertEquals( + srcFiledSchema.getTypeString(), originField.getValue().getTypeString()); + Assert.assertEquals( + srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue()); + Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment()); + } + } }