Resilient downloads #181

Open · wants to merge 4 commits into base: master
4 changes: 4 additions & 0 deletions occurrence-download/pom.xml
@@ -375,6 +375,10 @@
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
</dependency>

<!-- Used during build phase only, to generate HQL scripts -->
<dependency>
@@ -2,8 +2,13 @@

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -17,6 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Consumer;

/**
@@ -31,6 +37,11 @@ public class SearchQueryProcessor {

private static final Logger LOG = LoggerFactory.getLogger(SearchQueryProcessor.class);

private static final RetryRegistry RETRY_REGISTRY = RetryRegistry.of(RetryConfig.custom()
.maxAttempts(5)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(3), 2d))
.build());

/**
* Executes a query and applies the predicate to each result.
*
@@ -49,26 +60,41 @@ public static void processQuery(DownloadFileWork downloadFileWork, Consumer<Occu
// Creates a search source builder using the query that comes in the fileJob
SearchSourceBuilder searchSourceBuilder = createSearchQuery(downloadFileWork.getQuery());


while (recordCount < nrOfOutputRecords) {

searchSourceBuilder.size(recordCount + LIMIT > nrOfOutputRecords ? nrOfOutputRecords - recordCount : LIMIT);
searchSourceBuilder.from(downloadFileWork.getFrom() + recordCount);
searchSourceBuilder.fetchSource(null, "all"); // The "all" field is not needed in the response
SearchRequest searchRequest = new SearchRequest().indices(downloadFileWork.getEsIndex()).source(searchSourceBuilder);

SearchResponse searchResponse = downloadFileWork.getEsClient().search(searchRequest, RequestOptions.DEFAULT);
SearchResponse searchResponse = trySearch(searchRequest, downloadFileWork.getEsClient());
consume(searchResponse, resultHandler);

SearchHit[] searchHits = searchResponse.getHits().getHits();
recordCount += searchHits.length;

}
} catch (IOException ex) {
} catch (Exception ex) {
LOG.error("Error querying Elasticsearch", ex);
throw Throwables.propagate(ex);
}
}

/**
* This function acts as a defensive retry mechanism for queries against an Elasticsearch cluster
* whose index can potentially be modified while it is being queried.
*/
private static SearchResponse trySearch(SearchRequest searchRequest, RestHighLevelClient restHighLevelClient) {
return Retry.decorateSupplier(RETRY_REGISTRY.retry("SearchRetry"), () -> {
try {
return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
).get();
}

private static void consume(SearchResponse searchResponse, Consumer<Occurrence> consumer) {
EsResponseParser.buildDownloadResponse(searchResponse, new PagingRequest(0, searchResponse.getHits().getHits().length))
.getResults().forEach(consumer);
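For context, a minimal, self-contained sketch (not part of this PR) of how the retry configuration above behaves: up to 5 attempts, with exponential backoff waits of 3s, 6s, 12s and 24s between them, and the last exception rethrown once the attempts are exhausted. The flaky supplier below is purely illustrative.

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryBehaviourSketch {

  public static void main(String[] args) {
    // Same configuration as the RETRY_REGISTRY added in SearchQueryProcessor
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .maxAttempts(5)
        .intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(3), 2d))
        .build());

    AtomicInteger calls = new AtomicInteger();

    // Fails twice (e.g. while the index is being modified), then succeeds.
    Supplier<String> flakySearch = () -> {
      if (calls.incrementAndGet() < 3) {
        throw new RuntimeException("Search failed on attempt " + calls.get());
      }
      return "Search succeeded on attempt " + calls.get();
    };

    // Succeeds on the third attempt, after waiting roughly 3s + 6s.
    String result = Retry.decorateSupplier(registry.retry("SearchRetry"), flakySearch).get();
    System.out.println(result);
  }
}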
@@ -19,13 +19,8 @@
import org.gbif.occurrence.download.file.simplecsv.SimpleCsvDownloadAggregator;
import org.gbif.occurrence.download.file.specieslist.SpeciesListDownloadAggregator;
import org.gbif.occurrence.download.oozie.DownloadPrepareAction;
import org.gbif.occurrence.download.util.RegistryClientUtil;
import org.gbif.occurrence.search.es.EsConfig;
import org.gbif.registry.ws.client.OccurrenceDownloadClient;
import org.gbif.wrangler.lock.Mutex;
import org.gbif.wrangler.lock.ReadWriteMutexFactory;;
import org.gbif.wrangler.lock.zookeeper.ZookeeperSharedReadWriteMutex;
import org.gbif.ws.client.ClientFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -50,8 +45,6 @@ public class DownloadWorkflowModule {

private static final String ES_PREFIX = "es.";

private static final String INDEX_LOCKING_PATH = "/indices/";

private final WorkflowConfiguration workflowConfiguration;

private final DownloadJobConfiguration downloadJobConfiguration;
@@ -61,21 +54,21 @@ public class DownloadWorkflowModule {
* This is the initial action that counts records; its output is used to decide whether a download is processed through Hive or Elasticsearch.
*/
public DownloadPrepareAction downloadPrepareAction() {

// Use the registry client util because it configures the retry mechanism

return DownloadPrepareAction.builder().esClient(esClient())
.esIndex(workflowConfiguration.getSetting(DefaultSettings.ES_INDEX_KEY))
.smallDownloadLimit(workflowConfiguration.getIntSetting(DefaultSettings.MAX_RECORDS_KEY))
.workflowConfiguration(workflowConfiguration)
.occurrenceDownloadService(clientFactory().newInstance(OccurrenceDownloadClient.class))
.occurrenceDownloadService(registryClientUtil().setupOccurrenceDownloadService())
.build();
}

/**
* GBIF Ws client factory.
*/
public ClientFactory clientFactory() {
return new ClientFactory(workflowConfiguration.getSetting(DefaultSettings.DOWNLOAD_USER_KEY),
workflowConfiguration.getSetting(DefaultSettings.DOWNLOAD_PASSWORD_KEY),
workflowConfiguration.getSetting(DefaultSettings.REGISTRY_URL_KEY));
public RegistryClientUtil registryClientUtil() {
return new RegistryClientUtil(workflowConfiguration.getSetting(DefaultSettings.DOWNLOAD_USER_KEY),
workflowConfiguration.getSetting(DefaultSettings.DOWNLOAD_PASSWORD_KEY),
workflowConfiguration.getSetting(DefaultSettings.REGISTRY_URL_KEY));
}

/**
@@ -100,14 +93,6 @@ public static CuratorFramework curatorFramework(WorkflowConfiguration workflowCo
return curator;
}

/**
* Creates a RW Mutex, used later to create a mutex to synchronize modification to the ES index.
*/
public Mutex provideReadLock(CuratorFramework curatorFramework) {
ReadWriteMutexFactory readWriteMutexFactory = new ZookeeperSharedReadWriteMutex(curatorFramework, INDEX_LOCKING_PATH);
return readWriteMutexFactory
.createReadMutex(workflowConfiguration.getSetting(DefaultSettings.ES_INDEX_KEY));
}

/**
* Factory method for Elasticsearch client.
@@ -200,17 +185,17 @@ public DownloadAggregator getAggregator() {
switch (downloadFormat) {
case DWCA:
return new DwcaDownloadAggregator(downloadJobConfiguration,
clientFactory().newInstance(OccurrenceDownloadClient.class));
registryClientUtil().setupOccurrenceDownloadService());

case SIMPLE_CSV:
return new SimpleCsvDownloadAggregator(downloadJobConfiguration,
workflowConfiguration,
clientFactory().newInstance(OccurrenceDownloadClient.class));
registryClientUtil().setupOccurrenceDownloadService());

case SPECIES_LIST:
return new SpeciesListDownloadAggregator(downloadJobConfiguration,
workflowConfiguration,
clientFactory().newInstance(OccurrenceDownloadClient.class));
workflowConfiguration,
registryClientUtil().setupOccurrenceDownloadService());
case SIMPLE_AVRO:
case SIMPLE_WITH_VERBATIM_AVRO:
case IUCN:
@@ -10,8 +10,6 @@

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.curator.framework.CuratorFramework;
import org.gbif.wrangler.lock.Mutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,28 +65,24 @@ public static void run(WorkflowConfiguration workflowConfiguration, DownloadJobC
.downloadJobConfiguration(configuration)
.build();

try (CuratorFramework curatorIndices = module.curatorFramework()) {
// Create an Akka system
ActorSystem system = ActorSystem.create("DownloadSystem" + configuration.getDownloadKey());

// Create an Akka system
ActorSystem system = ActorSystem.create("DownloadSystem" + configuration.getDownloadKey());
// create the master
ActorRef master = module.downloadMaster(system);

// create the master
ActorRef master = module.downloadMaster(system);

Mutex readMutex = module.provideReadLock(curatorIndices);
readMutex.acquire();
// start the calculation
master.tell(new DownloadMaster.Start());
while (!master.isTerminated()) {
try {
Thread.sleep(SLEEP_TIME_BEFORE_TERMINATION);
} catch (InterruptedException ie) {
LOG.error("Thread interrupted", ie);
}
// start the calculation
master.tell(new DownloadMaster.Start());
while (!master.isTerminated()) {
try {
Thread.sleep(SLEEP_TIME_BEFORE_TERMINATION);
} catch (InterruptedException ie) {
LOG.error("Thread interrupted", ie);
}
system.shutdown();
readMutex.release();
}
system.shutdown();

}

}
@@ -8,30 +8,39 @@
import org.gbif.registry.ws.client.DatasetOccurrenceDownloadUsageClient;
import org.gbif.registry.ws.client.OccurrenceDownloadClient;
import org.gbif.utils.file.properties.PropertiesUtil;
import org.gbif.ws.client.ClientFactory;
import org.gbif.ws.client.ClientBuilder;

import java.io.IOException;
import java.util.Optional;
import java.time.Duration;
import java.util.Properties;

/**
* Utility class to create registry web service clients.
*/
public class RegistryClientUtil {

private final ClientFactory clientFactory;
private static final Duration BACKOFF_INITIAL_INTERVAL = Duration.ofSeconds(4);
private static final double BACKOFF_MULTIPLIER = 2d;
private static final int BACKOFF_MAX_ATTEMPTS = 8;

private final ClientBuilder clientBuilder = new ClientBuilder();

public RegistryClientUtil(String userName, String password, String apiUrl) {
clientFactory = new ClientFactory(userName, password, apiUrl);
clientBuilder
.withUrl(apiUrl)
.withCredentials(userName, password)
.withExponentialBackoffRetry(BACKOFF_INITIAL_INTERVAL, BACKOFF_MULTIPLIER, BACKOFF_MAX_ATTEMPTS);
}

/**
* Constructs an instance using properties class instance.
*/
public RegistryClientUtil(Properties properties, String apiUrl) {
clientFactory = new ClientFactory(properties.getProperty(DownloadWorkflowModule.DefaultSettings.DOWNLOAD_USER_KEY),
properties.getProperty(DownloadWorkflowModule.DefaultSettings.DOWNLOAD_PASSWORD_KEY),
Optional.ofNullable(apiUrl).orElse(properties.getProperty(DownloadWorkflowModule.DefaultSettings.REGISTRY_URL_KEY)));
clientBuilder
.withUrl(apiUrl)
.withCredentials(properties.getProperty(DownloadWorkflowModule.DefaultSettings.DOWNLOAD_USER_KEY),
properties.getProperty(DownloadWorkflowModule.DefaultSettings.DOWNLOAD_PASSWORD_KEY))
.withExponentialBackoffRetry(BACKOFF_INITIAL_INTERVAL, BACKOFF_MULTIPLIER, BACKOFF_MAX_ATTEMPTS);
}


@@ -63,7 +72,7 @@ public RegistryClientUtil() {
* Sets up an http client with a one minute timeout and http support only.
*/
public DatasetService setupDatasetService() {
return clientFactory.newInstance(DatasetClient.class);
return clientBuilder.build(DatasetClient.class);
}

/**
@@ -72,7 +81,7 @@ public DatasetService setupDatasetService() {
* Sets up an http client with a one minute timeout and http support only.
*/
public DatasetOccurrenceDownloadUsageService setupDatasetUsageService() {
return clientFactory.newInstance(DatasetOccurrenceDownloadUsageClient.class);
return clientBuilder.build(DatasetOccurrenceDownloadUsageClient.class);
}

/**
@@ -81,7 +90,7 @@ public DatasetOccurrenceDownloadUsageService setupDatasetUsageService() {
* Sets up an http client with a one minute timeout and http support only.
*/
public OccurrenceDownloadService setupOccurrenceDownloadService() {
return clientFactory.newInstance(OccurrenceDownloadClient.class);
return clientBuilder.build(OccurrenceDownloadClient.class);
}

}
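A brief usage sketch (not part of this PR; the credentials and URL below are placeholder assumptions): every client produced by this utility is now built through the shared ClientBuilder, which retries failed calls with exponential backoff, waiting roughly 4s, 8s, 16s, ... between attempts, for at most 8 attempts.

// Placeholder settings; real values come from DownloadWorkflowModule.DefaultSettings.
RegistryClientUtil registryClientUtil = new RegistryClientUtil(
    "downloadUser",               // DOWNLOAD_USER_KEY
    "downloadPassword",           // DOWNLOAD_PASSWORD_KEY
    "https://api.gbif.org/v1/");  // REGISTRY_URL_KEY

// The returned client transparently retries failed requests with the backoff above.
OccurrenceDownloadService downloadService = registryClientUtil.setupOccurrenceDownloadService();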
6 changes: 6 additions & 0 deletions pom.xml
@@ -112,6 +112,7 @@
<findbugs-jsr305.version>3.0.1</findbugs-jsr305.version>
<servlet-api.version>2.5</servlet-api.version>
<metainf-services.version>1.5</metainf-services.version>
<resilience4j.version>1.5.0</resilience4j.version>

<junit-jupiter.version>5.5.2</junit-jupiter.version>
<mockito-junit-jupiter.version>3.1.0</mockito-junit-jupiter.version>
@@ -721,6 +722,11 @@
<artifactId>jsr305</artifactId>
<version>${findbugs-jsr305.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4j.version}</version>
</dependency>

<!-- hbase -->
<dependency>