Skip to content

Commit

Permalink
make MaxFileDescriptorCount configurable for neptune-export service (#…
Browse files Browse the repository at this point in the history
…209)

* add bucket name to log statement when uploading files to s3

* use value from param for maxFileDescriptorCount in neptune-export service
  • Loading branch information
gopuneet authored Apr 27, 2022
1 parent b93899f commit aaed882
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public void run() {
new PropertyGraphRangeModule().config(),
gremlinFilters.filters(),
cluster.concurrencyConfig(),
targetConfig, featureToggles());
targetConfig, featureToggles(),
getMaxFileDescriptorCount());

graphSchema = exportJob.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void run() {
gremlinFilters.filters(),
cluster.concurrencyConfig(),
targetConfig,
featureToggles());
featureToggles(),
getMaxFileDescriptorCount());

graphSchema = Timer.timedActivity(
"export",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void run() {
range.config(),
gremlinFilters.filters(),
cluster.concurrencyConfig(),
targetConfig, featureToggles());
targetConfig, featureToggles(),
getMaxFileDescriptorCount());

graphSchema = exportJob.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public abstract class NeptuneExportCommand extends NeptuneExportBaseCommand impl

private boolean isCliInvocation = false;

private int maxFileDescriptorCount;

private NeptuneExportEventHandler eventHandler = NeptuneExportEventHandler.NULL_EVENT_HANDLER;

