Skip to content

Commit

Permalink
[Feat] sync quick-install support from dev-2.1.4 (#3763)
Browse files Browse the repository at this point in the history
Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Jun 16, 2024
1 parent 56d8724 commit 1326b9f
Show file tree
Hide file tree
Showing 18 changed files with 623 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
import java.util.Scanner
import java.util.stream.Collectors

import scala.collection.convert.ImplicitConversions._
Expand Down Expand Up @@ -282,4 +283,21 @@ object FileUtils {
null
}

@throws[IOException]
def readString(file: File): String = {
require(file != null && file.isFile)
val reader = new FileReader(file)
val scanner = new Scanner(reader)
val buffer = new mutable.StringBuilder()
if (scanner.hasNextLine) {
buffer.append(scanner.nextLine())
}
while (scanner.hasNextLine) {
buffer.append("\r\n")
buffer.append(scanner.nextLine())
}
Utils.close(scanner, reader)
buffer.toString()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ object PropertiesUtils extends Logger {
})
.toMap
case text =>
val value = text match {
case null => ""
case other => other.toString
}
prefix match {
case "" => proper += k -> value
case other => proper += s"$other.$k" -> value
if (text != null) {
val value = text.toString.trim
prefix match {
case "" => proper += k -> value
case other => proper += s"$other.$k" -> value
}
}
proper.toMap
}
Expand Down Expand Up @@ -276,7 +275,7 @@ object PropertiesUtils extends Logger {

/** extract flink configuration from application.properties */
@Nonnull def extractDynamicProperties(properties: String): Map[String, String] = {
if (StringUtils.isBlank(properties)) Map.empty[String, String]
if (StringUtils.isEmpty(properties)) Map.empty[String, String]
else {
val map = mutable.Map[String, String]()
val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
Expand Down Expand Up @@ -308,28 +307,76 @@ object PropertiesUtils extends Logger {
@Nonnull def extractArguments(args: String): List[String] = {
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(args)) {
val array = args.split("\\s+")
val iter = array.iterator
while (iter.hasNext) {
val v = iter.next()
val p = v.take(1)
p match {
case "'" | "\"" =>
var value = v
if (!v.endsWith(p)) {
while (!value.endsWith(p) && iter.hasNext) {
value += s" ${iter.next()}"
}
return extractArguments(args.split("\\s+"))
}
programArgs.toList
}

def extractArguments(array: Array[String]): List[String] = {
val programArgs = new ArrayBuffer[String]()
val iter = array.iterator
while (iter.hasNext) {
val v = iter.next()
val p = v.take(1)
p match {
case "'" | "\"" =>
var value = v
if (!v.endsWith(p)) {
while (!value.endsWith(p) && iter.hasNext) {
value += s" ${iter.next()}"
}
programArgs += value.substring(1, value.length - 1)
case _ => programArgs += v
}
}
programArgs += value.substring(1, value.length - 1)
case _ =>
val regexp = "(.*)='(.*)'$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
val regexp = "(.*)=\"(.*)\"$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
programArgs += v
}
}
}
}
programArgs.toList
}

def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = {
val iter = array.iterator
val map = mutable.Map[String, mutable.Map[String, String]]()
while (iter.hasNext) {
val v = iter.next()
v.take(2) match {
case "--" =>
val kv = iter.next()
val regexp = "(.*)=(.*)"
if (kv.matches(regexp)) {
val values = kv.split("=")
val k1 = values(0).trim
val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
val k = v.drop(2)
map.get(k) match {
case Some(m) => m += k1 -> v1
case _ => map += k -> mutable.Map(k1 -> v1)
}
}
case _ =>
}
}
map.map(x => x._1 -> x._2.toMap).toMap
}

@Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)

@Nonnull def extractMultipleArgumentsAsJava(
args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
val map =
extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, String](c._2.asJava))
new JavaMap[String, JavaMap[String, String]](map.asJava)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ APP_BASE="$APP_HOME"
APP_CONF="$APP_BASE"/conf
APP_LIB="$APP_BASE"/lib
APP_LOG="$APP_BASE"/logs
APP_PID="$APP_BASE"/streampark.pid
APP_PID="$APP_BASE"/.pid
APP_OUT="$APP_LOG"/streampark.out
# shellcheck disable=SC2034
APP_TMPDIR="$APP_BASE"/temp
Expand Down Expand Up @@ -241,10 +241,16 @@ if [[ "$USE_NOHUP" = "true" ]]; then
NOHUP="nohup"
fi

BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
CONFIG="${APP_CONF}/config.yaml"
# shellcheck disable=SC2006
if [[ ! -f "$CONFIG" ]] ; then
echo_r "can not found config.yaml in \"conf\" directory, please check."
exit 1;
fi

BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"

SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG")
JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh

JVM_ARGS=""
Expand Down Expand Up @@ -276,21 +282,8 @@ print_logo() {
printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET
printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET
printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET
}

init_env() {
# shellcheck disable=SC2006
CONFIG="${APP_CONF}/application.yml"
if [[ -f "$CONFIG" ]] ; then
echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated.
For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\".
Note: \"application.yml\" will be completely deprecated in version 2.2.0. """
else
CONFIG="${APP_CONF}/config.yaml"
if [[ ! -f "$CONFIG" ]] ; then
echo_r "can not found config.yaml in \"conf\" directory, please check."
exit 1;
fi
if [[ "$1"x == "start"x ]]; then
printf ' %s http://localhost:%s %s\n\n' $PRIMARY $SERVER_PORT $RESET
fi
}

Expand All @@ -313,19 +306,19 @@ get_pid() {
fi

# shellcheck disable=SC2006
local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG"`
if [[ x"${serverPort}" == x"" ]]; then
if [[ "${SERVER_PORT}"x == ""x ]]; then
echo_r "server.port is required, please check $CONFIG"
exit 1;
else
# shellcheck disable=SC2006
# shellcheck disable=SC2155
local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$serverPort"`
if [[ x"${used}" == x"used" ]]; then
local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$SERVER_PORT"`
if [[ "${used}"x == "used"x ]]; then
# shellcheck disable=SC2006
local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
# shellcheck disable=SC2236
if [[ ! -z $PID ]]; then
echo $PID
echo "$PID"
else
echo 0
fi
Expand Down Expand Up @@ -411,7 +404,7 @@ start() {
-Dapp.home="${APP_HOME}" \
-Dlogging.config="${APP_CONF}/logback-spring.xml" \
-Djava.io.tmpdir="$APP_TMPDIR" \
$APP_MAIN >> "$APP_OUT" 2>&1 "&"
$APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&"

local PID=$!
local IS_NUMBER="^[0-9]+$"
Expand Down Expand Up @@ -565,27 +558,31 @@ restart() {
}

main() {
print_logo
init_env
case "$1" in
"debug")
DEBUG_PORT=$2
debug
;;
"start")
start
shift
start "$@"
[[ $? -eq 0 ]] && print_logo "start"
;;
"start_docker")
print_logo
start_docker
;;
"stop")
print_logo
stop
;;
"status")
print_logo
status
;;
"restart")
restart
[[ $? -eq 0 ]] && print_logo "start"
;;
*)
echo_r "Unknown command: $1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,30 @@

