Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MET-6103 Cleanup deprecations on http harvesting #690

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package eu.europeana.metis.harvesting.http;

import eu.europeana.metis.harvesting.FullRecord;
import eu.europeana.metis.harvesting.HarvesterException;
import eu.europeana.metis.harvesting.HarvestingIterator;
import eu.europeana.metis.harvesting.ReportingIteration;
import eu.europeana.metis.harvesting.ReportingIteration.IterationResult;
import eu.europeana.metis.utils.CompressedFileExtension;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for harvesting iterators that operate on a directory of extracted files.
 *
 * <p>The directory is walked recursively. Files named {@code .DS_Store}, directories named
 * {@code __MACOSX} (including everything below them) and files for which
 * {@link CompressedFileExtension#forPath(Path)} returns a non-null value are skipped. Closing the
 * iterator deletes the extracted directory.
 *
 * @param <R> The type of the records that the implementing iterator provides.
 */
abstract class AbstractHttpHarvestIterator<R> implements HarvestingIterator<R, Path> {

  private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private final Path extractedDirectory;

  /**
   * Constructor.
   *
   * @param extractedDirectory The directory containing the extracted files. Must not be null.
   * @throws NullPointerException If the directory is null.
   */
  protected AbstractHttpHarvestIterator(Path extractedDirectory) {
    this.extractedDirectory = Objects.requireNonNull(extractedDirectory,
        "Extracted directory is null. This should not happen.");
  }

  /**
   * @return The string representation of the directory containing the extracted files.
   */
  protected String getExtractedDirectory() {
    return extractedDirectory.toString();
  }

  /**
   * Deletes the extracted directory. This is best-effort: a failure to delete is logged as a
   * warning and not propagated.
   */
  @Override
  public void close() {
    try {
      FileUtils.deleteDirectory(extractedDirectory.toFile());
    } catch (IOException e) {
      LOGGER.warn("Could not delete directory.", e);
    }
  }

  /**
   * Iterate through the record paths while applying a filter (potentially skipping some records).
   *
   * @param action The iteration to perform. It needs to return a result.
   * @param filter The filter to apply (only records that return true will be sent to the action).
   * @throws HarvesterException In case there was a problem while harvesting.
   */
  public void forEachPathFiltered(ReportingIteration<Path> action, Predicate<Path> filter)
      throws HarvesterException {
    try {
      Files.walkFileTree(extractedDirectory, new FileIteration(action, filter));
    } catch (IOException e) {
      throw new HarvesterException("Exception while iterating through the extracted files.", e);
    }
  }

  /**
   * Iterate through the {@link FullRecord} while applying a filter (potentially skipping some
   * records). Each file that passes the filter is read fully into memory and offered to the action
   * as a record whose harvesting identifier is the file path relative to the extracted directory.
   *
   * @param action The iteration to perform. It needs to return a result.
   * @param filter The filter to apply (only records that return true will be sent to the action).
   * @throws HarvesterException In case there was a problem while harvesting.
   */
  public void forEachFileFiltered(ReportingIteration<FullRecord> action, Predicate<Path> filter)
      throws HarvesterException {
    forEachPathFiltered(path -> {
      try (InputStream content = Files.newInputStream(path)) {
        final String identifier = extractedDirectory.relativize(path).toString();
        return action.process(new FullRecordImpl(identifier, IOUtils.toByteArray(content)));
      } catch (RuntimeException e) {
        // Convert to a checked exception so that the file walk reports it as a harvesting problem.
        throw new IOException("Could not process path " + path + ".", e);
      }
    }, filter);
  }

  /**
   * Records in this iterator are never marked as deleted, so this simply delegates to
   * {@code forEach}.
   *
   * @param action The iteration to perform. It needs to return a result.
   * @throws HarvesterException In case there was a problem while harvesting.
   */
  @Override
  public void forEachNonDeleted(ReportingIteration<R> action) throws HarvesterException {
    forEach(action);
  }

  /**
   * Counts the records by walking the extracted directory without reading any file content.
   *
   * @return The number of files that would be offered to an action.
   * @throws HarvesterException In case there was a problem while harvesting.
   */
  @Override
  public Integer countRecords() throws HarvesterException {
    // Go by each path only: no need to inspect the full file.
    final AtomicInteger counter = new AtomicInteger(0);
    forEachPathFiltered(path -> {
      counter.incrementAndGet();
      return IterationResult.CONTINUE;
    }, path -> true);
    return counter.get();
  }

  /**
   * File visitor that offers every eligible file to an action. Files rejected by the filter, Mac
   * temporary files and compressed files are skipped; Mac temporary folders are not entered.
   */
  private static class FileIteration extends SimpleFileVisitor<Path> {

    private static final String MAC_TEMP_FILE = ".DS_Store";
    private static final String MAC_TEMP_FOLDER = "__MACOSX";

    private final ReportingIteration<Path> action;
    private final Predicate<Path> filter;

    /**
     * Constructor.
     *
     * @param action The action to perform on each eligible file.
     * @param filter The filter deciding which files are eligible.
     */
    public FileIteration(ReportingIteration<Path> action, Predicate<Path> filter) {
      this.action = action;
      this.filter = filter;
    }

    /**
     * Offers the file to the action unless it is to be skipped.
     *
     * @throws IOException If the action fails.
     * @throws IllegalArgumentException If the action returns a null result.
     */
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
      if (!filter.test(file)) {
        return FileVisitResult.CONTINUE;
      }
      final Path fileName = file.getFileName();
      if (fileName != null && MAC_TEMP_FILE.equals(fileName.toString())) {
        return FileVisitResult.CONTINUE;
      }
      if (CompressedFileExtension.forPath(file) != null) {
        return FileVisitResult.CONTINUE;
      }
      final IterationResult result = action.process(file);
      if (result == null) {
        throw new IllegalArgumentException("Iteration result cannot be null.");
      }
      return result == IterationResult.TERMINATE ? FileVisitResult.TERMINATE
          : FileVisitResult.CONTINUE;
    }

    /**
     * Prevents the walk from descending into Mac temporary folders.
     */
    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
      final Path dirName = dir.getFileName();
      if (dirName != null && MAC_TEMP_FOLDER.equals(dirName.toString())) {
        return FileVisitResult.SKIP_SUBTREE;
      }
      return FileVisitResult.CONTINUE;
    }
  }

  /**
   * In-memory record backed by the bytes of one extracted file.
   *
   * <p>The raw bytes are kept instead of a single shared stream: a stream is stateful, so handing
   * out the same instance would make the content unavailable (or truncated) once any consumer has
   * read from it. Every access below therefore sees the complete content.
   *
   * @param relativeFilePath The file path relative to the extracted directory.
   * @param entryContent The complete content of the file.
   */
  private record FullRecordImpl(String relativeFilePath, byte[] entryContent) implements FullRecord {

    @Override
    public String getHarvestingIdentifier() {
      return relativeFilePath;
    }

    @Override
    public void writeContent(OutputStream outputStream) throws IOException {
      outputStream.write(entryContent);
    }

    /**
     * @return A new stream over the complete content, positioned at the start.
     */
    @Override
    public ByteArrayInputStream getContent() {
      return new ByteArrayInputStream(entryContent);
    }

    @Override
    public boolean isDeleted() {
      return false;
    }

    /**
     * @return Always null: no timestamp is tracked for extracted files.
     */
    @Override
    public Instant getTimeStamp() {
      return null;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import eu.europeana.metis.harvesting.HarvestingIterator;
import eu.europeana.metis.harvesting.ReportingIteration;
import eu.europeana.metis.utils.CompressedFileExtension;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
* Implementations of this interface provide the functionality to harvest from HTTP (compressed archive).
Expand All @@ -27,7 +25,7 @@ public interface HttpHarvester {
* @return An iterator that provides access to the decompressed records.
* @throws HarvesterException In case there was an issue during the harvest.
*/
HttpRecordIterator harvestRecords(String archiveUrl, String downloadDirectory)
HarvestingIterator<Path, Path> harvestRecords(String archiveUrl, String downloadDirectory)
throws HarvesterException;

/**
Expand All @@ -41,34 +39,7 @@ HttpRecordIterator harvestRecords(String archiveUrl, String downloadDirectory)
* @throws HarvesterException In case there was an issue during the harvest.
*/
void harvestFullRecords(InputStream inputStream, CompressedFileExtension compressedFileType,
ReportingIteration<ArchiveEntry> action) throws HarvesterException;

/**
* Harvest from HTTP (compressed archive). This is a convenience method for {@link #harvestRecords(String, String)} that copies
* the input stream to a temporary file (in the system's temporary directory) first. An attempt will be made to remove the
* temporary file before this method returns.
*
* @param inputStream The input stream containing the compressed archive.
* @param compressedFileType The type of the archive.
* @param action The action to be performed.
* @throws HarvesterException In case there was an issue during the harvest.
* @deprecated Use {@link #harvestFullRecords(InputStream, CompressedFileExtension, ReportingIteration)} instead.
*/
@Deprecated
void harvestRecords(InputStream inputStream, CompressedFileExtension compressedFileType,
Consumer<ArchiveEntry> action) throws HarvesterException;

/**
* It creates a {@link HttpRecordIterator} with a InputStream into a temporary file directory. When finished using the created
* iterator, the iterator should be closed to clean up leftover files.
*
* @param input The input stream from which we create the iterator
* @param compressedFileType The type of compressed file type
* @return A HttpRecordIterator based on a temporary file location
* @throws HarvesterException In case there is an issue while using the input stream
*/
HttpRecordIterator createTemporaryHttpHarvestIterator(InputStream input,
CompressedFileExtension compressedFileType) throws HarvesterException;
ReportingIteration<FullRecord> action) throws HarvesterException;

/**
* It creates a {@link HarvestingIterator} with an InputStream into a temporary file directory. When finished using the created
Expand All @@ -79,29 +50,7 @@ HttpRecordIterator createTemporaryHttpHarvestIterator(InputStream input,
* @return A FullRecordHarvestingIterator based on a temporary file location
* @throws HarvesterException In case there is an issue while using the input stream
*/
FullRecordHarvestingIterator<ArchiveEntry, Path> createFullRecordHarvestIterator(InputStream input,
FullRecordHarvestingIterator<FullRecord, Path> createFullRecordHarvestIterator(InputStream input,
CompressedFileExtension compressedFileType) throws HarvesterException;

/**
* An object representing an entry in a file archive. The harvesting identifier is the file name
* (including the path relative to the archive root).
*/
interface ArchiveEntry extends FullRecord {
stzanakis marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return The name of the entry. This is the file name (including extension, excluding the path).
* @deprecated Use {@link #getHarvestingIdentifier()} instead.
*/
@Deprecated
default String getEntryName() {
return Path.of(getHarvestingIdentifier()).getFileName().toString();
}

/**
* @return The content of the entry (in memory).
* @deprecated Use {@link #getContent()} instead.
*/
@Deprecated
ByteArrayInputStream getEntryContent();
}
}
Loading