Skip to content

Commit

Permalink
[fix](connector) Fix catalog config check (apache#242)
Browse files Browse the repository at this point in the history
* fix unstable ut

* fix unstable ut

* fix doris catalog init config table check failed
  • Loading branch information
gnehil authored Dec 27, 2024
1 parent 160d366 commit f843e0c
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,14 @@ protected Schema processDorisSchema(DorisReaderPartition partition, final Schema
.collect(Collectors.toMap(Field::getName, Function.identity()));
String[] readColumns = partition.getReadColumns();
List<Field> newFieldList = new ArrayList<>();
int offset = 0;
for (int i = 0; i < readColumns.length; i++) {
String readColumn = readColumns[i];
for (String readColumn : readColumns) {
if (!fieldTypeMap.containsKey(readColumn) && readColumn.contains(" AS ")) {
int asIdx = readColumn.indexOf(" AS ");
String realColumn = readColumn.substring(asIdx + 4).trim().replaceAll("`", "");
if (fieldTypeMap.containsKey(realColumn)
&& ("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
|| "HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
offset++;
}
} else {
newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,20 @@

public class DorisConfig implements Serializable {

private Map<String, String> configOptions;
private final String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
private final String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";

private Map<String, String> configOptions;
private boolean ignoreTableCheck;

// only for test
public DorisConfig() {
configOptions = Collections.emptyMap();
}

private DorisConfig(Map<String, String> options) throws OptionRequiredException {
private DorisConfig(Map<String, String> options, Boolean ignoreTableCheck) throws OptionRequiredException {
this.configOptions = new HashMap<>(processOptions(options));
this.ignoreTableCheck = ignoreTableCheck;
checkOptions(this.configOptions);
}

Expand Down Expand Up @@ -87,14 +90,16 @@ private void checkOptions(Map<String, String> options) throws OptionRequiredExce
throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
}
}
if (!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
throw new OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
} else {
String tableIdentifier = options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
if (tableIdentifier.isEmpty()) {
throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
} else if (!tableIdentifier.contains(".")) {
throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, for example: db.table");
if (!ignoreTableCheck) {
if (!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
throw new OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
} else {
String tableIdentifier = options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
if (tableIdentifier.isEmpty()) {
throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
} else if (!tableIdentifier.contains(".")) {
throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, for example: db.table");
}
}
}
if (!options.containsKey(DorisOptions.DORIS_USER.getName())) {
Expand Down Expand Up @@ -176,16 +181,16 @@ public Map<String, String> toMap() {
return new HashMap<>(configOptions);
}

public static DorisConfig fromMap(Map<String, String> sparkConfMap) throws OptionRequiredException {
return fromMap(sparkConfMap, Collections.emptyMap());
public static DorisConfig fromMap(Map<String, String> sparkConfMap, Boolean ignoreTableCheck) throws OptionRequiredException {
return fromMap(sparkConfMap, Collections.emptyMap(), ignoreTableCheck);
}

public static DorisConfig fromMap(Map<String, String> sparkConfMap, Map<String, String> options) throws OptionRequiredException {
public static DorisConfig fromMap(Map<String, String> sparkConfMap, Map<String, String> options, Boolean ignoreTableCheck) throws OptionRequiredException {
Map<String, String> map = new HashMap<>(sparkConfMap);
if (MapUtils.isNotEmpty(options)) {
map.putAll(options);
}
return new DorisConfig(map);
return new DorisConfig(map, ignoreTableCheck);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class DorisOptions {

public static final ConfigOption<String> DORIS_HTTPS_KEY_STORE_PASSWORD = ConfigOptions.name("doris.https.key-store-password").stringType().withoutDefaultValue().withDescription("");

public static final ConfigOption<String> LOAD_MODE = ConfigOptions.name("doris.sink.load.mode").stringType().defaultValue("stream_load").withDescription("");
public static final ConfigOption<String> LOAD_MODE = ConfigOptions.name("doris.sink.mode").stringType().defaultValue("stream_load").withDescription("");

public static final ConfigOption<String> READ_MODE = ConfigOptions.name("doris.read.mode").stringType().defaultValue("thrift").withDescription("");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag](
/**
* doris configuration get from rdd parameters and spark conf.
*/
@transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava)
@transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false)

@transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[sql] class DorisRelation(
val sqlContext: SQLContext, parameters: Map[String, String])
extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan with InsertableRelation {

private lazy val cfg = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava)
private lazy val cfg = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava, false)

private lazy val inValueLengthLimit = cfg.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private[sql] class DorisSourceProvider extends DorisSourceRegisterTrait
mode: SaveMode, parameters: Map[String, String],
data: DataFrame): BaseRelation = {

val config = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava)
val config = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava, false)

mode match {
case SaveMode.Overwrite =>
Expand Down Expand Up @@ -85,7 +85,7 @@ private[sql] class DorisSourceProvider extends DorisSourceRegisterTrait
}

override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
new DorisStreamLoadSink(sqlContext, DorisConfig.fromMap(Utils.params(parameters, logger).asJava))
new DorisStreamLoadSink(sqlContext, DorisConfig.fromMap(Utils.params(parameters, logger).asJava, false))
}

private def truncateTable(config: DorisConfig): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.spark.catalog

import org.apache.doris.spark.client.DorisFrontendClient
import org.apache.doris.spark.config.DorisConfig
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.doris.spark.read.DorisScanBuilder
import org.apache.doris.spark.rest.models.Schema
import org.apache.doris.spark.util.SchemaConvertors
Expand Down Expand Up @@ -53,10 +53,12 @@ class DorisTable(identifier: Identifier, config: DorisConfig, schema: Option[Str
}

override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
new DorisScanBuilder(config: DorisConfig, schema())
config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name())
new DorisScanBuilder(config, schema())
}

override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name())
new DorisWriteBuilder(config, logicalWriteInfo.schema())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with TableCatalog {
override def initialize(name: String, caseInsensitiveStringMap: CaseInsensitiveStringMap): Unit = {
assert(catalogName.isEmpty, "The Doris table catalog is already initialed")
catalogName = Some(name)
dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap)
dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap, true)
frontend = new DorisFrontendClient(dorisConfig)
}

Expand All @@ -55,7 +55,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with TableCatalog {
override def loadTable(identifier: Identifier): Table = {
checkIdentifier(identifier)
new DorisTable(identifier, DorisConfig.fromMap((dorisConfig.toMap.asScala +
(DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> getFullTableName(identifier))).asJava), None)
(DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> getFullTableName(identifier))).asJava, false), None)
}

override def createTable(identifier: Identifier, structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = throw new UnsupportedOperationException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class DorisDataSourceV2 extends DorisDataSource with TableProvider {
override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
if (t != null) t
else {
val dorisConfig = DorisConfig.fromMap(properties)
val dorisConfig = DorisConfig.fromMap(properties, false)
val tableIdentifier = dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
val tableIdentifierArr = tableIdentifier.split("\\.")
new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), tableIdentifierArr(1)), dorisConfig, Some(schema))
Expand All @@ -48,7 +48,7 @@ class DorisDataSourceV2 extends DorisDataSource with TableProvider {
private def getTable(options: CaseInsensitiveStringMap): Table = {
if (t != null) t
else {
val dorisConfig = DorisConfig.fromMap(options)
val dorisConfig = DorisConfig.fromMap(options, false)
val tableIdentifier = dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
val tableIdentifierArr = tableIdentifier.split("\\.")
new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), tableIdentifierArr(1)), dorisConfig, None)
Expand Down

0 comments on commit f843e0c

Please sign in to comment.