Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix map type mapping to doris type error #267

Merged
merged 24 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c7ff9bd
modify the invalid uri
vinlee19 Jul 6, 2023
fdacdbb
Merge branch 'apache:master' into master
vinlee19 Sep 21, 2023
6611669
Merge branch 'apache:master' into master
vinlee19 Oct 7, 2023
d0a0642
update pom.xml
vinlee19 Oct 16, 2023
e084a18
Merge remote-tracking branch 'origin/master'
vinlee19 Oct 16, 2023
469f74f
Merge remote-tracking branch 'upstream/master'
vinlee19 Oct 27, 2023
ab38293
Merge remote-tracking branch 'upstream/master'
vinlee19 Oct 31, 2023
2f6c09f
Merge remote-tracking branch 'origin/master'
vinlee19 Nov 13, 2023
99ed96f
Merge remote-tracking branch 'upstream/master'
vinlee19 Dec 7, 2023
8266d83
add unit test for map
vinlee19 Dec 8, 2023
02e8fd7
add map test
vinlee19 Dec 11, 2023
f5bda17
improve code for MAP conversion
vinlee19 Dec 13, 2023
8ad5c02
Merge remote-tracking branch 'upstream/master' into map_test
vinlee19 Dec 15, 2023
ee05116
add unit test for MAP in DorisRowConverter
vinlee19 Dec 15, 2023
89f8e69
[Fix] fix Load Doris data failed, schema size of fetch data is wrong …
caoliang-web Dec 11, 2023
d989ac8
[fix] improve char length proplem (#262)
JNSimba Dec 12, 2023
300ebaf
add unit test for MAP in DorisRowConverter
vinlee19 Dec 15, 2023
36c8f8a
Merge remote-tracking branch 'origin/map_test' into map_test
vinlee19 Dec 15, 2023
7bf5287
reformt code
vinlee19 Dec 15, 2023
cb16007
Merge branch 'master' into map_test
vinlee19 Dec 15, 2023
6fce59d
reformat code according to checkstyle
vinlee19 Dec 18, 2023
2f4f136
reformat code according to checkstyle
vinlee19 Dec 18, 2023
add11da
add timestamp() test case for MAP
vinlee19 Dec 20, 2023
1d149ab
add timestamp test case for MAP
vinlee19 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,26 +357,38 @@ private static Object convertMapData(MapData map, LogicalType type) {
if (map instanceof BinaryMapData) {
BinaryMapData bMap = (BinaryMapData)map;
LogicalType valueType = ((MapType)type).getValueType();
LogicalType keyType = ((MapType) type).getKeyType();
Map<?, ?> javaMap = bMap.toJavaMap(((MapType) type).getKeyType(), valueType);
for (Map.Entry<?,?> entry : javaMap.entrySet()) {
String key = entry.getKey().toString();
if (LogicalTypeRoot.MAP.equals(valueType.getTypeRoot())) {
result.put(key, convertMapData((MapData)entry.getValue(), valueType));
}else if (LogicalTypeRoot.DATE.equals(valueType.getTypeRoot())) {
result.put(key, Date.valueOf(LocalDate.ofEpochDay((Integer)entry.getValue())).toString());
}else if (LogicalTypeRoot.ARRAY.equals(valueType.getTypeRoot())) {
result.put(key, convertArrayData((ArrayData)entry.getValue(), valueType));
}else if(entry.getValue() instanceof TimestampData){
result.put(key, ((TimestampData)entry.getValue()).toTimestamp().toString());
}else{
result.put(key, entry.getValue().toString());
}
Object convertedKey = convertMapEntry(entry.getKey(), keyType);
Object convertedValue = convertMapEntry(entry.getValue(), valueType);
result.put(convertedKey,convertedValue);
}
return result;
}
throw new UnsupportedOperationException("Unsupported map data: " + map.getClass());
}

/**
* Converts the key-value pair of MAP to the actual type
*
* @param originValue the original value of key-value pair
* @param logicalType key or value logical type
*/
private static Object convertMapEntry(Object originValue, LogicalType logicalType) {
if (LogicalTypeRoot.MAP.equals(logicalType.getTypeRoot())) {
return convertMapData((MapData) originValue, logicalType);
} else if (LogicalTypeRoot.DATE.equals(logicalType.getTypeRoot())) {
return Date.valueOf(LocalDate.ofEpochDay((Integer) originValue)).toString();
} else if (LogicalTypeRoot.ARRAY.equals(logicalType.getTypeRoot())) {
return convertArrayData((ArrayData) originValue, logicalType);
} else if (originValue instanceof TimestampData) {
return ((TimestampData) originValue).toString();
JNSimba marked this conversation as resolved.
Show resolved Hide resolved
} else {
return originValue.toString();
}
}

private static Object convertRowData(RowData val, int index, LogicalType type) {
RowType rowType = (RowType)type;
Map<String, Object> value = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink;
JNSimba marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.UUID;

public class DorisSinkMapSQLExample {

public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("CREATE TABLE map_doris_source (\n" +
" `id` int,\n" +
" `c_1` map<BOOLEAN,BOOLEAN>, \n" +
" `c_2` map<BOOLEAN,BOOLEAN>, \n" +
" `c_3` map<TINYINT,TINYINT>, \n" +
" `c_4` map<SMALLINT,SMALLINT>, \n" +
" `c_5` map<INT,INT> ,\n" +
" `c_6` map<BIGINT,BIGINT>, \n" +
" `c_7` map<BIGINT,BIGINT>, \n" +
" `c_8` map<FLOAT,FLOAT>, \n" +
" `c_9` map<DOUBLE,DOUBLE>, \n" +
" `c_10` map<DECIMAL(4,2),DECIMAL(4,2)>, \n" +
" `c_11` map<DATE,DATE>,\n" +
" `c_12` map<DATE,DATE>, \n" +
" `c_13` map<TIMESTAMP,TIMESTAMP>, \n" +
" `c_14` map<TIMESTAMP,TIMESTAMP>, \n" +
" `c_15` map<TIMESTAMP_LTZ,TIMESTAMP_LTZ>, \n" +
" `c_16` map<TIMESTAMP_LTZ,TIMESTAMP_LTZ>, \n" +
" `c_17` map<TIMESTAMP_LTZ,TIMESTAMP_LTZ>, \n" +
" `c_18` map<CHAR(10),CHAR(10)>, \n" +
" `c_19` map<VARCHAR(256),VARCHAR(256)>, \n" +
" `c_20` map<STRING,STRING> \n" +
" " +
") WITH (\n" +
" 'connector' = 'datagen', \n" +
" 'number-of-rows' = '5', \n" +
" 'fields.c_7.key.min' = '1', \n" +
" 'fields.c_7.value.min' = '1', \n" +
" 'fields.c_7.key.max' = '10', \n" +
" 'fields.c_7.value.max' = '10', \n" +
" 'fields.c_8.key.min' = '1', \n" +
" 'fields.c_8.value.min' = '1', \n" +
" 'fields.c_8.key.max' = '10', \n" +
" 'fields.c_8.value.max' = '10', \n" +
" 'fields.c_18.key.length' = '10',\n" +
" 'fields.c_18.value.length' = '10',\n" +
" 'fields.c_19.key.length' = '10',\n" +
" 'fields.c_19.value.length' = '10',\n" +
" 'fields.c_20.key.length' = '10',\n" +
" 'fields.c_20.value.length' = '10'\n" +
");");

// define a dynamic aggregating query
// final Table result = tEnv.sqlQuery("SELECT * from map_doris_source ");
////
//// // print the result to the console
// tEnv.toRetractStream(result, Row.class).print();
// env.execute();

tEnv.executeSql("CREATE TABLE map_doris_sink (\n" +
" `id` int,\n" +
" `c_1` map<BOOLEAN,BOOLEAN>, \n" +
" `c_2` map<BOOLEAN,BOOLEAN>, \n" + //MAP<STRING,STRING>
" `c_3` map<TINYINT,TINYINT>, \n" +
" `c_4` map<SMALLINT,SMALLINT>, \n" +
" `c_5` map<INT,INT> ,\n" +
" `c_6` map<BIGINT,BIGINT>, \n" +
" `c_7` map<BIGINT,BIGINT>, \n" +
" `c_8` map<FLOAT,FLOAT>, \n" +
" `c_9` map<DOUBLE,DOUBLE>, \n" +
" `c_10` map<DECIMAL(4,2),DECIMAL(4,2)>, \n" +//MAP<DECIMALV3(4,2),DECIMALV3(4,2)>
" `c_11` map<DATE,DATE>, \n" + //MAP<DATEV2,DATEV2>
" `c_12` map<DATE,DATE>, \n" + //MAP<STRING,STRING>
" `c_13` map<TIMESTAMP,TIMESTAMP>, \n" +//MAP< DATETIMEV2, DATETIMEV2>
" `c_14` map<TIMESTAMP,TIMESTAMP>, \n" + //MAP<STRING,STRING>
" `c_15` map<TIMESTAMP_LTZ,TIMESTAMP_LTZ>, \n" +//MAP< DATETIMEV2, DATETIMEV2>
" `c_16` map<TIMESTAMP_LTZ,TIMESTAMP_LTZ>, \n" +//MAP< DATETIMEV2(3), DATETIMEV2(3)>
" `c_17` map<TIMESTAMP_LTZ,TIMESTAMP_LTZ>, \n" +//MAP<STRING,STRING>
" `c_18` map<CHAR(10),CHAR(10)>, \n" +
" `c_19` map<VARCHAR(256),VARCHAR(256)>, \n" +
" `c_20` map<STRING,STRING> \n" +
" " +
") WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = '127.0.0.1:8030',\n" +
" 'table.identifier' = 'test.all_map_type',\n" +
" 'username' = 'root',\n" +
" 'password' = '',\n" +
" 'sink.label-prefix' = 'doris_label_map" + UUID.randomUUID() + "'" +
");");

tEnv.executeSql("INSERT INTO map_doris_sink select * from map_doris_source");
}
}
Loading