From bbe77310f3801ab7ed7ece7f82454598e09df276 Mon Sep 17 00:00:00 2001 From: benjobs Date: Mon, 7 Oct 2024 11:43:39 +0800 Subject: [PATCH] [Improve] load flink-config minor improvements --- .../flink/client/trait/FlinkClientTrait.scala | 2 +- .../flink/core/FlinkStreamingInitializer.scala | 12 +----------- .../flink/core/FlinkTableInitializer.scala | 12 +----------- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 896767ca3a..0437dbaab4 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -77,7 +77,7 @@ trait FlinkClientTrait extends Logger { |""".stripMargin) val flinkConfig = prepareConfig(submitRequest) - + flinkConfig.toMap.foreach(c => logInfo(s"flinkConfig: ${c._1}: ${c._2}")) setConfig(submitRequest, flinkConfig) Try(doSubmit(submitRequest, flinkConfig)) match { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index ab1808f1c6..27c72cd196 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -95,17 +95,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api .mergeWith(ParameterTool.fromMap(appConf)) .mergeWith(argsMap) - val flinkConf: Map[String, String] = { - parameter.get(KEY_FLINK_CONF(), null) match { - case flinkConf if flinkConf != null => - PropertiesUtils - .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf)) - .filter(_._2.nonEmpty) - case _ => Map.empty - } - } - - val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf) + val envConfig = Configuration.fromMap(appFlinkConf) FlinkConfiguration(parameter, envConfig, null) } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 89f3044d31..1727a72a02 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -187,17 +187,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX) val sqlConf = extractConfigByPrefix(configMap, KEY_SQL_PREFIX) - val flinkConf: Map[String, String] = { - parameter.get(KEY_FLINK_CONF(), null) match { - case flinkConf if flinkConf != null => - PropertiesUtils - .loadFlinkConfYaml(DeflaterUtils.unzipString(flinkConf)) - .filter(_._2.nonEmpty) - case _ => Map.empty - } - } - - val envConfig = Configuration.fromMap(flinkConf ++ appFlinkConf) + val envConfig = Configuration.fromMap(appFlinkConf) val tableConfig = Configuration.fromMap(tableConf) val parameterTool = ParameterTool