
Commit

Create DWCA downloads in HDFS, rather than on local disk (which may fill).

MattBlissett committed Jul 29, 2021
1 parent 16ea848 commit c365017
Showing 4 changed files with 87 additions and 64 deletions.
WorkflowConfiguration.java
@@ -84,6 +84,15 @@ public String getTempDir() {
return settings.getProperty(DownloadWorkflowModule.DefaultSettings.TMP_DIR_KEY);
}

/**
*
* @return HDFS temp dir where download files are created
*/
public String getHdfsTempDir() {
Preconditions.checkNotNull(settings);
return settings.getProperty(DownloadWorkflowModule.DefaultSettings.HDFS_TMP_DIR_KEY);
}

/**
*
* @param downloadKey download id
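For context, a minimal sketch of how the new setting is consumed: the property key suffix (hdfs.tmp.dir) and the getter come from this commit, while the full key prefix, directory and download key below are made-up examples.

import java.util.Properties;

import org.apache.hadoop.fs.Path;

public class HdfsTempDirSketch {
  public static void main(String[] args) {
    // Hypothetical properties; in the real workflow these are loaded by WorkflowConfiguration.
    Properties settings = new Properties();
    settings.setProperty("occurrence.download.hdfs.tmp.dir", "/occurrence-download/tmp"); // key prefix assumed

    // Equivalent of getHdfsTempDir(): read the configured HDFS temp dir...
    String hdfsTempDir = settings.getProperty("occurrence.download.hdfs.tmp.dir");

    // ...under which buildArchive() now creates the zip for a download key before the final rename.
    String downloadKey = "0000001-000000000000000"; // made-up download key
    Path hdfsTmpZipPath = new Path(hdfsTempDir, downloadKey + ".zip");
    System.out.println(hdfsTmpZipPath); // /occurrence-download/tmp/0000001-000000000000000.zip
  }
}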
DwcaArchiveBuilder.java
@@ -16,6 +16,7 @@
import org.gbif.hadoop.compress.d2.D2CombineInputStream;
import org.gbif.hadoop.compress.d2.D2Utils;
import org.gbif.hadoop.compress.d2.zip.ModalZipOutputStream;
import org.gbif.hadoop.compress.d2.zip.ZipEntry;
import org.gbif.occurrence.common.download.DownloadException;
import org.gbif.occurrence.download.conf.WorkflowConfiguration;
import org.gbif.occurrence.download.file.DownloadJobConfiguration;
@@ -27,7 +28,6 @@
import org.gbif.occurrence.query.TitleLookupService;
import org.gbif.occurrence.query.TitleLookupServiceFactory;
import org.gbif.registry.metadata.EMLWriter;
import org.gbif.utils.file.CompressionUtil;
import org.gbif.utils.file.FileUtils;

import java.io.BufferedOutputStream;
@@ -44,27 +44,29 @@
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -149,7 +151,7 @@ public static void buildArchive(DownloadJobConfiguration configuration, Workflow
titleLookup,
configuration,
workflowConfiguration);
generator.buildArchive(new File(tmpDir, configuration.getDownloadKey() + ".zip"));
generator.buildArchive();
}

private static void writeCitation(Writer citationWriter, Dataset dataset)
@@ -194,17 +196,14 @@ protected DwcaArchiveBuilder(DatasetService datasetService, OccurrenceDownloadSe
}

/**
* Main method to assemble the dwc archive and do all the work until we have a final zip file.
*
* @param zipFile the final zip file holding the entire archive
* Main method to assemble the DwC archive and do all the work until we have a final zip file.
*/
public void buildArchive(File zipFile) throws DownloadException {
LOG.info("Start building the archive {} ", zipFile.getPath());
public void buildArchive() throws DownloadException {
LOG.info("Start building the archive for {} ", configuration.getDownloadKey());

String zipFileName = configuration.getDownloadKey() + ".zip";

try {
if (zipFile.exists()) {
zipFile.delete();
}
if (!configuration.isSmallDownload()) {
// oozie might try several times to run this job, so make sure our filesystem is clean
cleanupFS();
@@ -226,15 +225,26 @@ public void buildArchive(File zipFile) throws DownloadException {
DwcArchiveUtils.createArchiveDescriptor(archiveDir);

// zip up
LOG.info("Zipping archive {}", archiveDir);
CompressionUtil.zipDir(archiveDir, zipFile, true);
Path hdfsTmpZipPath = new Path(workflowConfiguration.getHdfsTempDir(), zipFileName);
LOG.info("Zipping archive {} to HDFS temporary location {}", archiveDir, hdfsTmpZipPath);

// add the large download data files to the zip stream
if (!configuration.isSmallDownload()) {
appendPreCompressedFiles(zipFile);
try (
FSDataOutputStream zipped = targetFs.create(hdfsTmpZipPath, true);
ModalZipOutputStream zos = new ModalZipOutputStream(new BufferedOutputStream(zipped, 10*1024*1024))
) {
zipLocalFiles(zos);

// add the large download data files to the zip stream
if (!configuration.isSmallDownload()) {
appendPreCompressedFiles(zos);
}

zos.finish();
}
targetFs.moveFromLocalFile(new Path(zipFile.getPath()),
new Path(workflowConfiguration.getHdfsOutputPath(), zipFile.getName()));

LOG.info("Moving Zip from HDFS temporary location to final destination.");
targetFs.rename(hdfsTmpZipPath,
new Path(workflowConfiguration.getHdfsOutputPath(), zipFileName));

} catch (IOException e) {
throw new DownloadException(e);
@@ -246,6 +256,30 @@

}

/**
* Adds the files under the local archive directory to the zip stream.
*/
private void zipLocalFiles(ModalZipOutputStream zos) {
try {
Collection<File> files = org.apache.commons.io.FileUtils.listFiles(archiveDir, null, true);

for (File f : files) {
LOG.debug("Adding local file {} to archive", f);
String zipPath = StringUtils.removeStart(f.getAbsolutePath(), archiveDir.getAbsolutePath() + File.separator);
ZipEntry entry = new ZipEntry(zipPath);
try (FileInputStream fileInZipInputStream = new FileInputStream(f)) {
zos.putNextEntry(entry, ModalZipOutputStream.MODE.DEFAULT);
ByteStreams.copy(fileInZipInputStream, zos);
}
}
zos.closeEntry();

} catch (Exception ex) {
LOG.error("Error zipping local files into the archive", ex);
throw Throwables.propagate(ex);
}
}

public void createEmlFile(UUID constituentId, File emlDir) {
try (InputStream in = datasetService.getMetadataDocument(constituentId)) {
// store dataset EML as constituent metadata
@@ -300,52 +334,24 @@ protected DataDescription createDataDescription() {
}

/**
* Rewrites the zip file by opening the original and appending the pre-compressed content on the fly.
* Append the pre-compressed content to the zip stream
*/
private void appendPreCompressedFiles(File zipFile) throws IOException {

LOG.info("Appending pre-compressed occurrence content to the Zip: {}", zipFile.getAbsolutePath());

File tempZip = new File(archiveDir, zipFile.getName() + ".part");
boolean renameOk = zipFile.renameTo(tempZip);
if (renameOk) {
try (ZipInputStream zin = new ZipInputStream(new FileInputStream(tempZip));
ModalZipOutputStream out = new ModalZipOutputStream(new BufferedOutputStream(new FileOutputStream(zipFile)))
) {

// copy existing entries
ZipEntry entry = zin.getNextEntry();
while (entry != null) {
out.putNextEntry(new org.gbif.hadoop.compress.d2.zip.ZipEntry(entry.getName()),
ModalZipOutputStream.MODE.DEFAULT);
ByteStreams.copy(zin, out);
entry = zin.getNextEntry();
}

// NOTE: hive lowercases all the paths
appendPreCompressedFile(out,
new Path(configuration.getInterpretedDataFileName()),
INTERPRETED_FILENAME,
HeadersFileUtil.getInterpretedTableHeader());
appendPreCompressedFile(out,
new Path(configuration.getVerbatimDataFileName()),
VERBATIM_FILENAME,
HeadersFileUtil.getVerbatimTableHeader());
appendPreCompressedFile(out,
new Path(configuration.getMultimediaDataFileName()),
MULTIMEDIA_FILENAME,
HeadersFileUtil.getMultimediaTableHeader());

} finally {
// we've rewritten so remove the original
if (tempZip != null) {
tempZip.delete();
}
}

} else {
throw new IllegalStateException("Unable to rename existing zip, to allow appending occurrence data");
}
private void appendPreCompressedFiles(ModalZipOutputStream out) throws IOException {
LOG.info("Appending pre-compressed occurrence content to the Zip");

// NOTE: hive lowercases all the paths
appendPreCompressedFile(out,
new Path(configuration.getInterpretedDataFileName()),
INTERPRETED_FILENAME,
HeadersFileUtil.getInterpretedTableHeader());
appendPreCompressedFile(out,
new Path(configuration.getVerbatimDataFileName()),
VERBATIM_FILENAME,
HeadersFileUtil.getVerbatimTableHeader());
appendPreCompressedFile(out,
new Path(configuration.getMultimediaDataFileName()),
MULTIMEDIA_FILENAME,
HeadersFileUtil.getMultimediaTableHeader());
}
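Conceptually, each appendPreCompressedFile call above produces one zip entry consisting of a header line followed by the Hive output part files for that table. The real implementation concatenates pre-deflated D2 blocks so nothing is recompressed; the sketch below shows only the simpler header-plus-parts idea using plain java.util.zip, and the directory layout is assumed.

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.google.common.io.ByteStreams;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendTableSketch {

  /** Writes the header followed by every part file under dataDir as a single zip entry. */
  static void appendTable(FileSystem fs, ZipOutputStream zos, Path dataDir,
                          String entryName, String header) throws IOException {
    zos.putNextEntry(new ZipEntry(entryName));
    zos.write(header.getBytes(StandardCharsets.UTF_8));
    for (FileStatus part : fs.listStatus(dataDir)) {
      if (part.isFile()) {
        try (InputStream in = fs.open(part.getPath())) {
          ByteStreams.copy(in, zos); // unlike the D2 version, this recompresses the data
        }
      }
    }
    zos.closeEntry();
  }
}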

/**
ArchiveDownloadAction.java
@@ -21,6 +21,13 @@ public class ArchiveDownloadAction {
* Entry point for assembling the DWC archive.
* The thrown exception is the only way of telling Oozie that this job has failed.
*
* The expected parameters are:
* 0. downloadKey: occurrence download key.
* 1. username: download user
* 2. predicate: download query filter
* 3. isSmallDownload
* 4. download table/file name
*
* @throws java.io.IOException if any read/write operation failed
*/
public static void main(String[] args) throws IOException {
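A minimal sketch of how a main method could unpack the positional arguments documented above; the class and variable names are mine, and the real class hands the values to the download workflow rather than printing them.

public class ArchiveDownloadArgsSketch {
  public static void main(String[] args) {
    String downloadKey = args[0];                            // occurrence download key
    String username = args[1];                               // download user
    String predicate = args[2];                              // download query filter
    boolean isSmallDownload = Boolean.parseBoolean(args[3]);
    String downloadTableName = args[4];                      // download table/file name
    System.out.printf("Archive %s for %s (small=%s, table=%s, filter=%s)%n",
        downloadKey, username, isSmallDownload, downloadTableName, predicate);
  }
}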
DownloadWorkflowModule.java
@@ -274,6 +274,7 @@ private DefaultSettings() {
public static final String DOWNLOAD_PASSWORD_KEY = PROPERTIES_PREFIX + "ws.password";
public static final String DOWNLOAD_LINK_KEY = PROPERTIES_PREFIX + "link";
public static final String HDFS_OUTPUT_PATH_KEY = PROPERTIES_PREFIX + "hdfsOutputPath";
public static final String HDFS_TMP_DIR_KEY = PROPERTIES_PREFIX + "hdfs.tmp.dir";
public static final String TMP_DIR_KEY = PROPERTIES_PREFIX + "tmp.dir";
public static final String HIVE_DB_PATH_KEY = PROPERTIES_PREFIX + "hive.hdfs.out";