package org.apache.streampark.console.base.util;

import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.PropertiesUtils;

import org.apache.commons.io.output.NullOutputStream;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.ServerSocket;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Map;

public class BashJavaUtils {

private static String localhost = "localhost";

public static void main(String[] args) throws IOException {
String action = args[0].toLowerCase();
String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
Expand All @@ -39,12 +54,77 @@ public static void main(String[] args) throws IOException {
System.out.println(value);
break;
case "--check_port":
Integer port = Integer.parseInt(actionArgs[0]);
try {
new ServerSocket(port);
int port = Integer.parseInt(actionArgs[0]);
try (Socket ignored = new Socket(localhost, port)) {
System.out.println("used");
} catch (Exception e) {
System.out.println("free");
}
break;
case "--free_port":
int start = Integer.parseInt(actionArgs[0]);
for (port = start; port < 65535; port++) {
try (Socket ignored = new Socket(localhost, port)) {
} catch (Exception e) {
System.out.println(port);
break;
}
}
break;
case "--read_flink":
String input = actionArgs[0];
String[] inputs = input.split(":");
String flinkDist =
Arrays.stream(inputs).filter(c -> c.contains("flink-dist-")).findFirst().get();
File flinkHome = new File(flinkDist.replaceAll("/lib/.*", ""));
FlinkVersion flinkVersion = new FlinkVersion(flinkHome.getAbsolutePath());

PrintStream originalOut = System.out;
System.setOut(new PrintStream(new NullOutputStream()));

String version = flinkVersion.majorVersion();
float ver = Float.parseFloat(version);
File yaml =
new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : "/conf/config.yaml");

Map<String, String> config = PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath());
String flinkPort = config.getOrDefault("rest.port", "8081");
System.setOut(originalOut);
System.out.println(
flinkHome
.getAbsolutePath()
.concat(",")
.concat(flinkHome.getName())
.concat(",")
.concat(flinkPort));
break;
case "--replace":
String filePath = actionArgs[0];
String[] text = actionArgs[1].split("\\|\\|");
String searchText = text[0];
String replaceText = text[1];
try {
File file = new File(filePath);
String content = FileUtils.readString(file);
content = content.replace(searchText, replaceText);
FileWriter writer = new FileWriter(filePath);
writer.write(content);
writer.flush();
writer.close();
System.exit(0);
} catch (IOException e) {
System.exit(1);
}
break;
case "--download":
try {
URL url = new URL(actionArgs[0]);
Path path = Paths.get(actionArgs[1]).toAbsolutePath().normalize();
try (InputStream inStream = url.openStream()) {
Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
}
} catch (Exception e) {
System.out.println("used");
System.exit(1);
}
break;
default:
Expand Down
Loading

0 comments on commit 1326b9f

Please sign in to comment.