Skip to content

Commit

Permalink
[Improve] proxy bug fixed. (#3982)
Browse files Browse the repository at this point in the history
* [Improve] proxy bug fixed.

* [Improve] proxy check permission improvement
  • Loading branch information
wolfboys authored Aug 21, 2024
1 parent 7398663 commit fba37b7
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ public class ProxyController {

@Autowired private ProxyService proxyService;

@GetMapping("flink-ui/{id}/**")
public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, @PathVariable("id") Long id)
@GetMapping("flink/{id}/**")
public ResponseEntity<?> proxyFlink(HttpServletRequest request, @PathVariable("id") Long id)
throws Exception {
return proxyService.proxyFlinkUI(request, id);
return proxyService.proxyFlink(request, id);
}

@GetMapping("job_manager/{id}/**")
public ResponseEntity<?> proxyJobManager(
HttpServletRequest request, @PathVariable("id") Long logId) throws Exception {
return proxyService.proxyJobManager(request, logId);
@GetMapping("history/{id}/**")
public ResponseEntity<?> proxyHistory(HttpServletRequest request, @PathVariable("id") Long logId)
throws Exception {
return proxyService.proxyHistory(request, logId);
}

@GetMapping("yarn/{appId}/**")
public ResponseEntity<?> proxyURL(HttpServletRequest request, @PathVariable("appId") String appId)
public ResponseEntity<?> proxyYarn(HttpServletRequest request, @PathVariable("appId") Long logId)
throws Exception {
return proxyService.proxyYarn(request, appId);
return proxyService.proxyYarn(request, logId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import javax.servlet.http.HttpServletRequest;

public interface ProxyService {
ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long id) throws Exception;
ResponseEntity<?> proxyFlink(HttpServletRequest request, Long id) throws Exception;

ResponseEntity<?> proxyYarn(HttpServletRequest request, String url) throws Exception;
ResponseEntity<?> proxyYarn(HttpServletRequest request, Long logId) throws Exception;

ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId) throws Exception;
ResponseEntity<?> proxyHistory(HttpServletRequest request, Long logId) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.PermissionDeniedException;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
Expand All @@ -28,12 +29,13 @@
import org.apache.streampark.console.core.service.ProxyService;
import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
import org.apache.streampark.console.system.authentication.JWTUtil;
import org.apache.streampark.console.system.entity.Member;
import org.apache.streampark.console.system.entity.User;
import org.apache.streampark.console.system.service.MemberService;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
Expand Down Expand Up @@ -98,75 +100,97 @@ public void handleError(@Nonnull ClientHttpResponse response) {
}

@Override
public ResponseEntity<?> proxyFlinkUI(HttpServletRequest request, Long appId) throws Exception {
ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
if (appId == null) {
return builder.body("Invalid operation, appId is null");
}
public ResponseEntity<?> proxyFlink(HttpServletRequest request, Long appId) throws Exception {
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);

User currentUser = serviceHelper.getLoginUser();
Application app = applicationService.getById(appId);
if (app == null) {
return builder.body("Invalid operation, appId is invalid.");
}
if (!currentUser.getUserId().equals(app.getUserId())) {
Member member = memberService.findByUserId(app.getTeamId(), currentUser.getUserId());
if (member == null) {
return builder.body(
"Permission denied, this job not created by the current user, And the job cannot be found in the current user's team.");
}
}

checkProxyApp(app);
String url = null;
switch (app.getExecutionModeEnum()) {
case REMOTE:
FlinkCluster cluster = flinkClusterService.getById(app.getFlinkClusterId());
url = cluster.getAddress();
break;
case YARN_PER_JOB:
case YARN_APPLICATION:
case YARN_SESSION:
String yarnURL = YarnUtils.getRMWebAppProxyURL();
url = yarnURL + "/proxy/" + app.getClusterId();
url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
url += getRequestURL(request).replace("/proxy/flink/" + appId, "");
return proxyYarnRequest(request, url);
case REMOTE:
FlinkCluster cluster = flinkClusterService.getById(app.getFlinkClusterId());
url = cluster.getAddress();
break;
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
String jobManagerUrl = app.getJobManagerUrl();
if (jobManagerUrl == null) {
builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
builder.body("The flink job manager url is not ready");
return builder.build();
}
url = flinkK8sWatcher.getRemoteRestUrl(k8sWatcherWrapper.toTrackId(app));
break;
}

if (url == null) {
return builder.body("The flink job manager url is not ready");
ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
builder.body("The flink job manager url is not ready");
return builder.build();
}

url += getRequestURL(request).replace("/proxy/flink-ui/" + appId, "");
url += getRequestURL(request).replace("/proxy/flink/" + appId, "");
return proxyRequest(request, url);
}

@Override
public ResponseEntity<?> proxyYarn(HttpServletRequest request, String appId) throws Exception {
public ResponseEntity<?> proxyYarn(HttpServletRequest request, Long logId) throws Exception {
ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);
ApplicationLog log = logService.getById(logId);
if (log == null) {
return builder.body("The application log not found.");
}
checkProxyAppLog(log);
String yarnId = log.getYarnAppId();
String yarnURL = YarnUtils.getRMWebAppProxyURL();
String url = yarnURL + "/proxy/" + appId + "/";
url += getRequestURL(request).replace("/proxy/yarn/" + appId, "");
String url = yarnURL + "/proxy/" + yarnId + "/";
url += getRequestURL(request).replace("/proxy/yarn/" + yarnId, "");
return proxyYarnRequest(request, url);
}

@Override
public ResponseEntity<?> proxyJobManager(HttpServletRequest request, Long logId)
throws Exception {
public ResponseEntity<?> proxyHistory(HttpServletRequest request, Long logId) throws Exception {
ResponseEntity.BodyBuilder builder = ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE);

ApplicationLog log = logService.getById(logId);
if (log == null) {
return builder.body("The application log not found.");
}
checkProxyAppLog(log);
String url = log.getJobManagerUrl();
url += getRequestURL(request).replace("/proxy/job_manager/" + logId, "");
if (StringUtils.isBlank(url)) {
return builder.body("The jobManager url is null.");
}
url += getRequestURL(request).replace("/proxy/history/" + logId, "");
return proxyRequest(request, url);
}

public void checkProxyApp(Application app) {
if (app == null) {
throw new PermissionDeniedException("Invalid operation, application is invalid.");
}
String token = serviceHelper.getAuthorization();
if (token != null) {
Long userId = JWTUtil.getUserId(token);
if (userId != null && !userId.equals(app.getUserId())) {
Member member = memberService.findByUserId(app.getTeamId(), userId);
if (member == null) {
throw new PermissionDeniedException(
"Permission denied, this job not created by the current user, And the job cannot be found in the current user's team.");
}
}
}
}

public void checkProxyAppLog(ApplicationLog log) {
if (log == null) {
throw new PermissionDeniedException("Invalid operation, The application log not found.");
}
Application app = applicationService.getById(log.getAppId());
checkProxyApp(app);
}

private HttpEntity<?> getRequestEntity(HttpServletRequest request, String url) throws Exception {
HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = request.getHeaderNames();
Expand All @@ -178,7 +202,6 @@ private HttpEntity<?> getRequestEntity(HttpServletRequest request, String url) t
// Ensure the Host header is set correctly.
URI uri = new URI(url);
headers.set("Host", uri.getHost());

byte[] body = null;
if (request.getInputStream().available() > 0) {
InputStream inputStream = request.getInputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@
});
}
async function handleYarnUrl(yarnAppId: string) {
window.open(baseUrl() + '/proxy/yarn/' + yarnAppId + '/');
async function handleYarnUrl(id: string) {
window.open(baseUrl() + '/proxy/yarn/' + id + '/');
}
async function handleViewJobManager(record: Recordable) {
window.open(baseUrl() + '/proxy/job_manager/' + record.id + '/');
async function handleViewHistory(record: Recordable) {
window.open(baseUrl() + '/proxy/history/' + record.id + '/');
}
function getSavePointAction(record: Recordable): ActionItem[] {
Expand Down Expand Up @@ -394,12 +394,12 @@
<Tag color="orange" v-if="record.optionName === OperationEnum.CANCEL"> Cancel </Tag>
</template>
<template v-if="column.dataIndex === 'yarnAppId'">
<a type="link" @click="handleYarnUrl(record.yarnAppId)" target="_blank">
<a type="link" @click="handleYarnUrl(record.id)" target="_blank">
{{ record.yarnAppId }}
</a>
</template>
<template v-if="column.dataIndex === 'jobManagerUrl'">
<a type="link" target="_blank" @click="handleViewJobManager(record)">
<a type="link" target="_blank" @click="handleViewHistory(record)">
{{ record.jobManagerUrl }}
</a>
</template>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export function descriptionFilter(option) {
}

export async function handleView(app: AppListRecord) {
window.open(baseUrl() + '/proxy/flink-ui/' + app.id + '/');
window.open(baseUrl() + '/proxy/flink/' + app.id + '/');
}

export function handleIsStart(app: Recordable, optionApps: Recordable) {
Expand Down

0 comments on commit fba37b7

Please sign in to comment.