Skip to content

Commit

Permalink
add ut test
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 committed Jan 24, 2024
1 parent 1c49635 commit e3f5468
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class DorisTypeMapper {

/** Max size of varchar type of Doris. */
public static final int MAX_VARCHAR_SIZE = 65533;
/* Max precision of datetime type of Doris. */
public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;

public static DataType toFlinkType(
String columnName, String columnType, int precision, int scale) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@

package org.apache.doris.flink.tools.cdc.mysql;

import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;

import org.apache.doris.flink.catalog.doris.DorisType;

import static org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;

public class MysqlType {

// MySQL driver returns width of timestamp types instead of precision.
// 19 characters are used for zero-precision timestamps while others
// require 19 + precision + 1 characters with the additional character
// required for the decimal separator.
private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
private static final String BIT = "BIT";
private static final String BOOLEAN = "BOOLEAN";
private static final String BOOL = "BOOL";
Expand Down Expand Up @@ -146,14 +155,28 @@ public static String toDorisType(String type, Integer length, Integer scale) {
case DATETIME:
case TIMESTAMP:
// default precision is 0
if (length == null || length <= 0 || length == 19) {
return DorisType.DATETIME_V2;
// In JsonDebeziumSchemaSerializer record,the length of timestamp/datetime is 0
// to 6.
} else if (length > 20) {
return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length - 20, 6));
} else if (length <= 9) {
return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length, 6));
// see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
if (length == null
|| length <= 0
|| length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
} else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
// Timestamp with a fraction of seconds.
// For example, 2024-01-01 01:01:01.1
// The decimal point will occupy 1 character.
// Thus,the length of the timestamp is 21.
return String.format(
"%s(%s)",
DorisType.DATETIME_V2,
Math.min(
length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
MAX_SUPPORTED_DATE_TIME_PRECISION));
} else if (length <= TimestampType.MAX_PRECISION) {
// For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
return String.format(
"%s(%s)",
DorisType.DATETIME_V2,
Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION));
} else {
throw new UnsupportedOperationException(
"Unsupported length: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -267,4 +268,51 @@ public void testAutoCreateTable() throws Exception {
Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
}

@Test
public void testDateTimeFullOrigin() throws JsonProcessingException {
Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
srcFiledSchemaMap.put(
"test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)", null, null));
srcFiledSchemaMap.put(
"test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)", null, null));
srcFiledSchemaMap.put(
"test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)", null, null));
srcFiledSchemaMap.put(
"test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)", null, null));
srcFiledSchemaMap.put(
"test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)", null, null));
srcFiledSchemaMap.put(
"test_ts_1",
new FieldSchema("test_ts_1", "DATETIMEV2(1)", "current_timestamp", null));
srcFiledSchemaMap.put(
"test_ts_3",
new FieldSchema("test_ts_3", "DATETIMEV2(3)", "current_timestamp", null));
srcFiledSchemaMap.put(
"test_ts_6",
new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null));

schemaChange.setSourceConnector("mysql");
String columnsString =
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_1\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":1,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_3\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":3,\"position\":4,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_dt_6\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"length\":6,\"position\":5,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_0\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"position\":6,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"enumValues\":[]},{\"name\":\"test_ts_1\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":1,\"position\":7,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_3\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":3,\"position\":8,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]},{\"name\":\"test_ts_6\",\"jdbcType\":2014,\"typeName\":\"TIMESTAMP\",\"typeExpression\":\"TIMESTAMP\",\"charsetName\":null,\"length\":6,\"position\":9,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"1970-01-01 00:00:00\",\"enumValues\":[]}]},\"comment\":null}]}";
JsonNode columns = objectMapper.readTree(columnsString);
schemaChange.fillOriginSchema(columns);
Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap();

Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
originFieldSchemaMap.entrySet().iterator();
for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
FieldSchema srcFiledSchema = entry.getValue();
Entry<String, FieldSchema> originField = originFieldSchemaIterator.next();

Assert.assertEquals(entry.getKey(), originField.getKey());
Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName());
Assert.assertEquals(
srcFiledSchema.getTypeString(), originField.getValue().getTypeString());
Assert.assertEquals(
srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue());
Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment());
}
}
}

0 comments on commit e3f5468

Please sign in to comment.