Skip to content

Commit

Permalink
[Improve] job on yarn state bug fixed. (#3973)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys authored Aug 19, 2024
1 parent a2b7606 commit 37402fd
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, RMHAUtils}
import org.apache.http.client.config.RequestConfig

import java.io.IOException
import java.net.InetAddress
import java.net.{ConnectException, InetAddress}
import java.security.PrivilegedExceptionAction
import java.util
import java.util.{List => JavaList}
Expand Down Expand Up @@ -231,9 +231,9 @@ object YarnUtils extends Logger {
case Success(v) => v
case Failure(e) =>
if (hasYarnHttpKerberosAuth) {
throw new IOException(s"yarnUtils authRestRequest error, url: $u, detail: $e")
throw new ConnectException(s"yarnUtils authRestRequest error, url: $u, detail: $e")
} else {
throw new IOException(s"yarnUtils restRequest error, url: $u, detail: $e")
throw new ConnectException(s"yarnUtils restRequest error, url: $u, detail: $e")
}
}
case _ =>
Expand All @@ -243,7 +243,8 @@ object YarnUtils extends Logger {
Utils.retry[String](5)(request(s"${getRMWebAppURL(true)}/$url", timeout)) match {
case Success(v) => v
case Failure(e) =>
throw new IOException(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
throw new ConnectException(
s"yarnUtils restRequest retry 5 times all failed. detail: $e")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ public interface ApplicationMapper extends BaseMapper<Application> {
boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);

boolean existsJobByClusterId(@Param("clusterId") Long clusterId);

void updateJobManagerUrl(@Param("id") Long id, @Param("url") String url);
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ List<Application> getByTeamIdAndExecutionModes(
List<ApplicationReport> getYARNApplication(String appName);

RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception;

void updateJobManagerUrl(Long id, String url);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2013,6 +2013,11 @@ public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exce
return RestResponse.success(actionResult);
}

@Override
public void updateJobManagerUrl(Long id, String url) {
baseMapper.updateJobManagerUrl(id, url);
}

private Tuple2<String, String> getNamespaceClusterId(Application application) {
String clusterId = null;
String k8sNamespace = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,12 @@ and the status is not obtained this time (flink rest server is closed),
}
} else {
try {
String trackingUrl = yarnAppInfo.getApp().getTrackingUrl();
if (trackingUrl != null && !trackingUrl.equals(application.getJobManagerUrl())) {
application.setJobManagerUrl(trackingUrl);
applicationService.updateJobManagerUrl(application.getId(), trackingUrl);
}

String state = yarnAppInfo.getApp().getFinalStatus();
FlinkAppState flinkAppState = FlinkAppState.of(state);
if (FlinkAppState.OTHER.equals(flinkAppState)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,66 +18,6 @@

<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.streampark.console.core.mapper.ApplicationMapper">
<resultMap id="BaseResultMap" type="org.apache.streampark.console.core.entity.Application">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="team_id" jdbcType="BIGINT" property="teamId"/>
<result column="project_id" jdbcType="BIGINT" property="projectId"/>
<result column="module" jdbcType="VARCHAR" property="module"/>
<result column="args" jdbcType="LONGVARCHAR" property="args"/>
<result column="options" jdbcType="LONGVARCHAR" property="options"/>
<result column="dynamic_properties" jdbcType="LONGVARCHAR" property="dynamicProperties"/>
<result column="hot_params" jdbcType="VARCHAR" property="hotParams"/>
<result column="job_name" jdbcType="VARCHAR" property="jobName"/>
<result column="version_id" jdbcType="BIGINT" property="versionId"/>
<result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
<result column="flink_cluster_id" jdbcType="BIGINT" property="flinkClusterId"/>
<result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
<result column="k8s_namespace" jdbcType="VARCHAR" property="k8sNamespace"/>
<result column="app_type" jdbcType="INTEGER" property="appType"/>
<result column="job_type" jdbcType="INTEGER" property="jobType"/>
<result column="resource_from" jdbcType="INTEGER" property="resourceFrom"/>
<result column="execution_mode" jdbcType="INTEGER" property="executionMode"/>
<result column="tracking" jdbcType="INTEGER" property="tracking"/>
<result column="jar" jdbcType="VARCHAR" property="jar"/>
<result column="jar_check_sum" jdbcType="VARCHAR" property="jarCheckSum"/>
<result column="dependency" jdbcType="LONGVARCHAR" property="dependency"/>
<result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
<result column="job_id" jdbcType="VARCHAR" property="jobId"/>
<result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
<result column="user_id" jdbcType="BIGINT" property="userId"/>
<result column="start_time" jdbcType="DATE" property="startTime"/>
<result column="end_time" jdbcType="DATE" property="endTime"/>
<result column="duration" jdbcType="BIGINT" property="duration"/>
<result column="state" jdbcType="INTEGER" property="state"/>
<result column="cp_max_failure_interval" jdbcType="INTEGER" property="cpMaxFailureInterval"/>
<result column="cp_failure_rate_interval" jdbcType="INTEGER" property="cpFailureRateInterval"/>
<result column="cp_failure_action" jdbcType="INTEGER" property="cpFailureAction"/>
<result column="restart_size" jdbcType="INTEGER" property="restartSize"/>
<result column="restart_count" jdbcType="INTEGER" property="restartCount"/>
<result column="release" jdbcType="INTEGER" property="release"/>
<result column="build" jdbcType="BOOLEAN" property="build"/>
<result column="resolve_order" jdbcType="INTEGER" property="resolveOrder"/>
<result column="total_tm" jdbcType="INTEGER" property="totalTM"/>
<result column="total_slot" jdbcType="INTEGER" property="totalSlot"/>
<result column="available_slot" jdbcType="INTEGER" property="availableSlot"/>
<result column="total_task" jdbcType="INTEGER" property="totalTask"/>
<result column="jm_memory" jdbcType="INTEGER" property="jmMemory"/>
<result column="tm_memory" jdbcType="INTEGER" property="tmMemory"/>
<result column="option_state" jdbcType="INTEGER" property="optionState"/>
<result column="alert_id" jdbcType="BIGINT" property="alertId"/>
<result column="description" jdbcType="VARCHAR" property="description"/>
<result column="create_time" jdbcType="DATE" property="createTime"/>
<result column="option_time" jdbcType="DATE" property="optionTime"/>
<result column="k8s_rest_exposed_type" jdbcType="INTEGER" property="k8sRestExposedType"/>
<result column="k8s_pod_template" jdbcType="LONGVARCHAR" property="k8sPodTemplate"/>
<result column="k8s_jm_pod_template" jdbcType="LONGVARCHAR" property="k8sJmPodTemplate"/>
<result column="k8s_tm_pod_template" jdbcType="LONGVARCHAR" property="k8sTmPodTemplate"/>
<result column="ingress_template" jdbcType="LONGVARCHAR" property="ingressTemplate"/>
<result column="k8s_hadoop_integration" jdbcType="TINYINT" property="k8sHadoopIntegration"/>
<result column="rest_url" jdbcType="VARCHAR" property="restUrl"/>
<result column="rest_port" jdbcType="INTEGER" property="restPort"/>
<result column="tags" jdbcType="VARCHAR" property="tags"/>
</resultMap>

<update id="resetOptionState">
update t_flink_app
Expand Down Expand Up @@ -294,6 +234,12 @@
where id=#{application.id}
</update>

<update id="updateJobManagerUrl" >
update t_flink_app
set job_manager_url = #{url}
where id = #{id}
</update>

<select id="getRecentK8sNamespace" resultType="java.lang.String" parameterType="java.lang.Integer">
select k8s_namespace
from (
Expand Down

0 comments on commit 37402fd

Please sign in to comment.