Skip to content

Commit

Permalink
dcache-bulk: use rate limiter to throttle semaphore release
Browse files Browse the repository at this point in the history
Motivation:

master@0b4140b454b819de55c7c412539294537ff0beb8
https://rb.dcache.org/r/14118/

restructured the container job for greater
throughput.  In order to pace execution of
calls on external services, however, two
semaphores were used.  For task execution,
the semaphore is not released until the
future of the execution completes.

This works well but has the drawback of
not allowing submission to continue to
services like PinManager if there
are max permit number of tasks waiting
for a response (e.g., in the case of
actual staging).

On the other hand, releasing the
semaphore immediately upon reception
of the future, causes calls to external
services to pile up, causing timeout errors
of various sorts.

Modification:

Relying on the thread pool size will
not work in this case because the
execution of the activity must
be asynchronous; the turnover
is extremely rapid.

Instead we adopt the solution of
a rate limiter to throttle the
semaphore release.  Each activity
is given a limiter for the
service endpoint it communicates
with.  The rates for these
(PinManager, PnfsManager, QoSEngine)
are configurable.

Result:

Performance and stability is sustained,
but throughput continues when the submitted
task activities are in a state of waiting
for future completion.

Target: master
Request: 9.2 (fixes an important issue)
Patch:  https://rb.dcache.org/r/14136/
Requires-notes: yes  (No longer blocks throughput
of new tasks when the number of tasks waiting
for completion from an external service like
PinManager reaches max available task permits.)
Acked-by: Tigran
  • Loading branch information
alrossi authored and mksahakyan committed Oct 27, 2023
1 parent 4e9864a commit ec5783c
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public final class BulkServiceCommands implements CellCommandListener {
/**
* name | class | type | permits
*/
private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s ";
private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s";

/**
* name | required | description
Expand Down Expand Up @@ -264,8 +264,9 @@ public String nextCommand() {
}
}

private static String formatActivity(Entry<String, BulkActivityProvider> entry) {
private static String formatActivity(Entry<String, BulkActivityProvider> entry, BulkActivityFactory factory) {
BulkActivityProvider provider = entry.getValue();

return String.format(FORMAT_ACTIVITY,
entry.getKey(),
provider.getActivityClass(),
Expand Down Expand Up @@ -544,14 +545,14 @@ public String call() throws Exception {
Sorter sorter = new Sorter(SortOrder.valueOf(sort.toUpperCase()));
String activities = activityFactory.getProviders().entrySet()
.stream()
.map(BulkServiceCommands::formatActivity)
.map(e -> formatActivity(e, activityFactory))
.sorted(sorter)
.collect(joining("\n"));
if (activities == null) {
return "There are no mapped activities!";
}

return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE")
return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "RATE")
+ "\n" + activities;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
import diskCacheV111.util.FsPath;
import java.util.Collections;
import java.util.EnumSet;
Expand All @@ -76,11 +77,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.vehicles.FileAttributes;

/**
* Base definition for a bulk activity. Specifies the interfaces for executing the action on a
* Base definition for a bulk activity. Specifies the interfaces for executing
* the action on a
* given target and for listening (asynchronously) for a result.
* <p>
* An instance of an activity is constructed on a request-by-request basis
* by the JobFactory. It should not be shared between requests.
* by the JobFactory. It should not be shared between requests.
*
* @param <R> the type of object returned with the listenable future.
*/
Expand All @@ -90,10 +92,10 @@ public enum TargetType {
FILE, DIR, BOTH
}

public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES
= Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY,
FileAttribute.RETENTION_POLICY));
public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES = Collections
.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY,
FileAttribute.RETENTION_POLICY));

private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();

