From 1594be3223543a987a9ce0a95e145c7ceadec257 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Mon, 2 Dec 2024 22:53:03 +0800 Subject: [PATCH 1/3] Fix duplicated blockIds issue caused by duplicated reportShuffleResult --- .../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 56f02721da..7ab125a8f1 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; +import io.grpc.StatusRuntimeException; import io.netty.buffer.Unpooled; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -797,6 +798,11 @@ private ReportShuffleResultResponse doReportShuffleResult(ReportShuffleResultReq try { ReportShuffleResultResponse response = getBlockingStub().reportShuffleResult(rpcRequest); return response; + } catch (StatusRuntimeException e) { + if (e.getCause() instanceof InterruptedException) { + throw new RssException( + "Report shuffle result to host[" + host + "], port[" + port + "] cancelled", e); + } } catch (Exception e) { retryNum++; LOG.warn( From f38d200742c1dd852f446cd3faa9e648d2b3710b Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 16:03:16 +0800 Subject: [PATCH 2/3] Use RetryUtils.retryWithCondition to refactor doReportShuffleResult method --- .../impl/grpc/ShuffleServerGrpcClient.java | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 7ab125a8f1..ee611bdb42 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -793,31 +793,23 @@ public RssReportShuffleResultResponse reportShuffleResult(RssReportShuffleResult } private ReportShuffleResultResponse doReportShuffleResult(ReportShuffleResultRequest rpcRequest) { - int retryNum = 0; - while (retryNum < maxRetryAttempts) { - try { - ReportShuffleResultResponse response = getBlockingStub().reportShuffleResult(rpcRequest); - return response; - } catch (StatusRuntimeException e) { - if (e.getCause() instanceof InterruptedException) { - throw new RssException( - "Report shuffle result to host[" + host + "], port[" + port + "] cancelled", e); - } - } catch (Exception e) { - retryNum++; - LOG.warn( - "Report shuffle result to host[" - + host - + "], port[" - + port - + "] failed, try again, retryNum[" - + retryNum - + "]", - e); - } + try { + return RetryUtils.retryWithCondition( + () -> getBlockingStub().reportShuffleResult(rpcRequest), + null, // No specific callback to execute + 0, // No delay between retries, retry immediately + maxRetryAttempts, // Maximum number of retry attempts + t -> { // Define retry condition directly in the method call + if (t instanceof StatusRuntimeException) { + return !(t.getCause() instanceof InterruptedException); + } + return t instanceof Exception; // Retry for all other Exceptions + }); + } catch (Throwable t) { + // Handle or rethrow the exception as appropriate + throw new RuntimeException( + "Failed to report shuffle result to host[" + host + "], port[" + port + "]", t); } - throw new RssException( - "Report shuffle result to host[" + host + "], port[" + port + "] failed"); } @Override From 28af51332f7ca6c1bb44a0b1c3a8f29062a276f2 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 16:58:29 +0800 Subject: [PATCH 3/3] Fix spotbugs issue --- .../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index ee611bdb42..399c215479 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -807,7 +807,7 @@ private ReportShuffleResultResponse doReportShuffleResult(ReportShuffleResultReq }); } catch (Throwable t) { // Handle or rethrow the exception as appropriate - throw new RuntimeException( + throw new RssException( "Failed to report shuffle result to host[" + host + "], port[" + port + "]", t); } }