From 73c1d7d21ac11fa8ed20bdb8b28b86a480d9e1cf Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Wed, 3 Apr 2024 08:59:41 +0000 Subject: [PATCH] Add retries for remote artifact and remote application calls --- .../RemoteArtifactRepositoryReader.java | 31 ++++++++++-- .../RemoteApplicationDetailFetcher.java | 29 +++++++++-- .../ArtifactHttpHandlerInternalTest.java | 49 +++++++++++++++++-- .../ApplicationDetailFetcherTest.java | 26 ++++++++++ 4 files changed, 124 insertions(+), 11 deletions(-) diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/artifact/RemoteArtifactRepositoryReader.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/artifact/RemoteArtifactRepositoryReader.java index 5b8a5aa9d2d2..46042b857cfc 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/artifact/RemoteArtifactRepositoryReader.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/artifact/RemoteArtifactRepositoryReader.java @@ -16,6 +16,7 @@ package io.cdap.cdap.internal.app.runtime.artifact; +import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; @@ -24,6 +25,7 @@ import io.cdap.cdap.api.artifact.ArtifactRange; import io.cdap.cdap.api.artifact.ArtifactScope; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.retry.RetryableException; import io.cdap.cdap.common.ArtifactNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.api.service.ServiceUnavailableException; @@ -32,6 +34,9 @@ import io.cdap.cdap.common.id.Id; import io.cdap.cdap.common.internal.remote.RemoteClient; import io.cdap.cdap.common.internal.remote.RemoteClientFactory; +import io.cdap.cdap.common.service.Retries; +import io.cdap.cdap.common.service.RetryStrategies; +import io.cdap.cdap.common.service.RetryStrategy; import io.cdap.cdap.gateway.handlers.AppLifecycleHttpHandler; import io.cdap.cdap.gateway.handlers.ArtifactHttpHandlerInternal; import io.cdap.cdap.internal.io.SchemaTypeAdapter; @@ -49,6 +54,8 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.twill.filesystem.Location; import org.apache.twill.filesystem.LocationFactory; @@ -65,6 +72,11 @@ public class RemoteArtifactRepositoryReader implements ArtifactRepositoryReader }.getType(); private static final Type ARTIFACT_DETAIL_LIST_TYPE = new TypeToken>() { }.getType(); + private static final Predicate RETRYABLE_PREDICATE = + throwable -> (throwable instanceof RetryableException); + private static final int RETRY_BASE_DELAY_MILLIS = 100; + private static final int RETRY_MAX_DELAY_MILLIS = 5000; + private static final int RETRY_TIMEOUT_SECS = 300; private final RemoteClient remoteClient; private final LocationFactory locationFactory; @@ -98,7 +110,7 @@ public ArtifactDetail getArtifact(Id.Artifact artifactId) throws Exception { artifactId.getNamespace().getId(), artifactId.getName(), artifactId.getVersion()); - HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url); + HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url); httpResponse = execute(requestBuilder.build()); ArtifactDetail detail = GSON.fromJson(httpResponse.getResponseBodyAsString(), ARTIFACT_DETAIL_TYPE); @@ -133,7 +145,7 @@ public InputStream newInputStream(Id.Artifact artifactId) throws IOException, No artifactId.getName(), artifactId.getVersion(), scope); - HttpURLConnection urlConn = remoteClient.openConnection(HttpMethod.GET, url); + HttpURLConnection urlConn = getRemoteClient().openConnection(HttpMethod.GET, url); throwIfError(artifactId, urlConn); // Use FilterInputStream and override close to ensure the connection is closed once the input stream is closed @@ -161,7 +173,7 @@ public List getArtifactDetails(ArtifactRange range, int limit, range.getUpper().toString(), limit, order.name()); - HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url); + HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url); HttpResponse httpResponse = execute(requestBuilder.build()); List details = GSON.fromJson(httpResponse.getResponseBodyAsString(), ARTIFACT_DETAIL_LIST_TYPE); @@ -187,7 +199,13 @@ protected Location getArtifactLocation(ArtifactDescriptor descriptor) private HttpResponse execute(HttpRequest request) throws IOException, NotFoundException, UnauthorizedException { - HttpResponse httpResponse = remoteClient.execute(request); + + RetryStrategy baseRetryStrategy = RetryStrategies.exponentialDelay( + RETRY_BASE_DELAY_MILLIS, RETRY_MAX_DELAY_MILLIS, TimeUnit.MILLISECONDS); + HttpResponse httpResponse = + Retries.callWithRetries(() -> getRemoteClient().execute(request), + RetryStrategies.timeLimit(RETRY_TIMEOUT_SECS, TimeUnit.SECONDS, baseRetryStrategy), + RETRYABLE_PREDICATE); if (httpResponse.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new NotFoundException(httpResponse.getResponseBodyAsString()); } @@ -226,4 +244,9 @@ private void throwIfError(Id.Artifact artifactId, errorMsg)); } } + + @VisibleForTesting + public RemoteClient getRemoteClient() { + return remoteClient; + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/RemoteApplicationDetailFetcher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/RemoteApplicationDetailFetcher.java index ffaca22b62aa..32fd39272443 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/RemoteApplicationDetailFetcher.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/RemoteApplicationDetailFetcher.java @@ -16,17 +16,23 @@ package io.cdap.cdap.metadata; +import com.google.common.annotations.VisibleForTesting; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; import com.google.inject.Inject; +import io.cdap.cdap.api.retry.Idempotency; +import io.cdap.cdap.api.retry.RetryableException; import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.http.DefaultHttpRequestConfig; import io.cdap.cdap.common.internal.remote.RemoteClient; import io.cdap.cdap.common.internal.remote.RemoteClientFactory; +import io.cdap.cdap.common.service.Retries; +import io.cdap.cdap.common.service.RetryStrategies; +import io.cdap.cdap.common.service.RetryStrategy; import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter; import io.cdap.cdap.proto.ApplicationDetail; import io.cdap.cdap.proto.id.ApplicationReference; @@ -40,7 +46,9 @@ import java.lang.reflect.Type; import java.net.HttpURLConnection; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Predicate; /** * Fetch application detail via internal REST API calls. @@ -57,6 +65,11 @@ public class RemoteApplicationDetailFetcher implements ApplicationDetailFetcher private static final String APPLICATIONS_KEY = "applications"; private static final String NEXT_PAGE_TOKEN_KEY = "nextPageToken"; + private static final Predicate RETRYABLE_PREDICATE = + throwable -> (throwable instanceof RetryableException); + private static final int RETRY_BASE_DELAY_MILLIS = 100; + private static final int RETRY_MAX_DELAY_MILLIS = 5000; + private static final int RETRY_TIMEOUT_SECS = 300; private final RemoteClient remoteClient; /** @@ -79,7 +92,7 @@ public ApplicationDetail get(ApplicationReference appRef) throws IOException, NotFoundException, UnauthorizedException { String url = String.format("namespaces/%s/app/%s", appRef.getNamespace(), appRef.getApplication()); - HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url); + HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url); HttpResponse httpResponse; httpResponse = execute(requestBuilder.build()); return GSON.fromJson(httpResponse.getResponseBodyAsString(), ApplicationDetail.class); @@ -95,7 +108,7 @@ public void scan(String namespace, Consumer consumer, Integer String token; do { - HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url); + HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url); HttpResponse httpResponse; try { httpResponse = execute(requestBuilder.build()); @@ -126,7 +139,12 @@ public void scan(String namespace, Consumer consumer, Integer private HttpResponse execute(HttpRequest request) throws IOException, NotFoundException, UnauthorizedException { - HttpResponse httpResponse = remoteClient.execute(request); + RetryStrategy baseRetryStrategy = RetryStrategies.exponentialDelay( + RETRY_BASE_DELAY_MILLIS, RETRY_MAX_DELAY_MILLIS, TimeUnit.MILLISECONDS); + HttpResponse httpResponse = + Retries.callWithRetries(() -> getRemoteClient().execute(request), + RetryStrategies.timeLimit(RETRY_TIMEOUT_SECS, TimeUnit.SECONDS, baseRetryStrategy), + RETRYABLE_PREDICATE); if (httpResponse.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new NotFoundException(httpResponse.getResponseBodyAsString()); } @@ -135,4 +153,9 @@ private HttpResponse execute(HttpRequest request) } return httpResponse; } + + @VisibleForTesting + public RemoteClient getRemoteClient() { + return remoteClient; + } } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ArtifactHttpHandlerInternalTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ArtifactHttpHandlerInternalTest.java index 575d06881a4d..1df459104816 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ArtifactHttpHandlerInternalTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/ArtifactHttpHandlerInternalTest.java @@ -20,17 +20,22 @@ import io.cdap.cdap.api.artifact.ArtifactRange; import io.cdap.cdap.api.artifact.ArtifactScope; import io.cdap.cdap.api.artifact.ArtifactVersion; +import io.cdap.cdap.api.retry.RetryableException; import io.cdap.cdap.common.id.Id; +import io.cdap.cdap.common.internal.remote.RemoteClient; import io.cdap.cdap.internal.app.runtime.artifact.ArtifactDetail; import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepositoryReader; import io.cdap.cdap.internal.app.runtime.artifact.RemoteArtifactRepositoryReader; import io.cdap.cdap.proto.artifact.ArtifactSortOrder; import io.cdap.cdap.proto.id.ArtifactId; import io.cdap.cdap.proto.id.NamespaceId; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; public class ArtifactHttpHandlerInternalTest extends ArtifactHttpHandlerTestBase { @Test @@ -106,15 +111,51 @@ public void testRemoteArtifactRepositoryReaderGetArtifact() throws Exception { // Add a system artifact String systemArtfiactName = "sysApp"; String systemArtifactVeresion = "1.0.0"; - ArtifactId systemArtifactId = NamespaceId.SYSTEM.artifact(systemArtfiactName, systemArtifactVeresion); + ArtifactId systemArtifactId = + NamespaceId.SYSTEM.artifact(systemArtfiactName, systemArtifactVeresion); addAppAsSystemArtifacts(systemArtifactId); // Fetch ArtifactDetail using RemoteArtifactRepositoryReader and verify - ArtifactRepositoryReader remoteReader = getInjector().getInstance(RemoteArtifactRepositoryReader.class); + ArtifactRepositoryReader remoteReader = + getInjector().getInstance(RemoteArtifactRepositoryReader.class); ArtifactDetail detail = remoteReader.getArtifact(Id.Artifact.fromEntityId(systemArtifactId)); Assert.assertNotNull(detail); - ArtifactDetail expectedDetail = getArtifactDetailFromRepository(Id.Artifact.fromEntityId(systemArtifactId)); - Assert.assertTrue(detail.equals(expectedDetail)); + ArtifactDetail expectedDetail = + getArtifactDetailFromRepository(Id.Artifact.fromEntityId(systemArtifactId)); + Assert.assertEquals(detail, expectedDetail); + + wipeData(); + } + + @Test + public void testRemoteArtifactRepositoryReaderGetArtifactWithRetries() throws Exception { + // Add a system artifact + String systemArtfiactName = "sysApp"; + String systemArtifactVeresion = "1.0.0"; + ArtifactId systemArtifactId = + NamespaceId.SYSTEM.artifact(systemArtfiactName, systemArtifactVeresion); + addAppAsSystemArtifacts(systemArtifactId); + // Fetch ArtifactDetail using RemoteArtifactRepositoryReader and verify + RemoteArtifactRepositoryReader remoteReader = + getInjector().getInstance(RemoteArtifactRepositoryReader.class); + RemoteClient remoteClient = remoteReader.getRemoteClient(); + + RemoteClient mockRemoteClient = Mockito.spy(remoteClient); + RemoteArtifactRepositoryReader mockRemoteReader = Mockito.spy(remoteReader); + Mockito.doReturn(mockRemoteClient).when(mockRemoteReader).getRemoteClient(); + + Mockito + .doThrow(new RetryableException(new SocketTimeoutException())) + .doAnswer(InvocationOnMock::callRealMethod) + .when(mockRemoteClient).execute(Mockito.any()); + + ArtifactDetail detail = + mockRemoteReader.getArtifact(Id.Artifact.fromEntityId(systemArtifactId)); + Assert.assertNotNull(detail); + ArtifactDetail expectedDetail = + getArtifactDetailFromRepository(Id.Artifact.fromEntityId(systemArtifactId)); + Assert.assertEquals(detail, expectedDetail); + Mockito.verify(mockRemoteClient, Mockito.times(2)).execute(Mockito.any()); wipeData(); } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/metadata/ApplicationDetailFetcherTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/metadata/ApplicationDetailFetcherTest.java index 4fc17106b9ee..face5aaa15cc 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/metadata/ApplicationDetailFetcherTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/metadata/ApplicationDetailFetcherTest.java @@ -19,13 +19,16 @@ import com.google.common.collect.ImmutableList; import io.cdap.cdap.AllProgramsApp; import io.cdap.cdap.AppWithSchedule; +import io.cdap.cdap.api.retry.RetryableException; import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.internal.remote.RemoteClient; import io.cdap.cdap.gateway.handlers.AppLifecycleHttpHandlerInternal; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; import io.cdap.cdap.proto.ApplicationDetail; import io.cdap.cdap.proto.id.ApplicationReference; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -35,6 +38,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; /** * Tests for {@link RemoteApplicationDetailFetcher} {@link LocalApplicationDetailFetcher} and @@ -113,9 +118,30 @@ public void testGetApplication() throws Exception { // Deploy the application deploy(AllProgramsApp.class, 200, Constants.Gateway.API_VERSION_3_TOKEN, namespace); + // Get and validate the application + RemoteClient mockRemoteClient = null; + if (fetcherType == ApplicationDetailFetcherType.REMOTE) { + // test remote application detail fetcher retries + RemoteClient remoteClient = ((RemoteApplicationDetailFetcher) fetcher).getRemoteClient(); + + mockRemoteClient = Mockito.spy(remoteClient); + fetcher = Mockito.spy((RemoteApplicationDetailFetcher) fetcher); + Mockito + .doReturn(mockRemoteClient) + .when((RemoteApplicationDetailFetcher) fetcher).getRemoteClient(); + + Mockito + .doThrow(new RetryableException(new SocketTimeoutException())) + .doAnswer(InvocationOnMock::callRealMethod) + .when(mockRemoteClient).execute(Mockito.any()); + } + ApplicationDetail appDetail = fetcher.get(new ApplicationReference(namespace, appName)); assertAllProgramAppDetail(appDetail); + if (mockRemoteClient != null) { + Mockito.verify(mockRemoteClient, Mockito.times(2)).execute(Mockito.any()); + } // Delete the application Assert.assertEquals(