From 1326b9ff1bab693f8d2d5f58d38ce9e5a779a49c Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 16 Jun 2024 20:10:12 +0800 Subject: [PATCH] [Feat] sync quick-install support from dev-2.1.4 (#3763) Co-authored-by: benjobs --- .../streampark/common/util/FileUtils.scala | 18 ++ .../common/util/PropertiesUtils.scala | 93 ++++-- .../src/main/assembly/bin/streampark.sh | 51 ++-- .../console/base/util/BashJavaUtils.java | 90 +++++- .../controller/FlinkClusterController.java | 6 +- .../console/core/entity/AppBuildPipeline.java | 9 +- .../console/core/entity/Application.java | 2 +- .../core/entity/ApplicationConfig.java | 6 +- .../console/core/entity/FlinkCluster.java | 5 +- .../console/core/entity/SparkApplication.java | 2 +- .../console/core/runner/QuickStartRunner.java | 101 +++++++ .../core/service/FlinkClusterService.java | 2 +- .../service/impl/AppBuildPipeServiceImpl.java | 6 +- .../service/impl/FlinkClusterServiceImpl.java | 7 +- .../impl/SparkAppBuildPipeServiceImpl.java | 6 +- .../core/utils/YarnQueueLabelExpression.java | 3 +- .../system/runner/StartedUpRunner.java | 19 +- streampark.sh | 282 ++++++++++++++++++ 18 files changed, 623 insertions(+), 85 deletions(-) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java create mode 100755 streampark.sh diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index e8fe7ef515..7d508b2c12 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -25,6 +25,7 @@ import java.nio.channels.Channels import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import java.util +import java.util.Scanner import java.util.stream.Collectors import scala.collection.convert.ImplicitConversions._ @@ -282,4 +283,21 @@ object FileUtils { null } + @throws[IOException] + def readString(file: File): String = { + require(file != null && file.isFile) + val reader = new FileReader(file) + val scanner = new Scanner(reader) + val buffer = new mutable.StringBuilder() + if (scanner.hasNextLine) { + buffer.append(scanner.nextLine()) + } + while (scanner.hasNextLine) { + buffer.append("\r\n") + buffer.append(scanner.nextLine()) + } + Utils.close(scanner, reader) + buffer.toString() + } + } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index e3a0ae5478..d9a00a6d9c 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -70,13 +70,12 @@ object PropertiesUtils extends Logger { }) .toMap case text => - val value = text match { - case null => "" - case other => other.toString - } - prefix match { - case "" => proper += k -> value - case other => proper += s"$other.$k" -> value + if (text != null) { + val value = text.toString.trim + prefix match { + case "" => proper += k -> value + case other => proper += s"$other.$k" -> value + } } proper.toMap } @@ -276,7 +275,7 @@ object PropertiesUtils extends Logger { /** extract flink configuration from application.properties */ @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = { - if (StringUtils.isBlank(properties)) Map.empty[String, String] + if (StringUtils.isEmpty(properties)) Map.empty[String, String] else { val map = mutable.Map[String, String]() val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") @@ -308,28 +307,76 @@ object PropertiesUtils extends Logger { @Nonnull def extractArguments(args: String): List[String] = { val programArgs = new ArrayBuffer[String]() if (StringUtils.isNotEmpty(args)) { - val array = args.split("\\s+") - val iter = array.iterator - while (iter.hasNext) { - val v = iter.next() - val p = v.take(1) - p match { - case "'" | "\"" => - var value = v - if (!v.endsWith(p)) { - while (!value.endsWith(p) && iter.hasNext) { - value += s" ${iter.next()}" - } + return extractArguments(args.split("\\s+")) + } + programArgs.toList + } + + def extractArguments(array: Array[String]): List[String] = { + val programArgs = new ArrayBuffer[String]() + val iter = array.iterator + while (iter.hasNext) { + val v = iter.next() + val p = v.take(1) + p match { + case "'" | "\"" => + var value = v + if (!v.endsWith(p)) { + while (!value.endsWith(p) && iter.hasNext) { + value += s" ${iter.next()}" } - programArgs += value.substring(1, value.length - 1) - case _ => programArgs += v - } + } + programArgs += value.substring(1, value.length - 1) + case _ => + val regexp = "(.*)='(.*)'$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + val regexp = "(.*)=\"(.*)\"$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + programArgs += v + } + } } } programArgs.toList } + def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = { + val iter = array.iterator + val map = mutable.Map[String, mutable.Map[String, String]]() + while (iter.hasNext) { + val v = iter.next() + v.take(2) match { + case "--" => + val kv = iter.next() + val regexp = "(.*)=(.*)" + if (kv.matches(regexp)) { + val values = kv.split("=") + val k1 = values(0).trim + val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "") + val k = v.drop(2) + map.get(k) match { + case Some(m) => m += k1 -> v1 + case _ => map += k -> mutable.Map(k1 -> v1) + } + } + case _ => + } + } + map.map(x => x._1 -> x._2.toMap).toMap + } + @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] = new JavaMap[String, String](extractDynamicProperties(properties).asJava) + @Nonnull def extractMultipleArgumentsAsJava( + args: Array[String]): JavaMap[String, JavaMap[String, String]] = { + val map = + extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, String](c._2.asJava)) + new JavaMap[String, JavaMap[String, String]](map.asJava) + } + } diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh index 7b3e59f876..4df9e5e67b 100755 --- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh +++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh @@ -132,7 +132,7 @@ APP_BASE="$APP_HOME" APP_CONF="$APP_BASE"/conf APP_LIB="$APP_BASE"/lib APP_LOG="$APP_BASE"/logs -APP_PID="$APP_BASE"/streampark.pid +APP_PID="$APP_BASE"/.pid APP_OUT="$APP_LOG"/streampark.out # shellcheck disable=SC2034 APP_TMPDIR="$APP_BASE"/temp @@ -241,10 +241,16 @@ if [[ "$USE_NOHUP" = "true" ]]; then NOHUP="nohup" fi -BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils" +CONFIG="${APP_CONF}/config.yaml" +# shellcheck disable=SC2006 +if [[ ! -f "$CONFIG" ]] ; then + echo_r "can not found config.yaml in \"conf\" directory, please check." + exit 1; +fi +BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils" APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap" - +SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG") JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh JVM_ARGS="" @@ -276,21 +282,8 @@ print_logo() { printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET -} - -init_env() { - # shellcheck disable=SC2006 - CONFIG="${APP_CONF}/application.yml" - if [[ -f "$CONFIG" ]] ; then - echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated. - For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\". - Note: \"application.yml\" will be completely deprecated in version 2.2.0. """ - else - CONFIG="${APP_CONF}/config.yaml" - if [[ ! -f "$CONFIG" ]] ; then - echo_r "can not found config.yaml in \"conf\" directory, please check." - exit 1; - fi + if [[ "$1"x == "start"x ]]; then + printf ' %s http://localhost:%s %s\n\n' $PRIMARY $SERVER_PORT $RESET fi } @@ -313,19 +306,19 @@ get_pid() { fi # shellcheck disable=SC2006 - local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG"` - if [[ x"${serverPort}" == x"" ]]; then + if [[ "${SERVER_PORT}"x == ""x ]]; then echo_r "server.port is required, please check $CONFIG" exit 1; else # shellcheck disable=SC2006 # shellcheck disable=SC2155 - local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$serverPort"` - if [[ x"${used}" == x"used" ]]; then + local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$SERVER_PORT"` + if [[ "${used}"x == "used"x ]]; then # shellcheck disable=SC2006 local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'` + # shellcheck disable=SC2236 if [[ ! -z $PID ]]; then - echo $PID + echo "$PID" else echo 0 fi @@ -411,7 +404,7 @@ start() { -Dapp.home="${APP_HOME}" \ -Dlogging.config="${APP_CONF}/logback-spring.xml" \ -Djava.io.tmpdir="$APP_TMPDIR" \ - $APP_MAIN >> "$APP_OUT" 2>&1 "&" + $APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&" local PID=$! local IS_NUMBER="^[0-9]+$" @@ -565,27 +558,31 @@ restart() { } main() { - print_logo - init_env case "$1" in "debug") DEBUG_PORT=$2 debug ;; "start") - start + shift + start "$@" + [[ $? -eq 0 ]] && print_logo "start" ;; "start_docker") + print_logo start_docker ;; "stop") + print_logo stop ;; "status") + print_logo status ;; "restart") restart + [[ $? -eq 0 ]] && print_logo "start" ;; *) echo_r "Unknown command: $1" diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java index faf8b8eadf..999167fdeb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java @@ -17,15 +17,30 @@ package org.apache.streampark.console.base.util; +import org.apache.streampark.common.conf.FlinkVersion; +import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.commons.io.output.NullOutputStream; + +import java.io.File; +import java.io.FileWriter; import java.io.IOException; -import java.net.ServerSocket; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.Socket; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Map; public class BashJavaUtils { + private static String localhost = "localhost"; + public static void main(String[] args) throws IOException { String action = args[0].toLowerCase(); String[] actionArgs = Arrays.copyOfRange(args, 1, args.length); @@ -39,12 +54,77 @@ public static void main(String[] args) throws IOException { System.out.println(value); break; case "--check_port": - Integer port = Integer.parseInt(actionArgs[0]); - try { - new ServerSocket(port); + int port = Integer.parseInt(actionArgs[0]); + try (Socket ignored = new Socket(localhost, port)) { + System.out.println("used"); + } catch (Exception e) { System.out.println("free"); + } + break; + case "--free_port": + int start = Integer.parseInt(actionArgs[0]); + for (port = start; port < 65535; port++) { + try (Socket ignored = new Socket(localhost, port)) { + } catch (Exception e) { + System.out.println(port); + break; + } + } + break; + case "--read_flink": + String input = actionArgs[0]; + String[] inputs = input.split(":"); + String flinkDist = + Arrays.stream(inputs).filter(c -> c.contains("flink-dist-")).findFirst().get(); + File flinkHome = new File(flinkDist.replaceAll("/lib/.*", "")); + FlinkVersion flinkVersion = new FlinkVersion(flinkHome.getAbsolutePath()); + + PrintStream originalOut = System.out; + System.setOut(new PrintStream(new NullOutputStream())); + + String version = flinkVersion.majorVersion(); + float ver = Float.parseFloat(version); + File yaml = + new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : "/conf/config.yaml"); + + Map config = PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath()); + String flinkPort = config.getOrDefault("rest.port", "8081"); + System.setOut(originalOut); + System.out.println( + flinkHome + .getAbsolutePath() + .concat(",") + .concat(flinkHome.getName()) + .concat(",") + .concat(flinkPort)); + break; + case "--replace": + String filePath = actionArgs[0]; + String[] text = actionArgs[1].split("\\|\\|"); + String searchText = text[0]; + String replaceText = text[1]; + try { + File file = new File(filePath); + String content = FileUtils.readString(file); + content = content.replace(searchText, replaceText); + FileWriter writer = new FileWriter(filePath); + writer.write(content); + writer.flush(); + writer.close(); + System.exit(0); + } catch (IOException e) { + System.exit(1); + } + break; + case "--download": + try { + URL url = new URL(actionArgs[0]); + Path path = Paths.get(actionArgs[1]).toAbsolutePath().normalize(); + try (InputStream inStream = url.openStream()) { + Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING); + } } catch (Exception e) { - System.out.println("used"); + System.exit(1); } break; default: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java index 71b297d67b..dd796437dd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java @@ -23,6 +23,7 @@ import org.apache.streampark.console.core.bean.ResponseResult; import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.service.FlinkClusterService; +import org.apache.streampark.console.core.service.ServiceHelper; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -43,6 +44,8 @@ public class FlinkClusterController { @Autowired private FlinkClusterService flinkClusterService; + @Autowired private ServiceHelper serviceHelper; + @PostMapping("availableList") public RestResponse listAvailableCluster() { List flinkClusters = flinkClusterService.listAvailableCluster(); @@ -70,7 +73,8 @@ public RestResponse check(FlinkCluster cluster) { @PostMapping("create") @RequiresPermissions("cluster:create") public RestResponse create(FlinkCluster cluster) { - Boolean success = flinkClusterService.create(cluster); + Long userId = serviceHelper.getUserId(); + Boolean success = flinkClusterService.create(cluster, userId); return RestResponse.success(success); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java index 75b5e386c3..9314a07149 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java @@ -46,7 +46,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -122,7 +121,7 @@ public AppBuildPipeline setPipeStatus(@Nonnull PipelineStatusEnum pipeStatus) { @JsonIgnore public Map getStepStatus() { if (StringUtils.isBlank(stepStatusJson)) { - return Collections.emptyMap(); + return new HashMap<>(); } try { return JacksonUtils.read( @@ -130,7 +129,7 @@ public Map getStepStatus() { } catch (JsonProcessingException e) { log.error( "json parse error on ApplicationBuildPipeline, stepStatusJson={}", stepStatusJson, e); - return Collections.emptyMap(); + return new HashMap<>(); } } @@ -153,7 +152,7 @@ public AppBuildPipeline setStepStatus(@Nonnull Map getStepStatusTimestamp() { if (StringUtils.isBlank(stepStatusTimestampJson)) { - return Collections.emptyMap(); + return new HashMap<>(); } try { return JacksonUtils.read( @@ -163,7 +162,7 @@ public Map getStepStatusTimestamp() { "json parse error on ApplicationBuildPipeline, stepStatusJson={}", stepStatusTimestampJson, e); - return Collections.emptyMap(); + return new HashMap<>(); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 93dacc9458..bd6498757c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -438,7 +438,7 @@ public ApplicationType getApplicationType() { @SuppressWarnings("unchecked") public Map getOptionMap() { if (StringUtils.isBlank(this.options)) { - return Collections.emptyMap(); + return new HashMap<>(); } Map optionMap = JacksonUtils.read(this.options, Map.class); optionMap.entrySet().removeIf(entry -> entry.getValue() == null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java index 5a096fbdee..93634c3d05 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java @@ -34,8 +34,8 @@ import org.jetbrains.annotations.Nullable; import java.util.Base64; -import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -109,7 +109,7 @@ public Map readConfig() { }, Map.Entry::getValue)); } - return Collections.emptyMap(); + return new HashMap<>(); } @Nullable @@ -126,7 +126,7 @@ private Map renderConfigs() { case HOCON: return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content)); default: - return Collections.emptyMap(); + return new HashMap<>(); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index d1ce09e770..0314323b11 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -46,7 +46,6 @@ import java.io.Serializable; import java.net.URI; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -136,7 +135,7 @@ public ClusterState getClusterStateEnum() { @SneakyThrows public Map getOptionMap() { if (StringUtils.isBlank(this.options)) { - return Collections.emptyMap(); + return new HashMap<>(); } Map optionMap = JacksonUtils.read(this.options, Map.class); if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) { @@ -167,7 +166,7 @@ public Map getFlinkConfig() throws JsonProcessingException { HttpClientUtils.httpGetRequest( restUrl, RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build()); if (StringUtils.isBlank(json)) { - return Collections.emptyMap(); + return new HashMap<>(); } List> confList = JacksonUtils.read(json, new TypeReference>>() {}); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index f8d3d6fad1..eb8b6688db 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -422,7 +422,7 @@ public ApplicationType getApplicationType() { @SuppressWarnings("unchecked") public Map getOptionMap() { if (StringUtils.isBlank(this.options)) { - return Collections.emptyMap(); + return new HashMap<>(); } Map optionMap = JacksonUtils.read(this.options, Map.class); optionMap.entrySet().removeIf(entry -> entry.getValue() == null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java new file mode 100644 index 0000000000..98a215f743 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.runner; + +import org.apache.streampark.common.enums.ClusterState; +import org.apache.streampark.common.enums.FlinkExecutionMode; +import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.entity.FlinkCluster; +import org.apache.streampark.console.core.entity.FlinkEnv; +import org.apache.streampark.console.core.entity.FlinkSql; +import org.apache.streampark.console.core.service.AppBuildPipeService; +import org.apache.streampark.console.core.service.FlinkClusterService; +import org.apache.streampark.console.core.service.FlinkEnvService; +import org.apache.streampark.console.core.service.FlinkSqlService; +import org.apache.streampark.console.core.service.application.ApplicationManageService; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +@Order +@Slf4j +@Component +public class QuickStartRunner implements ApplicationRunner { + + @Autowired private FlinkEnvService flinkEnvService; + + @Autowired private FlinkClusterService flinkClusterService; + + @Autowired private FlinkSqlService flinkSqlService; + + @Autowired private ApplicationManageService applicationManageService; + + @Autowired private AppBuildPipeService appBuildPipeService; + + private static Long defaultId = 100000L; + + @Override + public void run(ApplicationArguments args) throws Exception { + Map> map = + PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs()); + + Map quickstart = map.get("quickstart"); + + if (quickstart != null && quickstart.size() == 3) { + // 1) create flinkEnv + FlinkEnv flinkEnv = new FlinkEnv(); + flinkEnv.setFlinkName(quickstart.get("flink_name")); + flinkEnv.setFlinkHome(quickstart.get("flink_home")); + flinkEnvService.create(flinkEnv); + + // 2) create flinkCluster + FlinkCluster flinkCluster = new FlinkCluster(); + flinkCluster.setClusterName("quickstart"); + flinkCluster.setVersionId(flinkEnv.getId()); + flinkCluster.setClusterState(ClusterState.RUNNING.getState()); + flinkCluster.setExecutionMode(FlinkExecutionMode.REMOTE.getMode()); + flinkCluster.setAddress("http://localhost:" + quickstart.get("flink_port")); + flinkClusterService.create(flinkCluster, defaultId); + + // 3) set flink version and cluster + Application app = new Application(); + app.setId(defaultId); + Application application = applicationManageService.getApp(app.getId()); + application.setFlinkClusterId(flinkCluster.getId()); + application.setVersionId(flinkEnv.getId()); + application.setExecutionMode(FlinkExecutionMode.REMOTE.getMode()); + + FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); + application.setFlinkSql(flinkSql.getSql()); + + boolean success = applicationManageService.update(application); + if (success) { + // 4) build application + appBuildPipeService.buildApplication(defaultId, false); + } + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java index 30f0ea8810..ba24b5809d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java @@ -51,7 +51,7 @@ public interface FlinkClusterService extends IService { * @param flinkCluster FlinkCluster to be create * @return Whether the creation is successful */ - Boolean create(FlinkCluster flinkCluster); + Boolean create(FlinkCluster flinkCluster, Long userId); /** * Remove flink cluster diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index cf4d9adf0b..b85ce4f452 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -105,8 +105,8 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -615,14 +615,14 @@ public boolean allowToBuildNow(@Nonnull Long appId) { @Override public Map listAppIdPipelineStatusMap(List appIds) { if (CollectionUtils.isEmpty(appIds)) { - return Collections.emptyMap(); + return new HashMap<>(); } LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().in(AppBuildPipeline::getAppId, appIds); List appBuildPipelines = baseMapper.selectList(queryWrapper); if (CollectionUtils.isEmpty(appBuildPipelines)) { - return Collections.emptyMap(); + return new HashMap<>(); } return appBuildPipelines.stream() .collect(Collectors.toMap(AppBuildPipeline::getAppId, AppBuildPipeline::getPipelineStatus)); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 73ee6370bf..e908453128 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -28,7 +28,6 @@ import org.apache.streampark.console.core.mapper.FlinkClusterMapper; import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; -import org.apache.streampark.console.core.service.ServiceHelper; import org.apache.streampark.console.core.service.YarnQueueService; import org.apache.streampark.console.core.service.application.ApplicationInfoService; import org.apache.streampark.console.core.watcher.FlinkClusterWatcher; @@ -82,8 +81,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl listAppIdPipelineStatusMap(List appIds) { if (CollectionUtils.isEmpty(appIds)) { - return Collections.emptyMap(); + return new HashMap<>(); } LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().in(AppBuildPipeline::getAppId, appIds); List appBuildPipelines = baseMapper.selectList(queryWrapper); if (CollectionUtils.isEmpty(appBuildPipelines)) { - return Collections.emptyMap(); + return new HashMap<>(); } return appBuildPipelines.stream() .collect(Collectors.toMap(AppBuildPipeline::getAppId, AppBuildPipeline::getPipelineStatus)); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java index 9604084f8b..f7f54514d7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java @@ -27,7 +27,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -105,7 +104,7 @@ public static YarnQueueLabelExpression of( public static Map getQueueLabelMap(String queueLabelExp) { if (StringUtils.isBlank(queueLabelExp)) { - return Collections.emptyMap(); + return new HashMap<>(); } YarnQueueLabelExpression yarnQueueLabelExpression = of(queueLabelExp); Map queueLabelMap = new HashMap<>(2); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java index 3228c2266f..9a5f050428 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.runner; -import org.apache.streampark.common.util.Utils; +import org.apache.streampark.common.util.SystemPropertyUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -27,6 +27,8 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; + @Order @Slf4j @Component @@ -37,7 +39,20 @@ public class StartedUpRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) { if (context.isActive()) { - Utils.printLogo("streampark-console start successful"); + String port = SystemPropertyUtils.get("server.port", "10000"); + System.out.println("\n"); + System.out.println(" _____ __ __ "); + System.out.println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ "); + System.out.println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/"); + System.out.println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< "); + System.out.println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| "); + System.out.println(" /_/ \n\n"); + System.out.println(" Version: 2.2.0 "); + System.out.println(" WebSite: https://streampark.apache.org "); + System.out.println(" GitHub : https://github.com/apache/incubator-streampark "); + System.out.println(" Info : streampark-console start successful "); + System.out.println(" Local : http://localhost:" + port); + System.out.println(" Time : " + LocalDateTime.now() + "\n\n"); } } } diff --git a/streampark.sh b/streampark.sh new file mode 100755 index 0000000000..c8ae6379d2 --- /dev/null +++ b/streampark.sh @@ -0,0 +1,282 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# shellcheck disable=SC2317 + +# Bugzilla 37848: When no TTY is available, don't output to console +have_tty=0 +# shellcheck disable=SC2006 +if [[ "`tty`" != "not a tty" ]]; then + have_tty=1 +fi + +# Bugzilla 37848: When no TTY is available, don't output to console +have_tty=0 +# shellcheck disable=SC2006 +if [[ "`tty`" != "not a tty" ]]; then + have_tty=1 +fi + + # Only use colors if connected to a terminal +if [[ ${have_tty} -eq 1 ]]; then + RED=$(printf '\033[31m') + GREEN=$(printf '\033[32m') + BLUE=$(printf '\033[34m') + RESET=$(printf '\033[0m') +else + RED="" + GREEN="" + BLUE="" + RESET="" +fi + +echo_r () { + # Color red: Error, Failed + [[ $# -ne 1 ]] && return 1 + # shellcheck disable=SC2059 + printf "[%sStreamPark%s] %s$1%s\n" "$BLUE" "$RESET" "$RED" "$RESET" +} + +echo_g () { + # Color green: Success + [[ $# -ne 1 ]] && return 1 + # shellcheck disable=SC2059 + printf "[%sStreamPark%s] %s$1%s\n" "$BLUE" "$RESET" "$GREEN" "$RESET" +} + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "$(uname)" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME + else + JAVA_HOME="/Library/Java/Home"; export JAVA_HOME + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=$(java-config --jre-home) + fi +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$JAVA_HOME" ] && + JAVA_HOME=$(cygpath --unix "$JAVA_HOME") + [ -n "$CLASSPATH" ] && + CLASSPATH=$(cygpath --path --unix "$CLASSPATH") +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] && + JAVA_HOME="$(cd "$JAVA_HOME" || (echo_r "cannot cd into $JAVA_HOME."; exit 1); pwd)" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="$(which javac)" + if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=$(which readlink) + if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then + if $darwin ; then + javaHome="$(dirname "\"$javaExecutable\"")" + javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac" + else + javaExecutable="$(readlink -f "\"$javaExecutable\"")" + fi + javaHome="$(dirname "\"$javaExecutable\"")" + javaHome=$(expr "$javaHome" : '\(.*\)/bin') + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo_r "Error: JAVA_HOME is not defined correctly." >&2 + echo_r " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo_r "Warning: JAVA_HOME environment variable is not set." +fi + +_RUNJAVA="$JAVA_HOME/bin/java" + +# resolve links - $0 may be a softlink +PRG="$0" + +while [[ -h "$PRG" ]]; do + # shellcheck disable=SC2006 + ls=`ls -ld "$PRG"` + # shellcheck disable=SC2006 + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + # shellcheck disable=SC2006 + PRG=`dirname "$PRG"`/"$link" + fi +done + +# Get standard environment variables +# shellcheck disable=SC2006 +PRG_DIR=`dirname "$PRG"` +WORK_DIR=$(cd "$PRG_DIR" >/dev/null || exit; pwd) + +SP_VERSION="2.1.5" +SP_NAME="apache-streampark_2.12-${SP_VERSION}-incubating-bin" +SP_TAR="${SP_NAME}.tar.gz" +SP_URL="https://archive.apache.org/dist/incubator/streampark/${SP_VERSION}/${SP_TAR}" +SP_HOME="${WORK_DIR}"/"${SP_NAME}" +SP_PATH="${WORK_DIR}"/"${SP_TAR}" +SP_CONFIG="${SP_HOME}/conf/config.yaml" + +download() { + local url=$1 + local name=$2 + local path=$3 + if command -v wget > /dev/null; then + wget "$url" -O "$path" || rm -f "$path" + # shellcheck disable=SC2181 + if [[ $? -ne 0 ]]; then + echo_r "download $name failed, please try again." + exit 1 + fi + elif command -v curl > /dev/null; then + curl -o "$path" "$url" -f -L || rm -f "$path" + # shellcheck disable=SC2181 + if [[ $? -ne 0 ]]; then + echo_r "download $name failed, please try again." + exit 1 + fi + else + echo " + import java.io.InputStream; + import java.net.URL; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + import java.nio.file.StandardCopyOption; + + public class Downloader { + public static void main(String[] args) { + try { + URL url = new URL(args[0]); + Path path = Paths.get(args[1]).toAbsolutePath().normalize(); + try (InputStream inStream = url.openStream()) { + Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING); + } + } catch (Exception e) { + System.exit(1); + } + } + }" > "${WORK_DIR}"/Downloader.java + + "$JAVA_HOME/bin/javac" "${WORK_DIR}"/Downloader.java && rm -f "${WORK_DIR}"/Downloader.java + + "$JAVA_HOME/bin/java" -cp "${WORK_DIR}" Downloader "$url" "$path" && rm -f "${WORK_DIR}"/Downloader.class + + if [[ $? -ne 0 ]]; then + echo_r "download $name failed, please try again." + exit 1 + fi + fi +} + +BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils" + +# 1). download streampark. +echo_g "download streampark..." + +download "$SP_URL" "$SP_TAR" "$SP_PATH" +tar -xvf "${SP_TAR}" >/dev/null 2>&1 \ + && rm -r "${SP_TAR}" \ + && mkdir "${SP_HOME}"/flink \ + && mkdir "${SP_HOME}"/workspace + +# 1.1) workspace +$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "local: ||local: ${SP_HOME}/workspace #" + +# 1.2) port. +SP_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "10000") +$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "port: 10000||port: ${SP_PORT}" + +# 2). flink +# shellcheck disable=SC2009 +FLINK_PROCESS="$(ps -ef | grep "flink-dist-" | grep 'org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint')" +if [[ -n "${FLINK_PROCESS}" ]]; then + FLINK_PARAM=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --read_flink "$FLINK_PROCESS") + IFS=',' read -r -a ARRAY <<< "$FLINK_PARAM" + FLINK_HOME=${ARRAY[0]} + FLINK_NAME=${ARRAY[1]} + FLINK_PORT=${ARRAY[2]} +else + FLINK_NAME="flink-1.19.0" + FLINK_URL="https://archive.apache.org/dist/flink/${FLINK_NAME}/${FLINK_NAME}-bin-scala_2.12.tgz" + FLINK_TAR="${FLINK_NAME}-bin-scala_2.12.tgz" + FLINK_HOME="${WORK_DIR}"/${SP_NAME}/flink/${FLINK_NAME} + FLINK_PATH="${WORK_DIR}"/"${FLINK_TAR}" + FLINK_CONF="${FLINK_HOME}/conf/config.yaml" + + # 1) download flink + echo_g "download flink..." + download "$FLINK_URL" "$FLINK_TAR" "$FLINK_PATH" + tar -xvf "${FLINK_TAR}" >/dev/null 2>&1 \ + && rm -r "${FLINK_TAR}" \ + && mv "$FLINK_NAME" "${WORK_DIR}"/"${SP_NAME}"/flink + + # 2) start flink-cluster + FLINK_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "8081") + $_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "# port: 8081||port: ${FLINK_PORT}" + + bash +x "${FLINK_HOME}"/bin/start-cluster.sh +fi + +# 3) start streampark +bash +x "${SP_HOME}"/bin/startup.sh \ + --quickstart flink_home="$FLINK_HOME" \ + --quickstart flink_port="$FLINK_PORT" \ + --quickstart flink_name="quickstart-$FLINK_NAME"