From f843e0c13eeadf736e6120645947b19ddc9c09b8 Mon Sep 17 00:00:00 2001
From: gnehil
Date: Fri, 27 Dec 2024 16:34:58 +0800
Subject: [PATCH] [fix](connector) Fix catalog config check (#242)

* fix unstable ut

* fix unstable ut

* fix doris catalog init config table check failed
---
 .../client/read/DorisFlightSqlReader.java     |  5 +--
 .../doris/spark/config/DorisConfig.java       | 33 +++++++++++--------
 .../doris/spark/config/DorisOptions.java      |  2 +-
 .../doris/spark/rdd/AbstractDorisRDD.scala    |  2 +-
 .../spark/sql/sources/DorisRelation.scala     |  2 +-
 .../doris/spark/sql/DorisSourceProvider.scala |  4 +--
 .../doris/spark/catalog/DorisTable.scala      |  6 ++--
 .../spark/catalog/DorisTableCatalog.scala     |  4 +--
 .../spark/sql/sources/DorisDataSourceV2.scala |  4 +--
 9 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 88a66968..4561f400 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -159,9 +159,7 @@ protected Schema processDorisSchema(DorisReaderPartition partition, final Schema
                 .collect(Collectors.toMap(Field::getName, Function.identity()));
         String[] readColumns = partition.getReadColumns();
         List<Field> newFieldList = new ArrayList<>();
-        int offset = 0;
-        for (int i = 0; i < readColumns.length; i++) {
-            String readColumn = readColumns[i];
+        for (String readColumn : readColumns) {
             if (!fieldTypeMap.containsKey(readColumn) && readColumn.contains(" AS ")) {
                 int asIdx = readColumn.indexOf(" AS ");
                 String realColumn = readColumn.substring(asIdx + 4).trim().replaceAll("`", "");
@@ -169,7 +167,6 @@ protected Schema processDorisSchema(DorisReaderPartition partition, final Schema
                         && ("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
                         || "HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
                     newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
-                    offset++;
                 }
             } else {
                 newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
index 1d04b70f..d8166f80 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
@@ -29,17 +29,20 @@ public class DorisConfig implements Serializable {
 
-    private Map<String, String> configOptions;
-
     private final String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
 
     private final String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";
 
+    private Map<String, String> configOptions;
+
+    private boolean ignoreTableCheck;
+
     // only for test
     public DorisConfig() {
        configOptions = Collections.emptyMap();
     }
 
-    private DorisConfig(Map<String, String> options) throws OptionRequiredException {
+    private DorisConfig(Map<String, String> options, Boolean ignoreTableCheck) throws OptionRequiredException {
         this.configOptions = new HashMap<>(processOptions(options));
+        this.ignoreTableCheck = ignoreTableCheck;
         checkOptions(this.configOptions);
     }
 
@@ -87,14 +90,16 @@ private void checkOptions(Map<String, String> options) throws OptionRequiredExce
                 throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
             }
         }
-        if (!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
-            throw new OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
-        } else {
-            String tableIdentifier = options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
-            if (tableIdentifier.isEmpty()) {
-                throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
-            } else if (!tableIdentifier.contains(".")) {
-                throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, for example: db.table");
+        if (!ignoreTableCheck) {
+            if (!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
+                throw new OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
+            } else {
+                String tableIdentifier = options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
+                if (tableIdentifier.isEmpty()) {
+                    throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
+                } else if (!tableIdentifier.contains(".")) {
+                    throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, for example: db.table");
+                }
             }
         }
         if (!options.containsKey(DorisOptions.DORIS_USER.getName())) {
@@ -176,16 +181,16 @@ public Map<String, String> toMap() {
 
         return new HashMap<>(configOptions);
     }
 
-    public static DorisConfig fromMap(Map<String, String> sparkConfMap) throws OptionRequiredException {
-        return fromMap(sparkConfMap, Collections.emptyMap());
+    public static DorisConfig fromMap(Map<String, String> sparkConfMap, Boolean ignoreTableCheck) throws OptionRequiredException {
+        return fromMap(sparkConfMap, Collections.emptyMap(), ignoreTableCheck);
     }
 
-    public static DorisConfig fromMap(Map<String, String> sparkConfMap, Map<String, String> options) throws OptionRequiredException {
+    public static DorisConfig fromMap(Map<String, String> sparkConfMap, Map<String, String> options, Boolean ignoreTableCheck) throws OptionRequiredException {
         Map<String, String> map = new HashMap<>(sparkConfMap);
         if (MapUtils.isNotEmpty(options)) {
             map.putAll(options);
         }
-        return new DorisConfig(map);
+        return new DorisConfig(map, ignoreTableCheck);
     }
 }
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 6d4f9ffe..455c14be 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -110,7 +110,7 @@ public class DorisOptions {
 
     public static final ConfigOption<String> DORIS_HTTPS_KEY_STORE_PASSWORD = ConfigOptions.name("doris.https.key-store-password").stringType().withoutDefaultValue().withDescription("");
 
-    public static final ConfigOption<String> LOAD_MODE = ConfigOptions.name("doris.sink.load.mode").stringType().defaultValue("stream_load").withDescription("");
+    public static final ConfigOption<String> LOAD_MODE = ConfigOptions.name("doris.sink.mode").stringType().defaultValue("stream_load").withDescription("");
 
     public static final ConfigOption<String> READ_MODE =
ConfigOptions.name("doris.read.mode").stringType().defaultValue("thrift").withDescription(""); diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala index 54598ebd..bb9008a9 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala @@ -49,7 +49,7 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag]( /** * doris configuration get from rdd parameters and spark conf. */ - @transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava) + @transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false) @transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg) } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala index 070629fb..55249e95 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala @@ -35,7 +35,7 @@ private[sql] class DorisRelation( val sqlContext: SQLContext, parameters: Map[String, String]) extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan with InsertableRelation { - private lazy val cfg = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava) + private lazy val cfg = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava, false) private lazy val inValueLengthLimit = cfg.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) diff --git a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index a989359d..86d18dc7 100644 --- a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -54,7 +54,7 @@ private[sql] class DorisSourceProvider extends DorisSourceRegisterTrait mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - val config = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava) + val config = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava, false) mode match { case SaveMode.Overwrite => @@ -85,7 +85,7 @@ private[sql] class DorisSourceProvider extends DorisSourceRegisterTrait } override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = { - new DorisStreamLoadSink(sqlContext, DorisConfig.fromMap(Utils.params(parameters, logger).asJava)) + new DorisStreamLoadSink(sqlContext, 
DorisConfig.fromMap(Utils.params(parameters, logger).asJava, false)) } private def truncateTable(config: DorisConfig): Unit = { diff --git a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala index 6a890a82..d9aca8a9 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala @@ -18,7 +18,7 @@ package org.apache.doris.spark.catalog import org.apache.doris.spark.client.DorisFrontendClient -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} import org.apache.doris.spark.read.DorisScanBuilder import org.apache.doris.spark.rest.models.Schema import org.apache.doris.spark.util.SchemaConvertors @@ -53,10 +53,12 @@ class DorisTable(identifier: Identifier, config: DorisConfig, schema: Option[Str } override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = { - new DorisScanBuilder(config: DorisConfig, schema()) + config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name()) + new DorisScanBuilder(config, schema()) } override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = { + config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name()) new DorisWriteBuilder(config, logicalWriteInfo.schema()) } diff --git a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala index b953c247..9ca7f026 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala @@ -44,7 +44,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with TableCatalog { override def initialize(name: String, caseInsensitiveStringMap: CaseInsensitiveStringMap): Unit = { assert(catalogName.isEmpty, "The Doris table catalog is already initialed") catalogName = Some(name) - dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap) + dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap, true) frontend = new DorisFrontendClient(dorisConfig) } @@ -55,7 +55,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with TableCatalog { override def loadTable(identifier: Identifier): Table = { checkIdentifier(identifier) new DorisTable(identifier, DorisConfig.fromMap((dorisConfig.toMap.asScala + - (DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> getFullTableName(identifier))).asJava), None) + (DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> getFullTableName(identifier))).asJava, false), None) } override def createTable(identifier: Identifier, structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = throw new UnsupportedOperationException() diff --git a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala index f7fbc2a0..4fad7edd 
--- a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
@@ -38,7 +38,7 @@ class DorisDataSourceV2 extends DorisDataSource with TableProvider {
 
   override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
     if (t != null) t else {
-      val dorisConfig = DorisConfig.fromMap(properties)
+      val dorisConfig = DorisConfig.fromMap(properties, false)
       val tableIdentifier = dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
       val tableIdentifierArr = tableIdentifier.split("\\.")
       new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), tableIdentifierArr(1)), dorisConfig, Some(schema))
@@ -48,7 +48,7 @@ class DorisDataSourceV2 extends DorisDataSource with TableProvider {
 
   private def getTable(options: CaseInsensitiveStringMap): Table = {
     if (t != null) t else {
-      val dorisConfig = DorisConfig.fromMap(options)
+      val dorisConfig = DorisConfig.fromMap(options, false)
       val tableIdentifier = dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
       val tableIdentifierArr = tableIdentifier.split("\\.")
       new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), tableIdentifierArr(1)), dorisConfig, None)
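
A minimal usage sketch (not part of the patch) of the reworked DorisConfig.fromMap overloads: catalog initialization can now build a config before any table is selected, while the table-scoped entry points keep the strict identifier check. The option keys are the ones defined in DorisOptions; the values and the wrapper class are placeholders for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.spark.config.DorisConfig;

public class CatalogConfigCheckSketch {
    public static void main(String[] args) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put("doris.fenodes", "127.0.0.1:8030"); // placeholder FE address
        options.put("doris.user", "root");              // placeholder credentials
        options.put("doris.password", "");

        // Catalog initialization path (DorisTableCatalog.initialize): no table is
        // known yet, so ignoreTableCheck = true skips the doris.table.identifier check.
        DorisConfig catalogConfig = DorisConfig.fromMap(options, true);
        System.out.println("catalog config ok: " + catalogConfig.toMap().size() + " options");

        // Table-scoped paths (RDD, relation, DataSource V2) still pass false, so a
        // missing or malformed doris.table.identifier fails fast, as before.
        options.put("doris.table.identifier", "db.table");
        DorisConfig tableConfig = DorisConfig.fromMap(options, false);
        System.out.println("table config ok: " + tableConfig.toMap().size() + " options");
    }
}
```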