@Override
Expand Down Expand Up @@ -83,4 +85,12 @@ void handleException(Throwable e) {
/**
 * Returns the feature toggles provided by this command's feature-toggle module.
 *
 * @return the result of {@code featureToggleModule.featureToggles()}
 */
FeatureToggles featureToggles() {
    final FeatureToggles toggles = featureToggleModule.featureToggles();
    return toggles;
}

/**
 * Returns the maximum file descriptor count currently held by this command.
 *
 * @return the value stored in the {@code maxFileDescriptorCount} field, as last assigned
 *         through {@link #setMaxFileDescriptorCount(int)}
 */
public int getMaxFileDescriptorCount() {
    return this.maxFileDescriptorCount;
}

/**
 * Sets the maximum file descriptor count for this command.
 * No validation is performed; the value is stored as given.
 *
 * @param maxFileDescriptorCount the limit later returned by {@link #getMaxFileDescriptorCount()}
 */
public void setMaxFileDescriptorCount(int maxFileDescriptorCount) {
this.maxFileDescriptorCount = maxFileDescriptorCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
@Command(name = "nesvc", description = "neptune-export service", hidden = true)
public class RunNeptuneExportSvc extends NeptuneExportBaseCommand implements Runnable {

/**
* Default maximum number of simultaneously open files. This is the same as the default value given in
* the CFN template at https://docs.aws.amazon.com/neptune/latest/userguide/export-service.html
*/
public static final int DEFAULT_MAX_FILE_DESCRIPTOR_COUNT = 10000;

@Option(name = {"--json"}, description = "JSON")
@Once
private String json;
Expand All @@ -42,13 +47,17 @@ public class RunNeptuneExportSvc extends NeptuneExportBaseCommand implements Run
@Once
private boolean cleanRootPath = false;

@Option(name = {"--max-file-descriptor-count"}, description = "Maximum number of simultaneously open files.", hidden = true)
@Once
private int maxFileDescriptorCount = DEFAULT_MAX_FILE_DESCRIPTOR_COUNT;

@Override
public void run() {

InputStream input = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

try {
new NeptuneExportLambda(rootPath, cleanRootPath).handleRequest(input, System.out, new Context() {
new NeptuneExportLambda(rootPath, cleanRootPath, maxFileDescriptorCount).handleRequest(input, System.out, new Context() {
@Override
public String getAwsRequestId() {
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private void uploadExportFilesToS3(TransferManager transferManager, File directo

ObjectTaggingProvider taggingProvider = uploadContext -> createObjectTags(profiles);

logger.info("Uploading export files to {}", outputS3ObjectInfo.key());
logger.info("Uploading export files to s3 bucket={} key={}", outputS3ObjectInfo.bucket(), outputS3ObjectInfo.key());

MultipleFileUpload upload = transferManager.uploadDirectory(
outputS3ObjectInfo.bucket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,25 @@

package com.amazonaws.services.neptune.export;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.neptune.util.EnvironmentVariableUtils;
import com.amazonaws.services.neptune.util.S3ObjectInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;

import java.io.*;
import java.nio.charset.StandardCharsets;

import static com.amazonaws.services.neptune.RunNeptuneExportSvc.DEFAULT_MAX_FILE_DESCRIPTOR_COUNT;
import static java.nio.charset.StandardCharsets.UTF_8;

public class NeptuneExportLambda implements RequestStreamHandler {
Expand All @@ -33,14 +39,16 @@ public class NeptuneExportLambda implements RequestStreamHandler {

private final String localOutputPath;
private final boolean cleanOutputPath;
private final int maxFileDescriptorCount;

public NeptuneExportLambda() {
this(TEMP_PATH, true);
this(TEMP_PATH, true, DEFAULT_MAX_FILE_DESCRIPTOR_COUNT);
}

public NeptuneExportLambda(String localOutputPath, boolean cleanOutputPath) {
public NeptuneExportLambda(String localOutputPath, boolean cleanOutputPath, int maxFileDescriptorCount) {
this.localOutputPath = localOutputPath;
this.cleanOutputPath = cleanOutputPath;
this.maxFileDescriptorCount = maxFileDescriptorCount;
}

@Override
Expand Down Expand Up @@ -145,7 +153,8 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
completionFilePayload,
additionalParams,
maxConcurrency,
s3Region);
s3Region,
maxFileDescriptorCount);

S3ObjectInfo outputS3ObjectInfo = neptuneExportService.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@
import com.amazonaws.services.neptune.NeptuneExportEventHandlerHost;
import org.apache.commons.lang.StringUtils;

import static com.amazonaws.services.neptune.export.NeptuneExportService.MAX_FILE_DESCRIPTOR_COUNT;

public class NeptuneExportRunner {

private final String[] args;
private final NeptuneExportEventHandler eventHandler;
private final boolean isCliInvocation;
private final int maxFileDescriptorCount;

public NeptuneExportRunner(String[] args) {
this(args, NeptuneExportEventHandler.NULL_EVENT_HANDLER, true);
this(args, NeptuneExportEventHandler.NULL_EVENT_HANDLER, true, MAX_FILE_DESCRIPTOR_COUNT);
}

public NeptuneExportRunner(String[] args, NeptuneExportEventHandler eventHandler, boolean isCliInvocation) {
public NeptuneExportRunner(String[] args, NeptuneExportEventHandler eventHandler,
boolean isCliInvocation, int maxFileDescriptorCount) {
this.args = args;
this.eventHandler = eventHandler;
this.isCliInvocation = isCliInvocation;
this.maxFileDescriptorCount = maxFileDescriptorCount;
}

public void run() {
Expand All @@ -57,6 +62,10 @@ public void run() {
((NeptuneExportCommand) cmd).setIsCliInvocation(isCliInvocation);
}

if (NeptuneExportCommand.class.isAssignableFrom(cmd.getClass())){
((NeptuneExportCommand) cmd).setMaxFileDescriptorCount(maxFileDescriptorCount);
}

cmd.run();
} catch (Exception e) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class NeptuneExportService {
public static final List<Tag> NEPTUNE_EXPORT_TAGS = Collections.singletonList(new Tag("application", "neptune-export"));
public static final String NEPTUNE_ML_PROFILE_NAME = "neptune_ml";
public static final String INCREMENTAL_EXPORT_PROFILE_NAME = "incremental_export";
public static final int MAX_FILE_DESCRIPTOR_COUNT = 9000;

private final String cmd;
private final String localOutputPath;
Expand All @@ -58,6 +59,7 @@ public class NeptuneExportService {
private final ObjectNode additionalParams;
private final int maxConcurrency;
private final String s3Region;
private final int maxFileDescriptorCount;

public NeptuneExportService(String cmd,
String localOutputPath,
Expand All @@ -72,7 +74,8 @@ public NeptuneExportService(String cmd,
ObjectNode completionFilePayload,
ObjectNode additionalParams,
int maxConcurrency,
String s3Region) {
String s3Region,
int maxFileDescriptorCount) {
this.cmd = cmd;
this.localOutputPath = localOutputPath;
this.cleanOutputPath = cleanOutputPath;
Expand All @@ -87,6 +90,7 @@ public NeptuneExportService(String cmd,
this.additionalParams = additionalParams;
this.maxConcurrency = maxConcurrency;
this.s3Region = s3Region;
this.maxFileDescriptorCount = maxFileDescriptorCount;
}

public S3ObjectInfo execute() throws IOException {
Expand Down Expand Up @@ -210,12 +214,19 @@ public S3ObjectInfo execute() throws IOException {
eventHandlerCollection.addHandler(incrementalExportEventHandler);
}

/**
* We subtract a buffer of 1000 from the maxFileDescriptorCount that is used at
* {@link com.amazonaws.services.neptune.propertygraph.io.LabelWriters#put},
* since the value received from the neptune-export service is set as the `nofile` ulimit in the AWS Batch
* container properties and there might be other processes on the container that have open files.
* The buffer is intended to make us close the least recently accessed files before exceeding the hard
* limit for the `nofile` ulimit.
* Note that the result is never lower than MAX_FILE_DESCRIPTOR_COUNT, so the buffer only takes effect
* when the supplied value is greater than MAX_FILE_DESCRIPTOR_COUNT + 1000.
*/
final int maxFileDescriptorCountAfterRemovingBuffer = Math.max(maxFileDescriptorCount - 1000, MAX_FILE_DESCRIPTOR_COUNT);

eventHandlerCollection.onBeforeExport(args, s3UploadParams);

logger.info("Args after service init: {}", String.join(" ", args.values()));

new NeptuneExportRunner(args.values(), eventHandlerCollection, false).run();
new NeptuneExportRunner(args.values(), eventHandlerCollection, false, maxFileDescriptorCountAfterRemovingBuffer).run();

return exportToS3EventHandler.result();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ExportPropertyGraphJob {
private final ConcurrencyConfig concurrencyConfig;
private final PropertyGraphTargetConfig targetConfig;
private final FeatureToggles featureToggles;
private final int maxFileDescriptorCount;

public ExportPropertyGraphJob(Collection<ExportSpecification> exportSpecifications,
GraphSchema graphSchema,
Expand All @@ -56,7 +57,8 @@ public ExportPropertyGraphJob(Collection<ExportSpecification> exportSpecificatio
GremlinFilters gremlinFilters,
ConcurrencyConfig concurrencyConfig,
PropertyGraphTargetConfig targetConfig,
FeatureToggles featureToggles) {
FeatureToggles featureToggles,
int maxFileDescriptorCount) {
this.exportSpecifications = exportSpecifications;
this.graphSchema = graphSchema;
this.g = g;
Expand All @@ -65,6 +67,7 @@ public ExportPropertyGraphJob(Collection<ExportSpecification> exportSpecificatio
this.concurrencyConfig = concurrencyConfig;
this.targetConfig = targetConfig;
this.featureToggles = featureToggles;
this.maxFileDescriptorCount = maxFileDescriptorCount;
}

public GraphSchema execute() throws Exception {
Expand Down Expand Up @@ -115,7 +118,8 @@ private MasterLabelSchemas export(ExportSpecification exportSpecification) throw
rangeFactory,
status,
fileIndex,
fileDescriptorCount
fileDescriptorCount,
maxFileDescriptorCount
);
futures.add(taskExecutor.submit(exportTask));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public ExportPropertyGraphTask(GraphElementSchemas graphElementSchemas,
GremlinFilters gremlinFilters,
Status status,
AtomicInteger index,
AtomicInteger fileDescriptorCount) {
AtomicInteger fileDescriptorCount,
int maxFileDescriptorCount) {
this.graphElementSchemas = graphElementSchemas;
this.labelsFilter = labelsFilter;
this.graphClient = graphClient;
Expand All @@ -60,7 +61,7 @@ public ExportPropertyGraphTask(GraphElementSchemas graphElementSchemas,
this.gremlinFilters = gremlinFilters;
this.status = status;
this.index = index;
this.labelWriters = new LabelWriters<>(fileDescriptorCount);
this.labelWriters = new LabelWriters<>(fileDescriptorCount, maxFileDescriptorCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ public class LabelWriters<T extends Map<?, ?>> implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(LabelWriters.class);

private static final int MAX_FILE_DESCRIPTOR_COUNT = 9000;
private final int maxFileDescriptorCount;

private final AtomicInteger fileDescriptorCount;
private final LinkedHashMap<Label, LabelWriter<T>> labelWriters = new LinkedHashMap<>(16, 0.75f, true);

public LabelWriters(AtomicInteger fileDescriptorCount) {
public LabelWriters(AtomicInteger fileDescriptorCount, int maxFileDescriptorCount) {
this.fileDescriptorCount = fileDescriptorCount;
this.maxFileDescriptorCount = maxFileDescriptorCount;
}

public boolean containsKey(Label label){
Expand All @@ -39,7 +40,7 @@ public boolean containsKey(Label label){

public void put(Label label, LabelWriter<T> labelWriter) throws Exception {

if (fileDescriptorCount.get() > MAX_FILE_DESCRIPTOR_COUNT && labelWriters.size() > 1){
if (fileDescriptorCount.get() > maxFileDescriptorCount && labelWriters.size() > 1){
Label leastRecentlyAccessedLabel = labelWriters.keySet().iterator().next();
LabelWriter<T> leastRecentlyAccessedLabelWriter = labelWriters.remove(leastRecentlyAccessedLabel);
logger.info("Closing writer for label {} for output {} so as to conserve file descriptors", leastRecentlyAccessedLabel.labelsAsString(), leastRecentlyAccessedLabelWriter.outputId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public ExportPropertyGraphTask<Map<String, Object>> createExportTask(GraphSchema
RangeFactory rangeFactory,
Status status,
AtomicInteger index,
AtomicInteger fileDescriptorCount) {
AtomicInteger fileDescriptorCount,
int maxFileDescriptorCount) {
return new ExportPropertyGraphTask<>(
graphSchema.copyOfGraphElementSchemasFor(graphElementType),
labelsFilter,
Expand All @@ -113,7 +114,8 @@ public ExportPropertyGraphTask<Map<String, Object>> createExportTask(GraphSchema
gremlinFilters,
status,
index,
fileDescriptorCount
fileDescriptorCount,
maxFileDescriptorCount
);
}

Expand Down

0 comments on commit aaed882

Please sign in to comment.