Skip to content

Commit

Permalink
MET-6263 more javadocs mainly for operators and validators
Browse files Browse the repository at this point in the history
  • Loading branch information
pWoz committed Dec 17, 2024
1 parent 4a30ac9 commit 87f6a5a
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import lombok.Data;

/**
* Contains information about job. Used by {@link eu.europeana.cloud.flink.client.JobExecutor}
* while monitoring job progress
*/
@Data
public class JobDetails {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
import lombok.Builder;
import lombok.Value;

/**
* Describes arguments used while submitting job to the JobManager
*/
@Value
@Builder
public class SubmitJobRequest {

private String entryClass;
private String parallelism;
private String programArgs;
private String savepointPath;
private boolean allowNonRestoredState;
String entryClass;
String parallelism;
String programArgs;
String savepointPath;
boolean allowNonRestoredState;

public static class SubmitJobRequestBuilder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import eu.europeana.processing.config.FlinkConfigurationProperties;
import eu.europeana.processing.config.JobsConfigurationProperties;
import eu.europeana.processing.job.JobParamValue;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -79,40 +78,40 @@ public static void createExecutor(@Autowired FlinkConfigurationProperties proper
}

@Test
public void step1_shouldExecuteOAIHarvestComplietellyWithoutErrors() throws Exception {
void step1_shouldExecuteOAIHarvestComplietellyWithoutErrors() throws Exception {

executeStep(1, "eu.europeana.processing.oai.OAIJob",
Map.of(OAI_REPOSITORY_URL, sourceProperties.getUrl(), SET_SPEC, sourceProperties.getSetSpec(), METADATA_PREFIX,
sourceProperties.getMetadataPrefix()));
}

@Test
public void step2_shouldExecuteExternalValidationWithoutErrors() throws Exception {
void step2_shouldExecuteExternalValidationWithoutErrors() throws Exception {
executeStep(2, "eu.europeana.processing.validation.ValidationJob",
Map.of(VALIDATION_TYPE, JobParamValue.VALIDATION_EXTERNAL));
}


@Test
public void step3_shouldExecuteXsltTransformationWithoutErrors() throws Exception {
void step3_shouldExecuteXsltTransformationWithoutErrors() throws Exception {
executeStep(3, "eu.europeana.processing.transformation.TransformationJob",
Map.of(METIS_DATASET_NAME, "idA_metisDatasetNameA", METIS_DATASET_COUNTRY, "Greece", METIS_DATASET_LANGUAGE, "el",
METIS_XSLT_URL, "https://metis-core-rest.test.eanadev.org/datasets/xslt/6204e5e2514e773e6745f7e9"));
}

@Test
public void step4_shouldExecuteIternalValidationWithoutErrors() throws Exception {
void step4_shouldExecuteIternalValidationWithoutErrors() throws Exception {
executeStep(4, "eu.europeana.processing.validation.ValidationJob",
Map.of(VALIDATION_TYPE, JobParamValue.VALIDATION_INTERNAL));
}

@Test
public void step5_shouldExecuteNormalizationWithoutErrors() throws Exception {
void step5_shouldExecuteNormalizationWithoutErrors() throws Exception {
executeStep(5, "eu.europeana.processing.normalization.NormalizationJob", Collections.emptyMap());
}

@Test
public void step6_shouldExecuteEnrichmentWithoutErrors() throws Exception {
void step6_shouldExecuteEnrichmentWithoutErrors() throws Exception {
executeStep(6, "eu.europeana.processing.enrichment.EnrichmentJob",
Map.of(DEREFERENCE_SERVICE_URL, jobsConfigurationProperties.getEnrichment().getDereferenceUrl(),
ENRICHMENT_ENTITY_MANAGEMENT_URL, jobsConfigurationProperties.getEnrichment().getEntityManagementUrl(),
Expand All @@ -122,12 +121,12 @@ public void step6_shouldExecuteEnrichmentWithoutErrors() throws Exception {
}

@Test
public void step7_shouldExecuteMediaWithoutErrors() throws Exception {
void step7_shouldExecuteMediaWithoutErrors() throws Exception {
executeStep(7, "eu.europeana.processing.media.MediaJob", Collections.emptyMap());
}

@Test
public void step8_shouldExecuteIndexingWithoutErrors() throws Exception {
void step8_shouldExecuteIndexingWithoutErrors() throws Exception {
Map<String, String> specialParameters = new HashMap<>();
specialParameters.put(INDEXING_PRESERVETIMESTAMPS, jobsConfigurationProperties.getIndexing().getPreserveTimestamps());
specialParameters.put(INDEXING_PERFORMREDIRECTS, jobsConfigurationProperties.getIndexing().getPerformRedirects());
Expand All @@ -153,7 +152,7 @@ public void step8_shouldExecuteIndexingWithoutErrors() throws Exception {
executeStep(8, "eu.europeana.cloud.job.indexing.IndexingJobWithPostgresMultiThreadedOperation", specialParameters);
}

public void executeStep(int stepNumber, String jobClass, Map<String, String> specialParameters)
void executeStep(int stepNumber, String jobClass, Map<String, String> specialParameters)
throws Exception {
beforeEach(stepNumber);
String datasetId = testProperties.getDatasetId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import java.util.stream.Collectors;

/**
* <p>Main operator for {@link eu.europeana.processing.enrichment.EnrichmentJob}.</p>
* <p>It is responsible for enriching records using {@link EnrichmentWorker}</p>
*/
public class EnrichmentOperator extends ProcessFunction<ExecutionRecord, ExecutionRecordResult> {

@Serial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import eu.europeana.processing.validation.JobParamValidator;
import org.apache.flink.api.java.utils.ParameterTool;

/**
* Validates parameters provided for {@link eu.europeana.processing.enrichment.EnrichmentJob}
* during task startup.
*/
public class EnrichmentJobParamValidator implements JobParamValidator {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.util.Collections;
import java.util.Date;

/**
* <p>Main operator for {@link eu.europeana.processing.indexing.IndexingJob}.</p>
* <p>It uses {@link Indexer} to push records to Solr and Mongo</p>
*/
public class IndexingOperator extends ProcessFunction<ExecutionRecord, ExecutionRecordResult> {

@Serial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import eu.europeana.processing.validation.JobParamValidator;
import org.apache.flink.api.java.utils.ParameterTool;

/**
* Validates parameters provided for {@link eu.europeana.processing.indexing.IndexingJob}
* during task startup.
*/
public class IndexingJobParamValidator implements JobParamValidator {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

import static java.util.Objects.nonNull;

/**
* <p>Main operator for {@link eu.europeana.processing.media.MediaJob}.</p>
* <p>It uses Metis provided libraries: {@link RdfSerializer}, {@link RdfDeserializer}, {@link MediaExtractor}</p>
*/
public class MediaOperator extends ProcessFunction<ExecutionRecord, ExecutionRecordResult> {

@Serial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import eu.europeana.processing.validation.JobParamValidator;
import org.apache.flink.api.java.utils.ParameterTool;

/**
* Validates parameters provided for {@link eu.europeana.processing.media.MediaJob}
* during task startup.
*/

public class MediaJobParamValidator implements JobParamValidator {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <p>Main operator for {@link eu.europeana.processing.normalization.NormalizationJob}.</p>
* <p>It uses Metis provided {@link Normalizer}</p>
*/
public class NormalizationOperator extends ProcessFunction<ExecutionRecord, ExecutionRecordResult> {

@Serial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import eu.europeana.processing.validation.JobParamValidator;
import org.apache.flink.api.java.utils.ParameterTool;

/**
* Validates parameters provided for {@link eu.europeana.processing.normalization.NormalizationJob}
* during task startup.
*/

public class NormalizationJobParamValidator implements JobParamValidator {
@Override
public void validate(ParameterTool parameterTool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Operator used by {@link eu.europeana.processing.oai.OAIJob} to generate record identifiers
* based on Europeana policies.
*/
public class IdAssigningOperator extends ProcessFunction<ExecutionRecordResult, ExecutionRecordResult> {

@Serial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/**
* Operator used by {@link eu.europeana.processing.oai.OAIJob} responsible for downloading records
* via OAI endpoint.
*/
public class RecordHarvestingOperator extends ProcessFunction<OaiRecordHeader, ExecutionRecordResult> {

@Serial
Expand Down

0 comments on commit 87f6a5a

Please sign in to comment.