Skip to content

Commit

Permalink
Add retries for remote artifact and remote application calls
Browse files Browse the repository at this point in the history
  • Loading branch information
itsankit-google committed Apr 3, 2024
1 parent 7195e0b commit 73c1d7d
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.internal.app.runtime.artifact;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
Expand All @@ -24,6 +25,7 @@
import io.cdap.cdap.api.artifact.ArtifactRange;
import io.cdap.cdap.api.artifact.ArtifactScope;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.common.ArtifactNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.api.service.ServiceUnavailableException;

Check warning on line 31 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/artifact/RemoteArtifactRepositoryReader.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.CustomImportOrderCheck

Wrong lexicographical order for 'io.cdap.cdap.api.service.ServiceUnavailableException' import. Should be before 'io.cdap.cdap.common.NotFoundException'.
Expand All @@ -32,6 +34,9 @@
import io.cdap.cdap.common.id.Id;
import io.cdap.cdap.common.internal.remote.RemoteClient;
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
import io.cdap.cdap.gateway.handlers.AppLifecycleHttpHandler;
import io.cdap.cdap.gateway.handlers.ArtifactHttpHandlerInternal;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
Expand All @@ -49,6 +54,8 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;

Expand All @@ -65,6 +72,11 @@ public class RemoteArtifactRepositoryReader implements ArtifactRepositoryReader
}.getType();
private static final Type ARTIFACT_DETAIL_LIST_TYPE = new TypeToken<List<ArtifactDetail>>() {
}.getType();
private static final Predicate<Throwable> RETRYABLE_PREDICATE =
throwable -> (throwable instanceof RetryableException);
private static final int RETRY_BASE_DELAY_MILLIS = 100;
private static final int RETRY_MAX_DELAY_MILLIS = 5000;
private static final int RETRY_TIMEOUT_SECS = 300;

private final RemoteClient remoteClient;
private final LocationFactory locationFactory;
Expand Down Expand Up @@ -98,7 +110,7 @@ public ArtifactDetail getArtifact(Id.Artifact artifactId) throws Exception {
artifactId.getNamespace().getId(),
artifactId.getName(),
artifactId.getVersion());
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url);
HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url);
httpResponse = execute(requestBuilder.build());
ArtifactDetail detail = GSON.fromJson(httpResponse.getResponseBodyAsString(),
ARTIFACT_DETAIL_TYPE);
Expand Down Expand Up @@ -133,7 +145,7 @@ public InputStream newInputStream(Id.Artifact artifactId) throws IOException, No
artifactId.getName(),
artifactId.getVersion(),
scope);
HttpURLConnection urlConn = remoteClient.openConnection(HttpMethod.GET, url);
HttpURLConnection urlConn = getRemoteClient().openConnection(HttpMethod.GET, url);
throwIfError(artifactId, urlConn);

// Use FilterInputStream and override close to ensure the connection is closed once the input stream is closed
Expand Down Expand Up @@ -161,7 +173,7 @@ public List<ArtifactDetail> getArtifactDetails(ArtifactRange range, int limit,
range.getUpper().toString(),
limit,
order.name());
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url);
HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url);
HttpResponse httpResponse = execute(requestBuilder.build());
List<ArtifactDetail> details = GSON.fromJson(httpResponse.getResponseBodyAsString(),
ARTIFACT_DETAIL_LIST_TYPE);
Expand All @@ -187,7 +199,13 @@ protected Location getArtifactLocation(ArtifactDescriptor descriptor)

private HttpResponse execute(HttpRequest request)
throws IOException, NotFoundException, UnauthorizedException {
HttpResponse httpResponse = remoteClient.execute(request);

RetryStrategy baseRetryStrategy = RetryStrategies.exponentialDelay(
RETRY_BASE_DELAY_MILLIS, RETRY_MAX_DELAY_MILLIS, TimeUnit.MILLISECONDS);
HttpResponse httpResponse =
Retries.callWithRetries(() -> getRemoteClient().execute(request),
RetryStrategies.timeLimit(RETRY_TIMEOUT_SECS, TimeUnit.SECONDS, baseRetryStrategy),
RETRYABLE_PREDICATE);
if (httpResponse.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new NotFoundException(httpResponse.getResponseBodyAsString());
}
Expand Down Expand Up @@ -226,4 +244,9 @@ private void throwIfError(Id.Artifact artifactId,
errorMsg));
}
}

