diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java index b2a49d1a8..afda237ef 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java @@ -360,39 +360,50 @@ private static List convertArrayData(ArrayData array, LogicalType type) private static Object convertMapData(MapData map, LogicalType type) { Map result = new HashMap<>(); + LogicalType valueType = ((MapType) type).getValueType(); + LogicalType keyType = ((MapType) type).getKeyType(); if (map instanceof GenericMapData) { GenericMapData gMap = (GenericMapData) map; for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) { - result.put(key, gMap.get(key)); + + Object convertedKey = convertMapEntry(key, keyType); + Object convertedValue = convertMapEntry(gMap.get(key), valueType); + result.put(convertedKey, convertedValue); } return result; - } - if (map instanceof BinaryMapData) { + } else if (map instanceof BinaryMapData) { BinaryMapData bMap = (BinaryMapData) map; - LogicalType valueType = ((MapType) type).getValueType(); Map javaMap = bMap.toJavaMap(((MapType) type).getKeyType(), valueType); for (Map.Entry entry : javaMap.entrySet()) { - String key = entry.getKey().toString(); - if (LogicalTypeRoot.MAP.equals(valueType.getTypeRoot())) { - result.put(key, convertMapData((MapData) entry.getValue(), valueType)); - } else if (LogicalTypeRoot.DATE.equals(valueType.getTypeRoot())) { - result.put( - key, - Date.valueOf(LocalDate.ofEpochDay((Integer) entry.getValue())) - .toString()); - } else if (LogicalTypeRoot.ARRAY.equals(valueType.getTypeRoot())) { - result.put(key, convertArrayData((ArrayData) entry.getValue(), valueType)); - } else if (entry.getValue() instanceof TimestampData) { - result.put(key, ((TimestampData) entry.getValue()).toTimestamp().toString()); - } else { - result.put(key, entry.getValue().toString()); - } + Object convertedKey = convertMapEntry(entry.getKey(), keyType); + Object convertedValue = convertMapEntry(entry.getValue(), valueType); + result.put(convertedKey, convertedValue); } return result; } throw new UnsupportedOperationException("Unsupported map data: " + map.getClass()); } + /** + * Converts the key-value pair of MAP to the actual type. + * + * @param originValue the original value of key-value pair + * @param logicalType key or value logical type + */ + private static Object convertMapEntry(Object originValue, LogicalType logicalType) { + if (LogicalTypeRoot.MAP.equals(logicalType.getTypeRoot())) { + return convertMapData((MapData) originValue, logicalType); + } else if (LogicalTypeRoot.DATE.equals(logicalType.getTypeRoot())) { + return Date.valueOf(LocalDate.ofEpochDay((Integer) originValue)).toString(); + } else if (LogicalTypeRoot.ARRAY.equals(logicalType.getTypeRoot())) { + return convertArrayData((ArrayData) originValue, logicalType); + } else if (originValue instanceof TimestampData) { + return ((TimestampData) originValue).toTimestamp().toString(); + } else { + return originValue.toString(); + } + } + private static Object convertRowData(RowData val, int index, LogicalType type) { RowType rowType = (RowType) type; Map value = new HashMap<>(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java index d69a1311d..b63d03320 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java @@ -21,6 +21,7 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; @@ -40,10 +41,11 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class DorisRowConverterTest implements Serializable { - @Test public void testConvert() throws IOException { ResolvedSchema schema = @@ -67,11 +69,11 @@ public void testConvert() throws IOException { DorisRowConverter converter = new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); - - LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); - LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + // Doris DatetimeV2 supports up to 6 decimal places (microseconds). + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); LocalDate date1 = LocalDate.of(2021, 1, 1); - List record = + List record = Arrays.asList( null, true, @@ -104,7 +106,7 @@ public void testConvert() throws IOException { .build(); String s = new String(serializer.serialize(rowData).getRow()); Assert.assertEquals( - "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", + "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris", s); } @@ -130,8 +132,9 @@ public void testExternalConvert() { Column.physical("f16", DataTypes.VARCHAR(256))); DorisRowConverter converter = new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); - LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); - LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0); + // Doris DatetimeV2 supports up to 6 decimal places (microseconds). + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); LocalDate date1 = LocalDate.of(2021, 1, 1); GenericRowData rowData = GenericRowData.of( @@ -151,12 +154,198 @@ public void testExternalConvert() { (int) date1.toEpochDay(), StringData.fromString("a"), StringData.fromString("doris")); - List row = new ArrayList(); + List row = new ArrayList<>(); for (int i = 0; i < rowData.getArity(); i++) { row.add(converter.convertExternal(rowData, i)); } Assert.assertEquals( - "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:00:00.0, 2021-01-01 08:00:00.0, 2021-01-01, a, doris]", + "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris]", row.toString()); } + + @Test + public void testMapInternalConvert() throws IOException { + + ResolvedSchema schema = getRowMapSchema(); + DorisRowConverter converter = + new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); + // Doris DatetimeV2 supports up to 6 decimal places (microseconds). + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDateTime time3 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDate date1 = LocalDate.of(2021, 1, 1); + Map booleanMap = createMapAndPut(new HashMap<>(), true, false); + Map floatMap = createMapAndPut(new HashMap<>(), 1.2f, 1.3f); + Map doubleMap = createMapAndPut(new HashMap<>(), 1.2345d, 1.2345d); + Map intervalYearMap = createMapAndPut(new HashMap<>(), 24, 24); + Map intervalDayMap = createMapAndPut(new HashMap<>(), 10, 10); + Map tinyIntMap = createMapAndPut(new HashMap<>(), (byte) 1, (byte) 1); + Map shortIntMap = createMapAndPut(new HashMap<>(), (short) 32, (short) 32); + Map intMap = createMapAndPut(new HashMap<>(), 64, 64); + Map longMap = createMapAndPut(new HashMap<>(), 128L, 128L); + Map decimalMap = + createMapAndPut( + new HashMap<>(), BigDecimal.valueOf(10.123), BigDecimal.valueOf(10.123)); + Map timestampWithZoneMap = + createMapAndPut(new HashMap<>(), time1, time1); + Map timestampWithLocalZoneMap = + createMapAndPut(new HashMap<>(), time2, time2); + Map timestampNoLTZ = + createMapAndPut(new HashMap<>(), time3, time3); + Map dateMap = createMapAndPut(new HashMap<>(), date1, date1); + Map charMap = createMapAndPut(new HashMap<>(), 'a', 'a'); + Map stringMap = createMapAndPut(new HashMap<>(), "doris", "doris"); + + List record = + Arrays.asList( + booleanMap, + floatMap, + doubleMap, + intervalYearMap, + intervalDayMap, + tinyIntMap, + shortIntMap, + intMap, + longMap, + decimalMap, + timestampWithZoneMap, + timestampWithLocalZoneMap, + timestampNoLTZ, + dateMap, + charMap, + stringMap); + GenericRowData rowData = converter.convertInternal(record); + + RowDataSerializer serializer = + new Builder() + .setFieldType(schema.getColumnDataTypes().toArray(new DataType[0])) + .setType("csv") + .setFieldDelimiter("|") + .setFieldNames( + new String[] { + "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", + "f11", "f12", "f13", "f14", "f15", "f16" + }) + .build(); + String s = new String(serializer.serialize(rowData).getRow()); + Assert.assertEquals( + "{\"true\":\"false\"}|{\"1.2\":\"1.3\"}|{\"1.2345\":\"1.2345\"}|{\"24\":\"24\"}|{\"10\":\"10\"}|{\"1\":\"1\"}|{\"32\":\"32\"}|{\"64\":\"64\"}|{\"128\":\"128\"}|{\"10.12\":\"10.12\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01\":\"2021-01-01\"}|{\"a\":\"a\"}|{\"doris\":\"doris\"}", + s); + } + + @Test + public void testMapExternalConvert() { + + ResolvedSchema schema = getRowMapSchema(); + DorisRowConverter converter = + new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); + // Doris DatetimeV2 supports up to 6 decimal places (microseconds). + LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDateTime time3 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + LocalDate date1 = LocalDate.of(2021, 1, 1); + + Map booleanMap = createMapAndPut(new HashMap<>(), true, false); + Map floatMap = createMapAndPut(new HashMap<>(), 1.2f, 1.3f); + Map doubleMap = createMapAndPut(new HashMap<>(), 1.2345d, 1.2345d); + Map intervalYearMap = createMapAndPut(new HashMap<>(), 24, 24); + Map intervalDayMap = createMapAndPut(new HashMap<>(), 10, 10); + Map tinyIntMap = createMapAndPut(new HashMap<>(), (byte) 1, (byte) 1); + Map shortIntMap = createMapAndPut(new HashMap<>(), (short) 32, (short) 32); + Map intMap = createMapAndPut(new HashMap<>(), 64, 64); + Map longMap = createMapAndPut(new HashMap<>(), 128L, 128L); + Map decimalMap = + createMapAndPut( + new HashMap<>(), BigDecimal.valueOf(10.123), BigDecimal.valueOf(10.123)); + Map timestampWithZoneMap = + createMapAndPut( + new HashMap<>(), + TimestampData.fromLocalDateTime(time1), + TimestampData.fromLocalDateTime(time1)); + Map timestampWithLocalZoneMap = + createMapAndPut( + new HashMap<>(), + TimestampData.fromLocalDateTime(time2), + TimestampData.fromLocalDateTime(time2)); + Map timestampNoLTZ = + createMapAndPut( + new HashMap<>(), + TimestampData.fromLocalDateTime(time3), + TimestampData.fromLocalDateTime(time3)); + Map dateMap = + createMapAndPut( + new HashMap<>(), (int) date1.toEpochDay(), (int) date1.toEpochDay()); + Map charMap = createMapAndPut(new HashMap<>(), 'a', 'a'); + Map stringMap = createMapAndPut(new HashMap<>(), "doris", "doris"); + GenericRowData rowData = + GenericRowData.of( + new GenericMapData(booleanMap), + new GenericMapData(floatMap), + new GenericMapData(doubleMap), + new GenericMapData(intervalYearMap), + new GenericMapData(intervalDayMap), + new GenericMapData(tinyIntMap), + new GenericMapData(shortIntMap), + new GenericMapData(intMap), + new GenericMapData(longMap), + new GenericMapData(decimalMap), + new GenericMapData(timestampWithZoneMap), + new GenericMapData(timestampWithLocalZoneMap), + new GenericMapData(timestampNoLTZ), + new GenericMapData(dateMap), + new GenericMapData(charMap), + new GenericMapData(stringMap)); + + List row = new ArrayList<>(); + for (int i = 0; i < rowData.getArity(); i++) { + row.add(converter.convertExternal(rowData, i)); + } + Assert.assertEquals( + "[{\"true\":\"false\"}, {\"1.2\":\"1.3\"}, {\"1.2345\":\"1.2345\"}, {\"24\":\"24\"}, {\"10\":\"10\"}, {\"1\":\"1\"}, {\"32\":\"32\"}, {\"64\":\"64\"}, {\"128\":\"128\"}, {\"10.123\":\"10.123\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01\":\"2021-01-01\"}, {\"a\":\"a\"}, {\"doris\":\"doris\"}]", + row.toString()); + } + + /** generate map data. */ + public static Map createMapAndPut(Map map, K key, V value) { + map.put(key, value); + return map; + } + + public static ResolvedSchema getRowMapSchema() { + return ResolvedSchema.of( + Column.physical("f1", DataTypes.MAP(DataTypes.BOOLEAN(), DataTypes.BOOLEAN())), + Column.physical("f2", DataTypes.MAP(DataTypes.FLOAT(), DataTypes.FLOAT())), + Column.physical("f3", DataTypes.MAP(DataTypes.DOUBLE(), DataTypes.DOUBLE())), + Column.physical( + "f4", + DataTypes.MAP( + DataTypes.INTERVAL(DataTypes.YEAR()), + DataTypes.INTERVAL(DataTypes.YEAR()))), + Column.physical( + "f5", + DataTypes.MAP( + DataTypes.INTERVAL(DataTypes.DAY()), + DataTypes.INTERVAL(DataTypes.DAY()))), + Column.physical("f6", DataTypes.MAP(DataTypes.TINYINT(), DataTypes.TINYINT())), + Column.physical("f7", DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.SMALLINT())), + Column.physical("f8", DataTypes.MAP(DataTypes.INT(), DataTypes.INT())), + Column.physical("f9", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BIGINT())), + Column.physical( + "f10", DataTypes.MAP(DataTypes.DECIMAL(10, 2), DataTypes.DECIMAL(10, 2))), + Column.physical( + "f11", + DataTypes.MAP( + DataTypes.TIMESTAMP_WITH_TIME_ZONE(), + DataTypes.TIMESTAMP_WITH_TIME_ZONE())), + Column.physical( + "f12", + DataTypes.MAP( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())), + Column.physical("f13", DataTypes.MAP(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP())), + Column.physical("f14", DataTypes.MAP(DataTypes.DATE(), DataTypes.DATE())), + Column.physical("f15", DataTypes.MAP(DataTypes.CHAR(1), DataTypes.CHAR(1))), + Column.physical( + "f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256)))); + } }