Expand All @@ -102,6 +104,7 @@ public enum TargetType {

protected Subject subject;
protected Restriction restriction;
protected RateLimiter rateLimiter;
protected BulkTargetRetryPolicy retryPolicy;
protected Set<BulkActivityArgumentDescriptor> descriptors;

Expand All @@ -127,6 +130,20 @@ public void setRetryPolicy(BulkTargetRetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}

public void throttle() {
if (rateLimiter != null) {
rateLimiter.acquire();
}
}

public RateLimiter getRateLimiter() {
return rateLimiter;
}

public void setRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

public TargetType getTargetType() {
return targetType;
}
Expand Down Expand Up @@ -154,17 +171,18 @@ public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
/**
* Performs the activity.
*
* @param rid of the request.
* @param tid of the target.
* @param path of the target on which to perform the activity.
* @param rid of the request.
* @param tid of the target.
* @param path of the target on which to perform the activity.
* @return future result of the activity.
* @throws BulkServiceException
*/
public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, FileAttributes attributes)
throws BulkServiceException;
throws BulkServiceException;

/**
* An activity instance is on a request-by-request basis, so the parameters need to be
* An activity instance is on a request-by-request basis, so the parameters need
* to be
* configured by the factory.
*
* @param arguments parameters of the specific activity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import diskCacheV111.poolManager.PoolManagerAware;
import diskCacheV111.util.NamespaceHandlerAware;
import diskCacheV111.util.PnfsHandler;
Expand All @@ -70,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.dcache.auth.Subjects;
import org.dcache.auth.attributes.Restriction;
Expand All @@ -90,17 +92,20 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
/**
* Creates activities on the basis of activity mappings.
* <p>
* For each activity (such as pinning, deletion, etc.), there must be an SPI provider which creates
* For each activity (such as pinning, deletion, etc.), there must be an SPI
* provider which creates
* the class implementing the activity API contract.
*/
public final class BulkActivityFactory implements CellMessageSender, EnvironmentAware {

private static final Logger LOGGER = LoggerFactory.getLogger(BulkActivityFactory.class);

private final Map<String, BulkActivityProvider> providers = Collections.synchronizedMap(
new HashMap<>());
new HashMap<>());

private Map<String, BulkTargetRetryPolicy> retryPolicies;
private Map<String, RateLimiter> rateLimiters;
private Map<String, String> rateLimiterActivityIndex;
private Map<String, Object> environment;

private CellStub pnfsManager;
Expand All @@ -114,7 +119,8 @@ public final class BulkActivityFactory implements CellMessageSender, Environment
private boolean initialized;

/**
* Generates an instance of the plugin-specific activity to be used by the request jobs.
* Generates an instance of the plugin-specific activity to be used by the
* request jobs.
*
* @param request being serviced.
* @param subject of user who submitted the request.
Expand All @@ -137,11 +143,16 @@ public BulkActivity createActivity(BulkRequest request, Subject subject,
BulkActivity bulkActivity = provider.createActivity();
bulkActivity.setSubject(subject);
bulkActivity.setRestriction(restriction);
String rateLimiterType = rateLimiterActivityIndex.get(activity);
if (rateLimiterType != null) {
bulkActivity.setRateLimiter(rateLimiters.get(rateLimiterType));
}

BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity);
if (retryPolicy != null) {
bulkActivity.setRetryPolicy(retryPolicy);
}

configureEndpoints(bulkActivity);
bulkActivity.configure(request.getArguments());

Expand Down Expand Up @@ -207,6 +218,17 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) {
this.qoSResponseReceiver = qoSResponseReceiver;
}

@Required
public void setRateLimiters(Map<String, Double> rates) {
rateLimiters = rates.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> RateLimiter.create(e.getValue())));
}

@Required
public void setRateLimiterActivityIndex(Map<String, String> rateLimiterActivityIndex) {
this.rateLimiterActivityIndex = rateLimiterActivityIndex;
}

@Required
public void setRetryPolicies(Map<String, BulkTargetRetryPolicy> retryPolicies) {
this.retryPolicies = retryPolicies;
Expand Down
Loading

0 comments on commit ec5783c

Please sign in to comment.