diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java index 4fdf48b8fc..ddc854f264 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java @@ -303,7 +303,7 @@ public Thread newThread(Runnable r) { scheduledExecutorService.scheduleAtFixedRate( () -> { try { - client.sendAppHeartbeat(appId, heartbeatTimeout); + client.sendAppHeartbeat(appId, "user", heartbeatTimeout); LOG.info("Finish send heartbeat to coordinator and servers"); } catch (Exception e) { LOG.warn("Fail to send heartbeat to coordinator and servers", e); diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java index 62f1f86f85..88045c1481 100644 --- a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java +++ b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java @@ -504,7 +504,7 @@ public SendShuffleDataResult sendShuffleData( } @Override - public void sendAppHeartbeat(String appId, long timeoutMs) {} + public void sendAppHeartbeat(String appId, String user, long timeoutMs) {} @Override public void registerApplicationInfo(String appId, long timeoutMs, String user) {} diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java index 9ca680cb3d..1fd42e97f2 100644 --- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java +++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java @@ -492,7 +492,7 @@ public SendShuffleDataResult sendShuffleData( } @Override - public void sendAppHeartbeat(String appId, long timeoutMs) {} + public void sendAppHeartbeat(String appId, String user, long timeoutMs) {} @Override public void registerApplicationInfo(String appId, long timeoutMs, String user) {} diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index 1e5bb49418..42a3371567 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -385,7 +385,7 @@ private void startHeartbeat() { heartBeatScheduledExecutorService.scheduleAtFixedRate( () -> { try { - shuffleWriteClient.sendAppHeartbeat(appId, heartbeatTimeout); + shuffleWriteClient.sendAppHeartbeat(appId, user, heartbeatTimeout); LOG.info("Finish send heartbeat to coordinator and servers"); } catch (Exception e) { LOG.warn("Fail to send heartbeat to coordinator and servers", e); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index bf42bf3610..150bdcf0d3 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -903,7 +903,7 @@ private synchronized void startHeartbeat() { () -> { try { String appId = id.get(); - shuffleWriteClient.sendAppHeartbeat(appId, heartbeatTimeout); + shuffleWriteClient.sendAppHeartbeat(appId, user, heartbeatTimeout); LOG.info("Finish send heartbeat to coordinator and servers"); } catch (Exception e) { LOG.warn("Fail to send heartbeat to coordinator and servers", e); diff --git a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java index c2c31aedb0..cb0ebba7cc 100644 --- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java +++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java @@ -203,7 +203,9 @@ public static void initAndStartRSSClient(final RssDAGAppMaster appMaster, Config appMaster.heartBeatExecutorService.scheduleAtFixedRate( () -> { try { - appMaster.getShuffleWriteClient().sendAppHeartbeat(strAppAttemptId, heartbeatTimeout); + appMaster + .getShuffleWriteClient() + .sendAppHeartbeat(strAppAttemptId, "user", heartbeatTimeout); if (LOG.isDebugEnabled()) { LOG.debug("Finish send heartbeat to coordinator and servers"); } diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java index 70c1a8aed8..9f96914955 100644 --- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java +++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java @@ -575,7 +575,7 @@ public SendShuffleDataResult sendShuffleData( } @Override - public void sendAppHeartbeat(String appId, long timeoutMs) {} + public void sendAppHeartbeat(String appId, String user, long timeoutMs) {} @Override public void registerApplicationInfo(String appId, long timeoutMs, String user) {} diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java index db0c914848..6db9790aa5 100644 --- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java +++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java @@ -51,7 +51,7 @@ SendShuffleDataResult sendShuffleData( List shuffleBlockInfoList, Supplier needCancelRequest); - void sendAppHeartbeat(String appId, long timeoutMs); + void sendAppHeartbeat(String appId, String user, long timeoutMs); void registerApplicationInfo(String appId, long timeoutMs, String user); diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index b16b88168d..875b7d142b 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -915,8 +915,8 @@ public void registerApplicationInfo(String appId, long timeoutMs, String user) { } @Override - public void sendAppHeartbeat(String appId, long timeoutMs) { - RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId, timeoutMs); + public void sendAppHeartbeat(String appId, String user, long timeoutMs) { + RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId, user, timeoutMs); Set allShuffleServers = getAllShuffleServers(appId); ThreadUtils.executeTasks( diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java index 716c226923..42d736bb80 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java @@ -145,12 +145,12 @@ public void registerApplicationInfo(String appId, String user) { } } - public void refreshAppId(String appId) { - String user = appIdToUser.get(appId); + public void refreshAppId(String appId, String user) { // compatible with lower version clients - if (user == null) { - registerApplicationInfo(appId, ""); + if (appIdToUser.get(appId) == null) { + registerApplicationInfo(appId, user); } else { + user = appIdToUser.get(appId); Map appAndTime = currentUserAndApp.get(user); AppInfo appInfo = appAndTime.get(appId); long currentTimeMs = System.currentTimeMillis(); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index ddca8a1e33..875febf84d 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -225,7 +225,8 @@ public void reportClientOperation( public void appHeartbeat( AppHeartBeatRequest request, StreamObserver responseObserver) { String appId = request.getAppId(); - coordinatorServer.getApplicationManager().refreshAppId(appId); + String user = request.getUser(); + coordinatorServer.getApplicationManager().refreshAppId(appId, user); if (LOG.isDebugEnabled()) { LOG.debug("Got heartbeat from application: {}", appId); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java index a307119e43..bd3b69e175 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java @@ -118,7 +118,7 @@ public void clearWithoutRemoteStorageTest() throws Exception { // NPE shouldn't happen when clear the resource String testApp = "application_clearWithoutRemoteStorageTest"; applicationManager.registerApplicationInfo(testApp, "user"); - applicationManager.refreshAppId(testApp); + applicationManager.refreshAppId(testApp, "user"); // just set a value != 0, it should be reset to 0 if everything goes well CoordinatorMetrics.gaugeRunningAppNum.set(100.0); assertEquals(1, applicationManager.getAppIds().size()); diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java index cd25929194..304df757c1 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java @@ -93,7 +93,7 @@ public void selectStorageTest() throws Exception { applicationManager.incRemoteStorageCounter(remotePath1); String testApp1 = "application_test_" + 1; applicationManager.registerApplicationInfo(testApp1, "user"); - applicationManager.refreshAppId(testApp1); + applicationManager.refreshAppId(testApp1, "user"); // in this case, ensure that all the paths are read and written normally applicationManager.getRemoteStoragePathRankValue().get(remotePath1).getCostTime().set(0); applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getCostTime().set(0); @@ -116,7 +116,7 @@ public void selectStorageTest() throws Exception { // refresh app1, got remotePath2, then remove remotePath2, // it should be existed in counter until it expired applicationManager.registerApplicationInfo(testApp1, "user"); - applicationManager.refreshAppId(testApp1); + applicationManager.refreshAppId(testApp1, "user"); assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); remoteStoragePath = remotePath1; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); @@ -160,7 +160,7 @@ public void storageCounterMulThreadTest() throws Exception { for (int i = 0; i < 1000; i++) { String appId = testApp1 + i; applicationManager.registerApplicationInfo(appId, "user"); - applicationManager.refreshAppId(appId); + applicationManager.refreshAppId(appId, "user"); applicationManager.pickRemoteStorage(appId); } cdl.countDown(); @@ -172,7 +172,7 @@ public void storageCounterMulThreadTest() throws Exception { for (int i = 1000; i < 2000; i++) { String appId = testApp1 + i; applicationManager.registerApplicationInfo(appId, "user"); - applicationManager.refreshAppId(appId); + applicationManager.refreshAppId(appId, "user"); applicationManager.pickRemoteStorage(appId); } cdl.countDown(); @@ -184,7 +184,7 @@ public void storageCounterMulThreadTest() throws Exception { for (int i = 2000; i < 3000; i++) { String appId = testApp1 + i; applicationManager.registerApplicationInfo(appId, "user"); - applicationManager.refreshAppId(appId); + applicationManager.refreshAppId(appId, "user"); applicationManager.pickRemoteStorage(appId); } cdl.countDown(); diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java index 642eec22c9..d1565b49ee 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java @@ -107,7 +107,7 @@ public void selectStorageTest() throws Exception { applicationManager.incRemoteStorageCounter(remoteStorage1); String testApp1 = "application_test_" + 1; applicationManager.registerApplicationInfo(testApp1, "user"); - applicationManager.refreshAppId(testApp1); + applicationManager.refreshAppId(testApp1, "user"); selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, System.currentTimeMillis()); // Ensure that the `System.currentTimeMillis()` corresponding to remoteStorage1 is greater than // that of @@ -138,7 +138,7 @@ public void selectStorageTest() throws Exception { // refresh app1, got remotePath2, then remove remotePath2, // it should be existed in counter until it expired applicationManager.registerApplicationInfo(testApp1, "user"); - applicationManager.refreshAppId(testApp1); + applicationManager.refreshAppId(testApp1, "user"); assertEquals(remoteStorage2, applicationManager.pickRemoteStorage(testApp1).getPath()); remoteStoragePath = remoteStorage1; applicationManager.refreshRemoteStorage(remoteStoragePath, ""); @@ -185,7 +185,7 @@ public void selectStorageMulThreadTest() throws Exception { for (int i = 0; i < 1000; i++) { String appId = testApp1 + i; applicationManager.registerApplicationInfo(appId, "user"); - applicationManager.refreshAppId(appId); + applicationManager.refreshAppId(appId, "user"); applicationManager.pickRemoteStorage(appId); } cdl.countDown(); @@ -197,7 +197,7 @@ public void selectStorageMulThreadTest() throws Exception { for (int i = 1000; i < 2000; i++) { String appId = testApp1 + i; applicationManager.registerApplicationInfo(appId, "user"); - applicationManager.refreshAppId(appId); + applicationManager.refreshAppId(appId, "user"); applicationManager.pickRemoteStorage(appId); } cdl.countDown(); @@ -209,7 +209,7 @@ public void selectStorageMulThreadTest() throws Exception { for (int i = 2000; i < 3000; i++) { String appId = testApp1 + i; applicationManager.registerApplicationInfo(appId, "user"); - applicationManager.refreshAppId(appId); + applicationManager.refreshAppId(appId, "user"); applicationManager.pickRemoteStorage(appId); } cdl.countDown(); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index 8a6e0cecf0..af886e6e85 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.protobuf.UnsafeByteOperations; +import org.apache.commons.lang3.StringUtils; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -188,9 +189,9 @@ public void clearResourceTest() throws Exception { ShuffleDataDistributionType.NORMAL, -1); shuffleWriteClient.registerApplicationInfo("application_clearResourceTest1", 500L, "user"); - shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 500L); + shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", "user", 500L); shuffleWriteClient.registerApplicationInfo("application_clearResourceTest2", 500L, "user"); - shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest2", 500L); + shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest2", "user", 500L); RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest( @@ -210,7 +211,7 @@ public void clearResourceTest() throws Exception { () -> { int i = 0; while (i < 20) { - shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 500L); + shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", "user", 500L); i++; try { Thread.sleep(1000); @@ -783,7 +784,8 @@ public void rpcMetricsTest() throws Exception { .getCounterMap() .get(ShuffleServerGrpcMetrics.APP_HEARTBEAT_METHOD) .get(); - grpcShuffleServerClient.sendHeartBeat(new RssAppHeartBeatRequest(appId, 10000)); + grpcShuffleServerClient.sendHeartBeat( + new RssAppHeartBeatRequest(appId, StringUtils.EMPTY, 10000)); newValue = grpcShuffleServers .get(0) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java index 484bb43d77..cb5163d710 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java @@ -236,7 +236,10 @@ public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) { @Override public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request) { RssProtos.AppHeartBeatRequest rpcRequest = - RssProtos.AppHeartBeatRequest.newBuilder().setAppId(request.getAppId()).build(); + RssProtos.AppHeartBeatRequest.newBuilder() + .setAppId(request.getAppId()) + .setUser(request.getUser()) + .build(); RssProtos.AppHeartBeatResponse rpcResponse = blockingStub .withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 14dbf2f60f..c0d27c048c 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -249,8 +249,9 @@ private ShuffleCommitResponse doSendCommit(String appId, int shuffleId) { throw new RssException("Send commit to host[" + host + "], port[" + port + "] failed"); } - private AppHeartBeatResponse doSendHeartBeat(String appId, long timeout) { - AppHeartBeatRequest request = AppHeartBeatRequest.newBuilder().setAppId(appId).build(); + private AppHeartBeatResponse doSendHeartBeat(String appId, String user, long timeout) { + AppHeartBeatRequest request = + AppHeartBeatRequest.newBuilder().setAppId(appId).setUser(user).build(); return blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).appHeartbeat(request); } @@ -653,7 +654,7 @@ public RssSendCommitResponse sendCommit(RssSendCommitRequest request) { @Override public RssAppHeartBeatResponse sendHeartBeat(RssAppHeartBeatRequest request) { AppHeartBeatResponse appHeartBeatResponse = - doSendHeartBeat(request.getAppId(), request.getTimeoutMs()); + doSendHeartBeat(request.getAppId(), request.getUser(), request.getTimeoutMs()); if (appHeartBeatResponse.getStatus() != RssProtos.StatusCode.SUCCESS) { String msg = "Can't send heartbeat to " diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java index 6a7c3e901a..1c60559550 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAppHeartBeatRequest.java @@ -20,10 +20,12 @@ public class RssAppHeartBeatRequest { private final String appId; + private final String user; private final long timeoutMs; - public RssAppHeartBeatRequest(String appId, long timeoutMs) { + public RssAppHeartBeatRequest(String appId, String user, long timeoutMs) { this.appId = appId; + this.user = user; this.timeoutMs = timeoutMs; } @@ -31,6 +33,10 @@ public String getAppId() { return appId; } + public String getUser() { + return user; + } + public long getTimeoutMs() { return timeoutMs; } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 97928bf201..383b26bdb5 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -378,6 +378,7 @@ service CoordinatorServer { message AppHeartBeatRequest { string appId = 1; + string user = 2; } message AppHeartBeatResponse {