diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue index 82ed8bc2e5..9c415fd955 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue @@ -75,7 +75,7 @@ checkedChildren: 'ON', unCheckedChildren: 'OFF', }, - defaultValue: receiveData.historySavePoint && receiveData.historySavePoint.length > 0, + defaultValue: receiveData.selected != null, afterItem: () => h('span', { class: 'pop-tip' }, t('flink.app.view.savepointTip')), }, { diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts index c83e298958..6956e69c4e 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts @@ -64,6 +64,12 @@ export const useAppTableAction = ( /* Operation button list */ function getActionList(record: AppListRecord, currentPageNo: number): ActionItem[] { return [ + { + tooltip: { title: t('flink.app.operation.detail') }, + auth: 'app:detail', + icon: 'carbon:data-view-alt', + onClick: handleDetail.bind(null, record), + }, { tooltip: { title: t('flink.app.operation.release') }, ifShow: @@ -210,6 +216,13 @@ export const useAppTableAction = ( }), }; } + + /* Click for details */ + function handleDetail(app: AppListRecord) { + flinkAppStore.setApplicationId(app.id); + router.push({ path: '/flink/app/detail', query: { appId: app.id } }); + } + /* Click to edit */ function handleEdit(app: AppListRecord, currentPageNo: number) { // Record the current page number diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index 4c3cf57125..f9ea57cdae 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -151,7 +151,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo val flinkConfig = getFlinkK8sConfig(deployRequest) - replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", deployRequest.clusterName) + FlinkSessionSubmitHelper.doReplaceJobName(flinkConfig, deployRequest.clusterName) val kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client") diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index 76d86b8b63..091e6a998a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -20,6 +20,7 @@ package org.apache.streampark.flink.client.impl import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.client.`trait`.YarnClientTrait import org.apache.streampark.flink.client.bean._ +import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper import org.apache.commons.lang3.StringUtils import org.apache.flink.client.deployment.ClusterSpecification @@ -77,7 +78,7 @@ object YarnSessionClient extends YarnClientTrait { // app name .safeSet(YarnConfigOptions.APPLICATION_NAME, deployRequest.clusterName) - replaceConfig(flinkConfig, "\\$\\{job(Name|name)}|\\$job(Name|name)", deployRequest.clusterName) + FlinkSessionSubmitHelper.doReplaceJobName(flinkConfig, deployRequest.clusterName) logInfo(s""" |------------------------------------------------------------------ diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala index 2e1945e964..fcfcd3fc16 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala @@ -34,6 +34,7 @@ import org.json4s.jackson.Serialization import java.io.File import java.nio.charset.StandardCharsets +import scala.collection.JavaConversions._ import scala.util.{Failure, Success, Try} object FlinkSessionSubmitHelper extends Logger { @@ -102,6 +103,20 @@ object FlinkSessionSubmitHelper extends Logger { } } + private[client] def doReplaceJobName(flinkConfig: Configuration, replacement: String): Unit = { + flinkConfig + .keySet() + .foreach( + k => { + val v = flinkConfig.getString(k, null) + if (v != null) { + val result = v + .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", replacement) + flinkConfig.setString(k, result) + } + }) + } + } /** diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index d741df50f3..896767ca3a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -496,18 +496,4 @@ trait FlinkClientTrait extends Logger { }) } - def replaceConfig(flinkConfig: Configuration, regexp: String, replacement: String): Unit = { - flinkConfig - .keySet() - .foreach( - k => { - val v = flinkConfig.getString(k, null) - if (v != null) { - val result = v - .replaceAll(regexp, replacement) - flinkConfig.setString(k, result) - } - }) - } - }