Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Improvements related to Spark & Flink class-names #4097

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions qodana.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#-------------------------------------------------------------------------------#
# Qodana analysis is configured by qodana.yaml file #
# https://www.jetbrains.com/help/qodana/qodana-yaml.html #
#-------------------------------------------------------------------------------#
version: "1.0"

#Specify inspection profile for code analysis
profile:
name: qodana.starter

#Enable inspections
#include:
# - name: <SomeEnabledInspectionId>

#Disable inspections
#exclude:
# - name: <SomeDisabledInspectionId>
# paths:
# - <path/where/not/run/inspection>

projectJDK: 8 #(Applied in CI/CD pipeline)

#Execute shell command before Qodana execution (Applied in CI/CD pipeline)
#bootstrap: sh ./prepare-qodana.sh

#Install IDE plugins before Qodana execution (Applied in CI/CD pipeline)
#plugins:
# - id: <plugin.id> #(plugin id can be found at https://plugins.jetbrains.com)

#Specify Qodana linter for analysis (Applied in CI/CD pipeline)
linter: jetbrains/qodana-jvm:latest
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;

/** Flink execution mode enum. */
public enum FlinkExecutionMode {
public enum FlinkDeployMode {

/** Unknown Mode */
UNKNOWN(-1, "Unknown"),
Expand Down Expand Up @@ -54,41 +54,41 @@ public enum FlinkExecutionMode {

private final String name;

FlinkExecutionMode(@Nonnull Integer mode, @Nonnull String name) {
FlinkDeployMode(@Nonnull Integer mode, @Nonnull String name) {
this.mode = mode;
this.name = name;
}

/**
* Try to resolve the mode value into {@link FlinkExecutionMode}.
* Try to resolve the mode value into {@link FlinkDeployMode}.
*
* @param value The mode value of potential flink execution mode.
* @return The parsed flink execution mode enum.
*/
@Nonnull
public static FlinkExecutionMode of(@Nullable Integer value) {
for (FlinkExecutionMode mode : values()) {
public static FlinkDeployMode of(@Nullable Integer value) {
for (FlinkDeployMode mode : values()) {
if (mode.mode.equals(value)) {
return mode;
}
}
return FlinkExecutionMode.UNKNOWN;
return FlinkDeployMode.UNKNOWN;
}

/**
* Try to resolve the mode name into {@link FlinkExecutionMode}.
* Try to resolve the mode name into {@link FlinkDeployMode}.
*
* @param name The mode name of potential flink execution mode.
* @return The parsed flink execution mode enum.
*/
@Nonnull
public static FlinkExecutionMode of(@Nullable String name) {
for (FlinkExecutionMode mode : values()) {
public static FlinkDeployMode of(@Nullable String name) {
for (FlinkDeployMode mode : values()) {
if (mode.name.equals(name)) {
return mode;
}
}
return FlinkExecutionMode.UNKNOWN;
return FlinkDeployMode.UNKNOWN;
}

public int getMode() {
Expand All @@ -106,7 +106,7 @@ public String getName() {
* @param mode The given mode.
* @return The judged result.
*/
public static boolean isYarnMode(@Nullable FlinkExecutionMode mode) {
public static boolean isYarnMode(@Nullable FlinkDeployMode mode) {
return YARN_PER_JOB == mode || YARN_APPLICATION == mode || YARN_SESSION == mode;
}

Expand All @@ -117,7 +117,7 @@ public static boolean isYarnMode(@Nullable FlinkExecutionMode mode) {
* @return The judged result. TODO: We'll inline this method back to the corresponding caller
* lines after dropping the yarn perjob mode.
*/
public static boolean isYarnPerJobOrAppMode(@Nullable FlinkExecutionMode mode) {
public static boolean isYarnPerJobOrAppMode(@Nullable FlinkDeployMode mode) {
return YARN_PER_JOB == mode || YARN_APPLICATION == mode;
}

Expand All @@ -127,7 +127,7 @@ public static boolean isYarnPerJobOrAppMode(@Nullable FlinkExecutionMode mode) {
* @param mode The given mode.
* @return The judged result.
*/
public static boolean isYarnSessionMode(@Nullable FlinkExecutionMode mode) {
public static boolean isYarnSessionMode(@Nullable FlinkDeployMode mode) {
return YARN_SESSION == mode;
}

Expand Down Expand Up @@ -157,7 +157,7 @@ public static boolean isKubernetesSessionMode(@Nullable Integer value) {
* @param mode The given flink execution mode.
* @return The judged result.
*/
public static boolean isKubernetesMode(@Nullable FlinkExecutionMode mode) {
public static boolean isKubernetesMode(@Nullable FlinkDeployMode mode) {
return KUBERNETES_NATIVE_SESSION == mode || KUBERNETES_NATIVE_APPLICATION == mode;
}

Expand Down Expand Up @@ -189,7 +189,7 @@ public static List<Integer> getKubernetesMode() {
}

/** Judge the given flink execution mode whether is session execution mode. */
public static boolean isSessionMode(@Nullable FlinkExecutionMode mode) {
public static boolean isSessionMode(@Nullable FlinkDeployMode mode) {
return KUBERNETES_NATIVE_SESSION == mode || YARN_SESSION == mode;
}

Expand All @@ -199,7 +199,7 @@ public static boolean isRemoteMode(@Nullable Integer value) {
}

/** Judge the given flink execution mode whether is remote execution mode. */
public static boolean isRemoteMode(@Nullable FlinkExecutionMode mode) {
public static boolean isRemoteMode(@Nullable FlinkDeployMode mode) {
return REMOTE == mode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import javax.annotation.Nullable;

/** The flink deployment mode enum. */
public enum FlinkDevelopmentMode {
public enum FlinkJobType {

/** Unknown type replace null */
UNKNOWN("Unknown", -1),
Expand All @@ -39,28 +39,28 @@ public enum FlinkDevelopmentMode {

private final Integer mode;

FlinkDevelopmentMode(@Nonnull String name, @Nonnull Integer mode) {
FlinkJobType(@Nonnull String name, @Nonnull Integer mode) {
this.name = name;
this.mode = mode;
}

/**
* Try to resolve the mode value into {@link FlinkDevelopmentMode}.
* Try to resolve the mode value into {@link FlinkJobType}.
*
* @param value The mode value of potential flink deployment mode.
* @return The parsed flink deployment mode.
*/
@Nonnull
public static FlinkDevelopmentMode of(@Nullable Integer value) {
for (FlinkDevelopmentMode flinkDevelopmentMode : values()) {
public static FlinkJobType of(@Nullable Integer value) {
for (FlinkJobType flinkDevelopmentMode : values()) {
if (flinkDevelopmentMode.mode.equals(value)) {
return flinkDevelopmentMode;
}
}
return FlinkDevelopmentMode.UNKNOWN;
return FlinkJobType.UNKNOWN;
}

/** Get the mode value of the current {@link FlinkDevelopmentMode} enum. */
/** Get the mode value of the current {@link FlinkJobType} enum. */
@Nonnull
public Integer getMode() {
return mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import javax.annotation.Nullable;

/** Spark execution mode enum. */
public enum SparkExecutionMode {
public enum SparkDeployMode {

/** Unknown Mode */
UNKNOWN(-1, "Unknown"),
Expand All @@ -42,41 +42,41 @@ public enum SparkExecutionMode {

private final String name;

SparkExecutionMode(@Nonnull Integer mode, @Nonnull String name) {
SparkDeployMode(@Nonnull Integer mode, @Nonnull String name) {
this.mode = mode;
this.name = name;
}

/**
* Try to resolve the mode value into {@link SparkExecutionMode}.
* Try to resolve the mode value into {@link SparkDeployMode}.
*
* @param value The mode value of potential spark execution mode.
* @return The parsed spark execution mode enum.
*/
@Nonnull
public static SparkExecutionMode of(@Nullable Integer value) {
for (SparkExecutionMode mode : values()) {
public static SparkDeployMode of(@Nullable Integer value) {
for (SparkDeployMode mode : values()) {
if (mode.mode.equals(value)) {
return mode;
}
}
return SparkExecutionMode.UNKNOWN;
return SparkDeployMode.UNKNOWN;
}

/**
* Try to resolve the mode name into {@link SparkExecutionMode}.
* Try to resolve the mode name into {@link SparkDeployMode}.
*
* @param name The mode name of potential spark execution mode.
* @return The parsed spark execution mode enum.
*/
@Nonnull
public static SparkExecutionMode of(@Nullable String name) {
for (SparkExecutionMode mode : values()) {
public static SparkDeployMode of(@Nullable String name) {
for (SparkDeployMode mode : values()) {
if (mode.name.equals(name)) {
return mode;
}
}
return SparkExecutionMode.UNKNOWN;
return SparkDeployMode.UNKNOWN;
}

public int getMode() {
Expand All @@ -94,7 +94,7 @@ public String getName() {
* @param mode The given mode.
* @return The judged result.
*/
public static boolean isYarnMode(@Nullable SparkExecutionMode mode) {
public static boolean isYarnMode(@Nullable SparkDeployMode mode) {
return YARN_CLUSTER == mode || YARN_CLIENT == mode;
}

Expand All @@ -114,7 +114,7 @@ public static boolean isRemoteMode(@Nullable Integer value) {
}

/** Judge the given spark execution mode whether is remote execution mode. */
public static boolean isRemoteMode(@Nullable SparkExecutionMode mode) {
public static boolean isRemoteMode(@Nullable SparkDeployMode mode) {
return REMOTE == mode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import javax.annotation.Nullable;

/** The spark deployment mode enum. */
public enum SparkDevelopmentMode {
public enum SparkJobType {

/** Unknown type replace null */
UNKNOWN("Unknown", -1),
Expand All @@ -39,28 +39,28 @@ public enum SparkDevelopmentMode {

private final Integer mode;

SparkDevelopmentMode(@Nonnull String name, @Nonnull Integer mode) {
SparkJobType(@Nonnull String name, @Nonnull Integer mode) {
this.name = name;
this.mode = mode;
}

/**
* Try to resolve the mode value into {@link SparkDevelopmentMode}.
* Try to resolve the mode value into {@link SparkJobType}.
*
* @param value The mode value of potential spark deployment mode.
* @return The parsed spark deployment mode.
*/
@Nonnull
public static SparkDevelopmentMode valueOf(@Nullable Integer value) {
for (SparkDevelopmentMode sparkDevelopmentMode : values()) {
public static SparkJobType valueOf(@Nullable Integer value) {
for (SparkJobType sparkDevelopmentMode : values()) {
if (sparkDevelopmentMode.mode.equals(value)) {
return sparkDevelopmentMode;
}
}
return SparkDevelopmentMode.UNKNOWN;
return SparkJobType.UNKNOWN;
}

/** Get the mode value of the current {@link SparkDevelopmentMode} enum. */
/** Get the mode value of the current {@link SparkJobType} enum. */
@Nonnull
public Integer getMode() {
return mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ object PropertiesUtils extends Logger {

private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")

private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")

// scalastyle:off
private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
// scalastyle:on

private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"

private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
Expand Down Expand Up @@ -398,7 +392,7 @@ object PropertiesUtils extends Logger {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
val p = PROPERTY_PATTERN.matcher(x)
if (p.matches) {
map += p.group(1).trim -> p.group(2).trim
}
Expand All @@ -409,22 +403,4 @@ object PropertiesUtils extends Logger {
map.toMap
}
}

/** extract spark configuration from sparkApplication.appArgs */
@Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] = {
val list = new JavaArrayList[String]()
if (StringUtils.isEmpty(arguments)) list
else {
arguments.split(SPARK_ARGUMENT_REGEXP) match {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
list.add(x)
}
})
case _ =>
}
list
}
}
}
12 changes: 0 additions & 12 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -397,18 +397,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-sql-gateway-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-sql-gateway-flink-v1</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down
Loading
Loading