@VisibleForTesting
public RemoteClient getRemoteClient() {
return remoteClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@

package io.cdap.cdap.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.inject.Inject;
import io.cdap.cdap.api.retry.Idempotency;

Check warning on line 25 in cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/RemoteApplicationDetailFetcher.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.imports.UnusedImportsCheck

Unused import - io.cdap.cdap.api.retry.Idempotency.
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.common.NamespaceNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.http.DefaultHttpRequestConfig;
import io.cdap.cdap.common.internal.remote.RemoteClient;
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.id.ApplicationReference;
Expand All @@ -40,7 +46,9 @@
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Fetch application detail via internal REST API calls.
Expand All @@ -57,6 +65,11 @@ public class RemoteApplicationDetailFetcher implements ApplicationDetailFetcher
private static final String APPLICATIONS_KEY = "applications";
private static final String NEXT_PAGE_TOKEN_KEY = "nextPageToken";

private static final Predicate<Throwable> RETRYABLE_PREDICATE =
throwable -> (throwable instanceof RetryableException);
private static final int RETRY_BASE_DELAY_MILLIS = 100;
private static final int RETRY_MAX_DELAY_MILLIS = 5000;
private static final int RETRY_TIMEOUT_SECS = 300;
private final RemoteClient remoteClient;

/**
Expand All @@ -79,7 +92,7 @@ public ApplicationDetail get(ApplicationReference appRef)
throws IOException, NotFoundException, UnauthorizedException {
String url = String.format("namespaces/%s/app/%s",
appRef.getNamespace(), appRef.getApplication());
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url);
HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url);
HttpResponse httpResponse;
httpResponse = execute(requestBuilder.build());
return GSON.fromJson(httpResponse.getResponseBodyAsString(), ApplicationDetail.class);
Expand All @@ -95,7 +108,7 @@ public void scan(String namespace, Consumer<ApplicationDetail> consumer, Integer
String token;

do {
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.GET, url);
HttpRequest.Builder requestBuilder = getRemoteClient().requestBuilder(HttpMethod.GET, url);
HttpResponse httpResponse;
try {
httpResponse = execute(requestBuilder.build());
Expand Down Expand Up @@ -126,7 +139,12 @@ public void scan(String namespace, Consumer<ApplicationDetail> consumer, Integer

private HttpResponse execute(HttpRequest request)
throws IOException, NotFoundException, UnauthorizedException {
HttpResponse httpResponse = remoteClient.execute(request);
RetryStrategy baseRetryStrategy = RetryStrategies.exponentialDelay(
RETRY_BASE_DELAY_MILLIS, RETRY_MAX_DELAY_MILLIS, TimeUnit.MILLISECONDS);
HttpResponse httpResponse =
Retries.callWithRetries(() -> getRemoteClient().execute(request),
RetryStrategies.timeLimit(RETRY_TIMEOUT_SECS, TimeUnit.SECONDS, baseRetryStrategy),
RETRYABLE_PREDICATE);
if (httpResponse.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new NotFoundException(httpResponse.getResponseBodyAsString());
}
Expand All @@ -135,4 +153,9 @@ private HttpResponse execute(HttpRequest request)
}
return httpResponse;
}

@VisibleForTesting
public RemoteClient getRemoteClient() {
return remoteClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
import io.cdap.cdap.api.artifact.ArtifactRange;
import io.cdap.cdap.api.artifact.ArtifactScope;
import io.cdap.cdap.api.artifact.ArtifactVersion;
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.common.id.Id;
import io.cdap.cdap.common.internal.remote.RemoteClient;
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactDetail;
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepositoryReader;
import io.cdap.cdap.internal.app.runtime.artifact.RemoteArtifactRepositoryReader;
import io.cdap.cdap.proto.artifact.ArtifactSortOrder;
import io.cdap.cdap.proto.id.ArtifactId;
import io.cdap.cdap.proto.id.NamespaceId;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

public class ArtifactHttpHandlerInternalTest extends ArtifactHttpHandlerTestBase {
@Test
Expand Down Expand Up @@ -106,15 +111,51 @@ public void testRemoteArtifactRepositoryReaderGetArtifact() throws Exception {
// Add a system artifact
String systemArtfiactName = "sysApp";
String systemArtifactVeresion = "1.0.0";
ArtifactId systemArtifactId = NamespaceId.SYSTEM.artifact(systemArtfiactName, systemArtifactVeresion);
ArtifactId systemArtifactId =
NamespaceId.SYSTEM.artifact(systemArtfiactName, systemArtifactVeresion);
addAppAsSystemArtifacts(systemArtifactId);

// Fetch ArtifactDetail using RemoteArtifactRepositoryReader and verify
ArtifactRepositoryReader remoteReader = getInjector().getInstance(RemoteArtifactRepositoryReader.class);
ArtifactRepositoryReader remoteReader =
getInjector().getInstance(RemoteArtifactRepositoryReader.class);
ArtifactDetail detail = remoteReader.getArtifact(Id.Artifact.fromEntityId(systemArtifactId));
Assert.assertNotNull(detail);
ArtifactDetail expectedDetail = getArtifactDetailFromRepository(Id.Artifact.fromEntityId(systemArtifactId));
Assert.assertTrue(detail.equals(expectedDetail));
ArtifactDetail expectedDetail =
getArtifactDetailFromRepository(Id.Artifact.fromEntityId(systemArtifactId));
Assert.assertEquals(detail, expectedDetail);

wipeData();
}

@Test
public void testRemoteArtifactRepositoryReaderGetArtifactWithRetries() throws Exception {
// Add a system artifact
String systemArtfiactName = "sysApp";
String systemArtifactVeresion = "1.0.0";
ArtifactId systemArtifactId =
NamespaceId.SYSTEM.artifact(systemArtfiactName, systemArtifactVeresion);
addAppAsSystemArtifacts(systemArtifactId);
// Fetch ArtifactDetail using RemoteArtifactRepositoryReader and verify
RemoteArtifactRepositoryReader remoteReader =
getInjector().getInstance(RemoteArtifactRepositoryReader.class);
RemoteClient remoteClient = remoteReader.getRemoteClient();

RemoteClient mockRemoteClient = Mockito.spy(remoteClient);
RemoteArtifactRepositoryReader mockRemoteReader = Mockito.spy(remoteReader);
Mockito.doReturn(mockRemoteClient).when(mockRemoteReader).getRemoteClient();

Mockito
.doThrow(new RetryableException(new SocketTimeoutException()))
.doAnswer(InvocationOnMock::callRealMethod)
.when(mockRemoteClient).execute(Mockito.any());

ArtifactDetail detail =
mockRemoteReader.getArtifact(Id.Artifact.fromEntityId(systemArtifactId));
Assert.assertNotNull(detail);
ArtifactDetail expectedDetail =
getArtifactDetailFromRepository(Id.Artifact.fromEntityId(systemArtifactId));
Assert.assertEquals(detail, expectedDetail);
Mockito.verify(mockRemoteClient, Mockito.times(2)).execute(Mockito.any());

wipeData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
import com.google.common.collect.ImmutableList;
import io.cdap.cdap.AllProgramsApp;
import io.cdap.cdap.AppWithSchedule;
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.common.NamespaceNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.internal.remote.RemoteClient;
import io.cdap.cdap.gateway.handlers.AppLifecycleHttpHandlerInternal;
import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.id.ApplicationReference;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -35,6 +38,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

/**
* Tests for {@link RemoteApplicationDetailFetcher} {@link LocalApplicationDetailFetcher} and
Expand Down Expand Up @@ -113,9 +118,30 @@ public void testGetApplication() throws Exception {

// Deploy the application
deploy(AllProgramsApp.class, 200, Constants.Gateway.API_VERSION_3_TOKEN, namespace);

// Get and validate the application
RemoteClient mockRemoteClient = null;
if (fetcherType == ApplicationDetailFetcherType.REMOTE) {
// test remote application detail fetcher retries
RemoteClient remoteClient = ((RemoteApplicationDetailFetcher) fetcher).getRemoteClient();

mockRemoteClient = Mockito.spy(remoteClient);
fetcher = Mockito.spy((RemoteApplicationDetailFetcher) fetcher);
Mockito
.doReturn(mockRemoteClient)
.when((RemoteApplicationDetailFetcher) fetcher).getRemoteClient();

Mockito
.doThrow(new RetryableException(new SocketTimeoutException()))
.doAnswer(InvocationOnMock::callRealMethod)
.when(mockRemoteClient).execute(Mockito.any());
}

ApplicationDetail appDetail = fetcher.get(new ApplicationReference(namespace, appName));
assertAllProgramAppDetail(appDetail);
if (mockRemoteClient != null) {
Mockito.verify(mockRemoteClient, Mockito.times(2)).execute(Mockito.any());
}

// Delete the application
Assert.assertEquals(
Expand Down

0 comments on commit 73c1d7d

Please sign in to comment.