diff --git a/.run/OAIJob.run.xml b/.run/OAIJob.run.xml new file mode 100644 index 0000000..9b8586d --- /dev/null +++ b/.run/OAIJob.run.xml @@ -0,0 +1,18 @@ + + + + \ No newline at end of file diff --git a/.run/ValidationJob.run.xml b/.run/ValidationJob.run.xml new file mode 100644 index 0000000..54363ca --- /dev/null +++ b/.run/ValidationJob.run.xml @@ -0,0 +1,18 @@ + + + + \ No newline at end of file diff --git a/flink-poc-client/pom.xml b/client/pom.xml similarity index 75% rename from flink-poc-client/pom.xml rename to client/pom.xml index 66f9000..3bd2900 100644 --- a/flink-poc-client/pom.xml +++ b/client/pom.xml @@ -3,16 +3,15 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - eu.europeana.cloud.flink - flink-poc-client - 1-SNAPSHOT - + metis-processing-engine-client + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + - 21 - 21 - UTF-8 6.1.4 diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/JobExecutor.java b/client/src/main/java/eu/europeana/cloud/flink/client/JobExecutor.java similarity index 100% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/JobExecutor.java rename to client/src/main/java/eu/europeana/cloud/flink/client/JobExecutor.java diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/entities/JobDetails.java b/client/src/main/java/eu/europeana/cloud/flink/client/entities/JobDetails.java similarity index 100% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/entities/JobDetails.java rename to client/src/main/java/eu/europeana/cloud/flink/client/entities/JobDetails.java diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobRequest.java b/client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobRequest.java similarity index 100% rename from 
flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobRequest.java rename to client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobRequest.java diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobResponse.java b/client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobResponse.java similarity index 100% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobResponse.java rename to client/src/main/java/eu/europeana/cloud/flink/client/entities/SubmitJobResponse.java diff --git a/commons/pom.xml b/commons/pom.xml new file mode 100644 index 0000000..ca5a221 --- /dev/null +++ b/commons/pom.xml @@ -0,0 +1,71 @@ + + + 4.0.0 + + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-commons + + + 21 + 21 + UTF-8 + + + + + org.projectlombok + lombok + 1.18.30 + provided + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + com.zaxxer + HikariCP + 5.1.0 + + + + byte-buddy + net.bytebuddy + ${version.bytebuddy} + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.22.1 + + + + + org.junit.jupiter + junit-jupiter-api + 5.7.2 + test + + + + + org.postgresql + postgresql + 42.7.3 + + + + + \ No newline at end of file diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/DbConnectionProvider.java b/commons/src/main/java/eu/europeana/processing/DbConnectionProvider.java similarity index 87% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/DbConnectionProvider.java rename to commons/src/main/java/eu/europeana/processing/DbConnectionProvider.java index 639d8e8..8727a3b 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/DbConnectionProvider.java +++ b/commons/src/main/java/eu/europeana/processing/DbConnectionProvider.java @@ -1,14 +1,14 @@ -package eu.europeana.cloud.tool; +package eu.europeana.processing; import 
com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import java.sql.SQLException; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; import java.io.Serializable; import java.sql.Connection; +import java.sql.SQLException; public class DbConnectionProvider implements Serializable, AutoCloseable { @@ -17,6 +17,7 @@ public class DbConnectionProvider implements Serializable, AutoCloseable { public DbConnectionProvider(ParameterTool parameterTool) { HikariConfig config=new HikariConfig(); + config.setDriverClassName("org.postgresql.Driver"); config.setJdbcUrl(parameterTool.getRequired(JobParamName.DATASOURCE_URL)); config.setUsername(parameterTool.get(JobParamName.DATASOURCE_USERNAME)); config.setPassword(parameterTool.get(JobParamName.DATASOURCE_PASSWORD)); diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/common/MetisJob.java b/commons/src/main/java/eu/europeana/processing/MetisJob.java similarity index 84% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/common/MetisJob.java rename to commons/src/main/java/eu/europeana/processing/MetisJob.java index 73763eb..288f568 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/common/MetisJob.java +++ b/commons/src/main/java/eu/europeana/processing/MetisJob.java @@ -1,16 +1,15 @@ -package eu.europeana.cloud.common; - -import eu.europeana.cloud.flink.client.constants.postgres.JobParam; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.model.TaskInfo; -import eu.europeana.cloud.repository.TaskInfoRepository; -import eu.europeana.cloud.retryable.RetryableMethodExecutor; -import eu.europeana.cloud.sink.DbSinkFunction; -import eu.europeana.cloud.source.DbSourceWithProgressHandling; -import 
eu.europeana.cloud.tool.DbConnectionProvider; -import eu.europeana.cloud.tool.validation.JobParamValidatorFactory; +package eu.europeana.processing; + +import eu.europeana.processing.job.JobParam; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.model.TaskInfo; +import eu.europeana.processing.repository.TaskInfoRepository; +import eu.europeana.processing.retryable.RetryableMethodExecutor; +import eu.europeana.processing.sink.DbSinkFunction; +import eu.europeana.processing.source.DbSourceWithProgressHandling; +import eu.europeana.processing.validation.JobParamValidatorFactory; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobName.java b/commons/src/main/java/eu/europeana/processing/job/JobName.java similarity index 90% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobName.java rename to commons/src/main/java/eu/europeana/processing/job/JobName.java index 53615b7..c74c80b 100644 --- a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobName.java +++ b/commons/src/main/java/eu/europeana/processing/job/JobName.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.flink.client.constants.postgres; +package eu.europeana.processing.job; public class JobName { diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParam.java b/commons/src/main/java/eu/europeana/processing/job/JobParam.java similarity index 84% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParam.java rename to 
commons/src/main/java/eu/europeana/processing/job/JobParam.java index 24ea142..2b47315 100644 --- a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParam.java +++ b/commons/src/main/java/eu/europeana/processing/job/JobParam.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.flink.client.constants.postgres; +package eu.europeana.processing.job; public class JobParam { diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParamName.java b/commons/src/main/java/eu/europeana/processing/job/JobParamName.java similarity index 98% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParamName.java rename to commons/src/main/java/eu/europeana/processing/job/JobParamName.java index c4cdeb9..c1768d4 100644 --- a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParamName.java +++ b/commons/src/main/java/eu/europeana/processing/job/JobParamName.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.flink.client.constants.postgres; +package eu.europeana.processing.job; public class JobParamName { diff --git a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParamValue.java b/commons/src/main/java/eu/europeana/processing/job/JobParamValue.java similarity index 78% rename from flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParamValue.java rename to commons/src/main/java/eu/europeana/processing/job/JobParamValue.java index b4bff7e..95668fe 100644 --- a/flink-poc-client/src/main/java/eu/europeana/cloud/flink/client/constants/postgres/JobParamValue.java +++ b/commons/src/main/java/eu/europeana/processing/job/JobParamValue.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.flink.client.constants.postgres; +package eu.europeana.processing.job; public class JobParamValue { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/DataPartition.java 
b/commons/src/main/java/eu/europeana/processing/model/DataPartition.java similarity index 89% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/model/DataPartition.java rename to commons/src/main/java/eu/europeana/processing/model/DataPartition.java index 57a59c6..c9021f9 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/DataPartition.java +++ b/commons/src/main/java/eu/europeana/processing/model/DataPartition.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.model; +package eu.europeana.processing.model; import org.apache.flink.api.connector.source.SourceSplit; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecord.java b/commons/src/main/java/eu/europeana/processing/model/ExecutionRecord.java similarity index 84% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecord.java rename to commons/src/main/java/eu/europeana/processing/model/ExecutionRecord.java index 5af0278..576bb71 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecord.java +++ b/commons/src/main/java/eu/europeana/processing/model/ExecutionRecord.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.model; +package eu.europeana.processing.model; import lombok.Builder; import lombok.Data; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecordKey.java b/commons/src/main/java/eu/europeana/processing/model/ExecutionRecordKey.java similarity index 86% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecordKey.java rename to commons/src/main/java/eu/europeana/processing/model/ExecutionRecordKey.java index 531a1b4..4160247 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecordKey.java +++ b/commons/src/main/java/eu/europeana/processing/model/ExecutionRecordKey.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.model; +package eu.europeana.processing.model; import lombok.Builder; import lombok.Data; diff --git 
a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecordResult.java b/commons/src/main/java/eu/europeana/processing/model/ExecutionRecordResult.java similarity index 98% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecordResult.java rename to commons/src/main/java/eu/europeana/processing/model/ExecutionRecordResult.java index d3f20f8..1878d70 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/ExecutionRecordResult.java +++ b/commons/src/main/java/eu/europeana/processing/model/ExecutionRecordResult.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.model; +package eu.europeana.processing.model; import lombok.Builder; import lombok.Data; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/TaskInfo.java b/commons/src/main/java/eu/europeana/processing/model/TaskInfo.java similarity index 66% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/model/TaskInfo.java rename to commons/src/main/java/eu/europeana/processing/model/TaskInfo.java index f1cb667..c3e1f42 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/model/TaskInfo.java +++ b/commons/src/main/java/eu/europeana/processing/model/TaskInfo.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.model; +package eu.europeana.processing.model; public record TaskInfo(long taskId, long commitCount, long writeCount) { diff --git a/commons/src/main/java/eu/europeana/processing/repository/DbRepository.java b/commons/src/main/java/eu/europeana/processing/repository/DbRepository.java new file mode 100644 index 0000000..6512838 --- /dev/null +++ b/commons/src/main/java/eu/europeana/processing/repository/DbRepository.java @@ -0,0 +1,5 @@ +package eu.europeana.processing.repository; + +public interface DbRepository { + +} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/ExecutionRecordExceptionLogRepository.java b/commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordExceptionLogRepository.java similarity index 92% rename from 
flinkPoCPG/src/main/java/eu/europeana/cloud/repository/ExecutionRecordExceptionLogRepository.java rename to commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordExceptionLogRepository.java index 3516964..0d3af07 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/ExecutionRecordExceptionLogRepository.java +++ b/commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordExceptionLogRepository.java @@ -1,9 +1,11 @@ -package eu.europeana.cloud.repository; +package eu.europeana.processing.repository; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.retryable.Retryable; -import eu.europeana.cloud.tool.DbConnectionProvider; +import eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.retryable.Retryable; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -11,8 +13,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Retryable(delay = 5000, maxAttempts = 5) public class ExecutionRecordExceptionLogRepository implements DbRepository, Serializable { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/ExecutionRecordRepository.java b/commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordRepository.java similarity index 93% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/repository/ExecutionRecordRepository.java rename to commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordRepository.java index edee8aa..a680ca9 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/ExecutionRecordRepository.java +++ 
b/commons/src/main/java/eu/europeana/processing/repository/ExecutionRecordRepository.java @@ -1,10 +1,12 @@ -package eu.europeana.cloud.repository; +package eu.europeana.processing.repository; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordKey; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.retryable.Retryable; -import eu.europeana.cloud.tool.DbConnectionProvider; +import eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordKey; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.retryable.Retryable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -14,8 +16,6 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Retryable(delay = 5000, maxAttempts = 5) public class ExecutionRecordRepository implements DbRepository, Serializable { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/TaskInfoRepository.java b/commons/src/main/java/eu/europeana/processing/repository/TaskInfoRepository.java similarity index 94% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/repository/TaskInfoRepository.java rename to commons/src/main/java/eu/europeana/processing/repository/TaskInfoRepository.java index b3aa893..70f3138 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/TaskInfoRepository.java +++ b/commons/src/main/java/eu/europeana/processing/repository/TaskInfoRepository.java @@ -1,8 +1,8 @@ -package eu.europeana.cloud.repository; +package eu.europeana.processing.repository; -import eu.europeana.cloud.model.TaskInfo; -import eu.europeana.cloud.retryable.Retryable; -import eu.europeana.cloud.tool.DbConnectionProvider; +import 
eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.model.TaskInfo; +import eu.europeana.processing.retryable.Retryable; import java.io.Serializable; import java.sql.Connection; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryInterruptedException.java b/commons/src/main/java/eu/europeana/processing/retryable/RetryInterruptedException.java similarity index 82% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryInterruptedException.java rename to commons/src/main/java/eu/europeana/processing/retryable/RetryInterruptedException.java index bea648f..7cb224c 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryInterruptedException.java +++ b/commons/src/main/java/eu/europeana/processing/retryable/RetryInterruptedException.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.retryable; +package eu.europeana.processing.retryable; public class RetryInterruptedException extends RuntimeException { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/Retryable.java b/commons/src/main/java/eu/europeana/processing/retryable/Retryable.java similarity index 95% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/Retryable.java rename to commons/src/main/java/eu/europeana/processing/retryable/Retryable.java index 134496e..69738a1 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/Retryable.java +++ b/commons/src/main/java/eu/europeana/processing/retryable/Retryable.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.retryable; +package eu.europeana.processing.retryable; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryableMethodExecutor.java b/commons/src/main/java/eu/europeana/processing/retryable/RetryableMethodExecutor.java similarity index 99% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryableMethodExecutor.java rename to 
commons/src/main/java/eu/europeana/processing/retryable/RetryableMethodExecutor.java index 0bf2e5f..1781712 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryableMethodExecutor.java +++ b/commons/src/main/java/eu/europeana/processing/retryable/RetryableMethodExecutor.java @@ -1,9 +1,5 @@ -package eu.europeana.cloud.retryable; +package eu.europeana.processing.retryable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Optional; import net.bytebuddy.ByteBuddy; import net.bytebuddy.dynamic.DynamicType.Unloaded; import net.bytebuddy.implementation.InvocationHandlerAdapter; @@ -11,6 +7,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Optional; + //TODO This class was copied from the eCloud, we should dependencies and share the code if we decide to //use Flink on production public class RetryableMethodExecutor { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryableProxyCreateException.java b/commons/src/main/java/eu/europeana/processing/retryable/RetryableProxyCreateException.java similarity index 82% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryableProxyCreateException.java rename to commons/src/main/java/eu/europeana/processing/retryable/RetryableProxyCreateException.java index 2f91159..2284af2 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/retryable/RetryableProxyCreateException.java +++ b/commons/src/main/java/eu/europeana/processing/retryable/RetryableProxyCreateException.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.retryable; +package eu.europeana.processing.retryable; public class RetryableProxyCreateException extends RuntimeException { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/sink/DbSinkFunction.java 
b/commons/src/main/java/eu/europeana/processing/sink/DbSinkFunction.java similarity index 87% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/sink/DbSinkFunction.java rename to commons/src/main/java/eu/europeana/processing/sink/DbSinkFunction.java index a5528a6..f5b62af 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/sink/DbSinkFunction.java +++ b/commons/src/main/java/eu/europeana/processing/sink/DbSinkFunction.java @@ -1,10 +1,10 @@ -package eu.europeana.cloud.sink; +package eu.europeana.processing.sink; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.repository.ExecutionRecordExceptionLogRepository; -import eu.europeana.cloud.repository.ExecutionRecordRepository; -import eu.europeana.cloud.retryable.RetryableMethodExecutor; -import eu.europeana.cloud.tool.DbConnectionProvider; +import eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.repository.ExecutionRecordExceptionLogRepository; +import eu.europeana.processing.repository.ExecutionRecordRepository; +import eu.europeana.processing.retryable.RetryableMethodExecutor; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbEnumerator.java b/commons/src/main/java/eu/europeana/processing/source/DbEnumerator.java similarity index 92% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbEnumerator.java rename to commons/src/main/java/eu/europeana/processing/source/DbEnumerator.java index f96b7f7..29bd208 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbEnumerator.java +++ b/commons/src/main/java/eu/europeana/processing/source/DbEnumerator.java @@ -1,13 +1,16 @@ -package eu.europeana.cloud.source; - -import eu.europeana.cloud.model.DataPartition; -import eu.europeana.cloud.model.TaskInfo; 
-import eu.europeana.cloud.repository.ExecutionRecordRepository; -import eu.europeana.cloud.repository.TaskInfoRepository; -import eu.europeana.cloud.retryable.RetryableMethodExecutor; -import eu.europeana.cloud.tool.DbConnectionProvider; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +package eu.europeana.processing.source; + +import eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.DataPartition; +import eu.europeana.processing.model.TaskInfo; +import eu.europeana.processing.repository.ExecutionRecordRepository; +import eu.europeana.processing.repository.TaskInfoRepository; +import eu.europeana.processing.retryable.RetryableMethodExecutor; +import java.io.IOException; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; @@ -17,14 +20,9 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.utils.ParameterTool; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - public class DbEnumerator implements SplitEnumerator { private static final Logger LOGGER = LoggerFactory.getLogger(DbEnumerator.class); @@ -182,9 +180,9 @@ public DbEnumeratorState snapshotState(long checkpointId) { @Override public void notifyCheckpointComplete(long checkpointId) { - LOGGER.info("Checkpoint: {} completed. Updating progress... Map:{}", checkpointId,checkpointIdToFinishedRecordCountMap); + LOGGER.info("Checkpoint: {} completed. Updating progress... 
Map:{}", checkpointId, checkpointIdToFinishedRecordCountMap); SortedMap approvedProgresses = checkpointIdToFinishedRecordCountMap.headMap(checkpointId, true); - Map.Entry lastApprovedProgress = approvedProgresses.lastEntry(); + Entry lastApprovedProgress = approvedProgresses.lastEntry(); if (lastApprovedProgress != null) { //TODO Commit count is not strictly evaluated cause in case of restart of job in case of exception //This value is lost and set to the last snapshot value. So we need to increase it earlier @@ -252,7 +250,7 @@ private List getIncompletePartitionsSnapshot() { } private void validateTaskExists() { - if(taskInfoRepo.findById(taskId).isEmpty()){ + if (taskInfoRepo.findById(taskId).isEmpty()) { LOGGER.error("Task not found in the database. It should never happen."); System.exit(1); } @@ -282,14 +280,14 @@ private long getFinishedPartitionCount() { } /** - * Method trim partition to not contain already completed records. it is needed in case when we need - * to retry given partition for example after job restarting. The result of this method could be split - * with limit 0 in rare cases. + * Method trim partition to not contain already completed records. it is needed in case when we need to retry given partition + * for example after job restarting. The result of this method could be split with limit 0 in rare cases. + * * @param split - original split * @param info - info about progress * @return new trimmed split. 
*/ - private static @NotNull DataPartition createSplitWithoutCompletedRecords(DataPartition split, SplitProgressInfo info) { + private static DataPartition createSplitWithoutCompletedRecords(DataPartition split, SplitProgressInfo info) { return new DataPartition(split.offset() + info.getEmittedRecordCount(), split.limit() - info.getEmittedRecordCount()); } diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbEnumeratorState.java b/commons/src/main/java/eu/europeana/processing/source/DbEnumeratorState.java similarity index 81% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbEnumeratorState.java rename to commons/src/main/java/eu/europeana/processing/source/DbEnumeratorState.java index 32cf084..47d6c0a 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbEnumeratorState.java +++ b/commons/src/main/java/eu/europeana/processing/source/DbEnumeratorState.java @@ -1,11 +1,12 @@ -package eu.europeana.cloud.source; +package eu.europeana.processing.source; -import eu.europeana.cloud.model.DataPartition; -import java.io.Serializable; -import java.util.List; +import eu.europeana.processing.model.DataPartition; import lombok.Builder; import lombok.Data; +import java.io.Serializable; +import java.util.List; + @Data @Builder public class DbEnumeratorState implements Serializable { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbReaderWithProgressHandling.java b/commons/src/main/java/eu/europeana/processing/source/DbReaderWithProgressHandling.java similarity index 95% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbReaderWithProgressHandling.java rename to commons/src/main/java/eu/europeana/processing/source/DbReaderWithProgressHandling.java index 1badbc1..d91404c 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbReaderWithProgressHandling.java +++ b/commons/src/main/java/eu/europeana/processing/source/DbReaderWithProgressHandling.java @@ -1,13 +1,17 @@ -package eu.europeana.cloud.source; 
- -import eu.europeana.cloud.flink.client.constants.postgres.JobParam; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.cloud.model.DataPartition; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.repository.ExecutionRecordRepository; -import eu.europeana.cloud.retryable.RetryableMethodExecutor; -import eu.europeana.cloud.tool.DbConnectionProvider; -import java.io.IOException; +package eu.europeana.processing.source; + +import eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.job.JobParam; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.DataPartition; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.repository.ExecutionRecordRepository; +import eu.europeana.processing.retryable.RetryableMethodExecutor; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -16,7 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.stream.Collectors; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbSourceWithProgressHandling.java b/commons/src/main/java/eu/europeana/processing/source/DbSourceWithProgressHandling.java similarity index 83% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbSourceWithProgressHandling.java rename to commons/src/main/java/eu/europeana/processing/source/DbSourceWithProgressHandling.java index 8b88574..804f905 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/DbSourceWithProgressHandling.java +++ 
b/commons/src/main/java/eu/europeana/processing/source/DbSourceWithProgressHandling.java @@ -1,13 +1,21 @@ -package eu.europeana.cloud.source; - -import eu.europeana.cloud.model.DataPartition; -import eu.europeana.cloud.model.ExecutionRecord; -import org.apache.flink.api.connector.source.*; +package eu.europeana.processing.source; + +import eu.europeana.processing.model.DataPartition; +import eu.europeana.processing.model.ExecutionRecord; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.core.io.SimpleVersionedSerializer; -import java.io.*; - public class DbSourceWithProgressHandling implements Source { private final ParameterTool parameterTool; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/ProgressSnapshotEvent.java b/commons/src/main/java/eu/europeana/processing/source/ProgressSnapshotEvent.java similarity index 84% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/ProgressSnapshotEvent.java rename to commons/src/main/java/eu/europeana/processing/source/ProgressSnapshotEvent.java index 80c971b..38dacb5 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/ProgressSnapshotEvent.java +++ b/commons/src/main/java/eu/europeana/processing/source/ProgressSnapshotEvent.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.source; +package eu.europeana.processing.source; -import eu.europeana.cloud.model.DataPartition; +import eu.europeana.processing.model.DataPartition; 
import lombok.Value; import org.apache.flink.api.connector.source.SourceEvent; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/SourceConsistencyException.java b/commons/src/main/java/eu/europeana/processing/source/SourceConsistencyException.java similarity index 92% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/SourceConsistencyException.java rename to commons/src/main/java/eu/europeana/processing/source/SourceConsistencyException.java index c8ed4f4..9d7f983 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/SourceConsistencyException.java +++ b/commons/src/main/java/eu/europeana/processing/source/SourceConsistencyException.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.source; +package eu.europeana.processing.source; /** * The exception indicating that something was wrong with coordinating and the state is somehow inconsistent. diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/SplitCompletedEvent.java b/commons/src/main/java/eu/europeana/processing/source/SplitCompletedEvent.java similarity index 77% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/SplitCompletedEvent.java rename to commons/src/main/java/eu/europeana/processing/source/SplitCompletedEvent.java index 8c60ba1..08a2214 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/SplitCompletedEvent.java +++ b/commons/src/main/java/eu/europeana/processing/source/SplitCompletedEvent.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.source; +package eu.europeana.processing.source; -import eu.europeana.cloud.model.DataPartition; +import eu.europeana.processing.model.DataPartition; import lombok.Value; import org.apache.flink.api.connector.source.SourceEvent; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/SplitProgressInfo.java b/commons/src/main/java/eu/europeana/processing/source/SplitProgressInfo.java similarity index 97% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/SplitProgressInfo.java 
rename to commons/src/main/java/eu/europeana/processing/source/SplitProgressInfo.java index 1f03452..cf1f6de 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/SplitProgressInfo.java +++ b/commons/src/main/java/eu/europeana/processing/source/SplitProgressInfo.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.source; +package eu.europeana.processing.source; import lombok.Data; import org.apache.flink.api.connector.source.SourceEvent; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/EnrichmentJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/EnrichmentJobParamValidator.java similarity index 83% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/EnrichmentJobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/EnrichmentJobParamValidator.java index 370fa04..5aab206 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/EnrichmentJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/EnrichmentJobParamValidator.java @@ -1,9 +1,9 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; -public class EnrichmentJobParamValidator implements JobParamValidator{ +public class EnrichmentJobParamValidator implements JobParamValidator { @Override public void validate(ParameterTool parameterTool) { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/IndexingJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/IndexingJobParamValidator.java similarity index 93% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/IndexingJobParamValidator.java rename to 
commons/src/main/java/eu/europeana/processing/validation/IndexingJobParamValidator.java index 8bdae88..b5537e1 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/IndexingJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/IndexingJobParamValidator.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; public class IndexingJobParamValidator implements JobParamValidator { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/JobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/JobParamValidator.java similarity index 76% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/JobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/JobParamValidator.java index 13df5a7..6211ce4 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/JobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/JobParamValidator.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; import org.apache.flink.api.java.utils.ParameterTool; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/JobParamValidatorFactory.java b/commons/src/main/java/eu/europeana/processing/validation/JobParamValidatorFactory.java similarity index 90% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/JobParamValidatorFactory.java rename to commons/src/main/java/eu/europeana/processing/validation/JobParamValidatorFactory.java index bbdcb74..7fc3b6f 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/JobParamValidatorFactory.java +++ 
b/commons/src/main/java/eu/europeana/processing/validation/JobParamValidatorFactory.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; +import eu.europeana.processing.job.JobName; public class JobParamValidatorFactory { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/MediaJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/MediaJobParamValidator.java similarity index 73% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/MediaJobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/MediaJobParamValidator.java index 5c2c6f1..d83bf42 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/MediaJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/MediaJobParamValidator.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; public class MediaJobParamValidator implements JobParamValidator { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/NormalizationJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/NormalizationJobParamValidator.java similarity index 74% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/NormalizationJobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/NormalizationJobParamValidator.java index 91b98a8..2d06a65 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/NormalizationJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/NormalizationJobParamValidator.java @@ -1,6 +1,6 @@ 
-package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; public class NormalizationJobParamValidator implements JobParamValidator { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/OAIJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/OAIJobParamValidator.java similarity index 79% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/OAIJobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/OAIJobParamValidator.java index 7f8d77e..944a2a5 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/OAIJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/OAIJobParamValidator.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; public class OAIJobParamValidator implements JobParamValidator { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/TransformationJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/TransformationJobParamValidator.java similarity index 83% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/TransformationJobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/TransformationJobParamValidator.java index dc41c5d..8bc87a4 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/TransformationJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/TransformationJobParamValidator.java @@ -1,6 +1,6 @@ -package 
eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; public class TransformationJobParamValidator implements JobParamValidator { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/ValidationJobParamValidator.java b/commons/src/main/java/eu/europeana/processing/validation/ValidationJobParamValidator.java similarity index 77% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/ValidationJobParamValidator.java rename to commons/src/main/java/eu/europeana/processing/validation/ValidationJobParamValidator.java index 44d77a7..3e715d5 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/validation/ValidationJobParamValidator.java +++ b/commons/src/main/java/eu/europeana/processing/validation/ValidationJobParamValidator.java @@ -1,6 +1,6 @@ -package eu.europeana.cloud.tool.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; public class ValidationJobParamValidator implements JobParamValidator { diff --git a/flinkPoCPG/src/test/java/eu/europeana/cloud/repository/RepositoriesIT.java b/commons/src/test/java/eu/europeana/processing/repository/RepositoriesIT.java similarity index 90% rename from flinkPoCPG/src/test/java/eu/europeana/cloud/repository/RepositoriesIT.java rename to commons/src/test/java/eu/europeana/processing/repository/RepositoriesIT.java index 12d3f66..1e97f94 100644 --- a/flinkPoCPG/src/test/java/eu/europeana/cloud/repository/RepositoriesIT.java +++ b/commons/src/test/java/eu/europeana/processing/repository/RepositoriesIT.java @@ -1,18 +1,20 @@ -package eu.europeana.cloud.repository; +package eu.europeana.processing.repository; 
-import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordKey; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.tool.DbConnectionProvider; -import java.io.IOException; -import java.util.Map; + +import eu.europeana.processing.DbConnectionProvider; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordKey; +import eu.europeana.processing.model.ExecutionRecordResult; import org.apache.commons.lang3.RandomStringUtils; import org.apache.flink.api.java.utils.ParameterTool; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Map; + class RepositoriesIT { diff --git a/docker/build-docker-images.sh b/docker/build-docker-images.sh index 11d50b7..c267608 100755 --- a/docker/build-docker-images.sh +++ b/docker/build-docker-images.sh @@ -5,13 +5,9 @@ echo building flink-poc docker image that can be deployed on the openshift clust #images cd flink_java21 echo building :: flink java 21 base $(pwd) -docker build --no-cache -t flink:1.20.0-java21_poc . +docker build --no-cache -t flink:1.18.1-java21_poc . cd .. cd flink-node echo building :: flink-node $(pwd) -docker build --no-cache -t flink-node-poc . -cd .. -cd .. -echo building :: flink-rich-node $(pwd) -docker build --no-cache -t flink-rich-node-poc -f ./docker/flink-node-with-application/Dockerfile . +docker build --no-cache -t metis-processing-engine-flink:1.18.1-java21 . 
diff --git a/docker/flink-node/Dockerfile b/docker/flink-node/Dockerfile index e88f170..a8fcd43 100644 --- a/docker/flink-node/Dockerfile +++ b/docker/flink-node/Dockerfile @@ -1,4 +1,4 @@ -FROM flink:1.20.0-java21_poc +FROM flink:1.18.1-java21_poc MAINTAINER PSNC diff --git a/docker/flink_java21/Dockerfile b/docker/flink_java21/Dockerfile index 47f8a1b..0b35455 100644 --- a/docker/flink_java21/Dockerfile +++ b/docker/flink_java21/Dockerfile @@ -44,9 +44,9 @@ RUN set -ex; \ gosu nobody true # Configure Flink version -ENV FLINK_TGZ_URL=https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz \ - FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz.asc \ - GPG_KEY=B2D64016B940A7E0B9B72E0D7D0528B28037D8BC \ +ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz \ + FLINK_ASC_URL=https://downloads.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz.asc \ + GPG_KEY=96AE0E32CBE6E0753CE6DF6CB078D1D3253A8D82 \ CHECK_GPG=true # Prepare environment @@ -81,11 +81,11 @@ RUN set -ex; \ chown -R flink:flink .; \ \ # Replace default REST/RPC endpoint bind address to use the container's network interface \ - sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' $FLINK_HOME/conf/config.yaml; \ - sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' $FLINK_HOME/conf/config.yaml; \ - sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/config.yaml; \ - sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/config.yaml; \ - sed -i '/taskmanager.host: localhost/d' $FLINK_HOME/conf/config.yaml; + sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \ + sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \ + sed -i 's/jobmanager.bind-host: 
localhost/jobmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \ + sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml; \ + sed -i '/taskmanager.host: localhost/d' $FLINK_HOME/conf/flink-conf.yaml; # Configure container COPY docker-entrypoint.sh / diff --git a/enrichment/pom.xml b/enrichment/pom.xml new file mode 100644 index 0000000..ac339f2 --- /dev/null +++ b/enrichment/pom.xml @@ -0,0 +1,83 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-enrichment + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-enrichment-client + ${metis.version} + + + org.apache.maven.surefire + * + + + org.springframework + * + + + org.springframework.security + * + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + + maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.enrichment.EnrichmentJob + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/enrichment/src/main/java/eu/europeana/processing/enrichment/EnrichmentJob.java b/enrichment/src/main/java/eu/europeana/processing/enrichment/EnrichmentJob.java new file mode 100644 index 0000000..1e35c8f --- /dev/null +++ b/enrichment/src/main/java/eu/europeana/processing/enrichment/EnrichmentJob.java @@ -0,0 +1,29 @@ +package eu.europeana.processing.enrichment; + +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.enrichment.processor.EnrichmentOperator; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +public class EnrichmentJob extends MetisJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(EnrichmentJob.class); + + protected EnrichmentJob(String[] args) { + super(args, JobName.ENRICHMENT); + } + + public static void main(String[] args) throws Exception { + LOGGER.info("Starting {}...", EnrichmentJob.class.getSimpleName()); + new EnrichmentJob(args).execute(); + } + + @Override + public ProcessFunction getMainOperator() { + return new EnrichmentOperator(); + } +} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/EnrichmentOperator.java b/enrichment/src/main/java/eu/europeana/processing/enrichment/processor/EnrichmentOperator.java similarity index 92% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/EnrichmentOperator.java rename to enrichment/src/main/java/eu/europeana/processing/enrichment/processor/EnrichmentOperator.java index 8a538c1..0941597 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/EnrichmentOperator.java +++ b/enrichment/src/main/java/eu/europeana/processing/enrichment/processor/EnrichmentOperator.java @@ -1,14 +1,14 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.enrichment.processor; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; import eu.europeana.enrichment.rest.client.EnrichmentWorker; import eu.europeana.enrichment.rest.client.EnrichmentWorkerImpl; import eu.europeana.enrichment.rest.client.dereference.DereferencerProvider; import eu.europeana.enrichment.rest.client.enrichment.EnricherProvider; import eu.europeana.enrichment.rest.client.report.ProcessedResult; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import 
eu.europeana.processing.model.ExecutionRecordResult; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; diff --git a/flinkPoCPG/pom.xml b/flinkPoCPG/pom.xml deleted file mode 100644 index 55f202c..0000000 --- a/flinkPoCPG/pom.xml +++ /dev/null @@ -1,282 +0,0 @@ - - - 4.0.0 - com.example - flinkPoCPG - 0.0.1-SNAPSHOT - flinkPoCPG - flinkPoCPG - - - 21 - 21 - UTF-8 - 1.20.0 - 13-SNAPSHOT - 2.16.1 - 6.1.4 - 1.14.19 - - - - - libs-release - libs-release - https://artifactory.eanadev.org/artifactory/libs-release - - true - - - false - - - - libs-snapshot - libs-snapshot - https://artifactory.eanadev.org/artifactory/libs-snapshot - - false - - - true - - - - - - - org.apache.flink - flink-streaming-java - ${flink-version} - provided - - - - org.apache.logging.log4j - log4j-slf4j-impl - 2.22.1 - - - - org.apache.flink - flink-clients - provided - ${flink-version} - - - - org.projectlombok - lombok - 1.18.30 - provided - - - - - org.postgresql - postgresql - 42.7.3 - - - - eu.europeana.metis - metis-transformation-service - ${metis.version} - - - - eu.europeana.metis - metis-validation-service - ${metis.version} - - - - eu.europeana.metis - metis-normalization - ${metis.version} - - - org.apache.maven.surefire - * - - - org.springframework - * - - - org.springframework.security - * - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - - eu.europeana.metis - metis-enrichment-client - ${metis.version} - - - org.apache.maven.surefire - * - - - org.springframework - * - - - org.springframework.security - * - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - - eu.europeana.metis - metis-media-service - ${metis.version} - - - org.apache.maven.surefire - * - - - org.springframework - * - - - org.springframework.security - * - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - - eu.europeana.metis - metis-indexing - ${metis.version} - - - 
org.apache.maven.surefire - * - - - org.springframework - * - - - org.springframework.security - * - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - - eu.europeana.metis - metis-harvesting - ${metis.version} - - - com.fasterxml.jackson.core - jackson-core - ${jackson.version} - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - - - io.netty - netty-transport-native-epoll - 4.1.107.Final - - - io.projectreactor.netty - reactor-netty-http - 1.0.43 - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - - eu.europeana.cloud.flink - flink-poc-client - 1-SNAPSHOT - - - byte-buddy - net.bytebuddy - ${version.bytebuddy} - - - byte-buddy-agent - net.bytebuddy - ${version.bytebuddy} - - - com.zaxxer - HikariCP - 5.1.0 - - - - - - - org.springframework - spring-framework-bom - ${version.spring-framework} - pom - import - - - - - - - - maven-assembly-plugin - - - make-assembly - - flink-poc-postgres - - jar-with-dependencies - - false - - - - eu.europeana.cloud.flink.clustertest.ClusterTestJob - - - - - package - - single - - - - - - - diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/enrichment/EnrichmentJobWithPostgresMultiThreadedOperation.java b/flinkPoCPG/src/main/java/eu/europeana/cloud/job/enrichment/EnrichmentJobWithPostgresMultiThreadedOperation.java deleted file mode 100644 index d908b7e..0000000 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/enrichment/EnrichmentJobWithPostgresMultiThreadedOperation.java +++ /dev/null @@ -1,29 +0,0 @@ -package eu.europeana.cloud.job.enrichment; - -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.EnrichmentOperator; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class 
EnrichmentJobWithPostgresMultiThreadedOperation extends MetisJob { - - private static final Logger LOGGER = LoggerFactory.getLogger(EnrichmentJobWithPostgresMultiThreadedOperation.class); - - protected EnrichmentJobWithPostgresMultiThreadedOperation(String[] args) { - super(args, JobName.ENRICHMENT); - } - - public static void main(String[] args) throws Exception { - LOGGER.info("Starting {}...", EnrichmentJobWithPostgresMultiThreadedOperation.class.getSimpleName()); - new EnrichmentJobWithPostgresMultiThreadedOperation(args).execute(); - } - - @Override - public ProcessFunction getMainOperator() { - return new EnrichmentOperator(); - } -} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/indexing/IndexingJobWithPostgresMultiThreadedOperation.java b/flinkPoCPG/src/main/java/eu/europeana/cloud/job/indexing/IndexingJobWithPostgresMultiThreadedOperation.java deleted file mode 100644 index a4ee144..0000000 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/indexing/IndexingJobWithPostgresMultiThreadedOperation.java +++ /dev/null @@ -1,29 +0,0 @@ -package eu.europeana.cloud.job.indexing; - -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.IndexingOperator; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class IndexingJobWithPostgresMultiThreadedOperation extends MetisJob { - - private static final Logger LOGGER = LoggerFactory.getLogger(IndexingJobWithPostgresMultiThreadedOperation.class); - - protected IndexingJobWithPostgresMultiThreadedOperation(String[] args) { - super(args, JobName.INDEXING); - } - - public static void main(String[] args) throws Exception { - LOGGER.info("Starting {}...", IndexingJobWithPostgresMultiThreadedOperation.class.getSimpleName()); - new 
IndexingJobWithPostgresMultiThreadedOperation(args).execute(); - } - - @Override - public ProcessFunction getMainOperator() { - return new IndexingOperator(); - } -} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/media/MediaJobWithPostgresMultiThreadedOperation.java b/flinkPoCPG/src/main/java/eu/europeana/cloud/job/media/MediaJobWithPostgresMultiThreadedOperation.java deleted file mode 100644 index 7c3ab00..0000000 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/media/MediaJobWithPostgresMultiThreadedOperation.java +++ /dev/null @@ -1,29 +0,0 @@ -package eu.europeana.cloud.job.media; - -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.MediaOperator; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MediaJobWithPostgresMultiThreadedOperation extends MetisJob { - - private static final Logger LOGGER = LoggerFactory.getLogger(MediaJobWithPostgresMultiThreadedOperation.class); - - protected MediaJobWithPostgresMultiThreadedOperation(String[] args) { - super(args, JobName.MEDIA); - } - - public static void main(String[] args) throws Exception { - LOGGER.info("Starting {}...", MediaJobWithPostgresMultiThreadedOperation.class.getSimpleName()); - new MediaJobWithPostgresMultiThreadedOperation(args).execute(); - } - - @Override - public ProcessFunction getMainOperator() { - return new MediaOperator(); - } -} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/DbRepository.java b/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/DbRepository.java deleted file mode 100644 index 300bc14..0000000 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/repository/DbRepository.java +++ /dev/null @@ -1,5 +0,0 @@ -package eu.europeana.cloud.repository; - 
-public interface DbRepository { - -} diff --git a/indexing/pom.xml b/indexing/pom.xml new file mode 100644 index 0000000..75e2de0 --- /dev/null +++ b/indexing/pom.xml @@ -0,0 +1,84 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-indexing + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-indexing + ${metis.version} + + + org.apache.maven.surefire + * + + + org.springframework + * + + + org.springframework.security + * + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + + + maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.indexing.IndexingJob + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/indexing/src/main/java/eu/europeana/processing/indexing/IndexingJob.java b/indexing/src/main/java/eu/europeana/processing/indexing/IndexingJob.java new file mode 100644 index 0000000..0f4cdf8 --- /dev/null +++ b/indexing/src/main/java/eu/europeana/processing/indexing/IndexingJob.java @@ -0,0 +1,29 @@ +package eu.europeana.processing.indexing; + +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.indexing.processor.IndexingOperator; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndexingJob extends MetisJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(IndexingJob.class); + + protected IndexingJob(String[] args) { + super(args, JobName.INDEXING); + } + + public static void main(String[] args) throws Exception { + LOGGER.info("Starting 
{}...", IndexingJob.class.getSimpleName()); + new IndexingJob(args).execute(); + } + + @Override + public ProcessFunction getMainOperator() { + return new IndexingOperator(); + } +} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/IndexingOperator.java b/indexing/src/main/java/eu/europeana/processing/indexing/processor/IndexingOperator.java similarity index 88% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/IndexingOperator.java rename to indexing/src/main/java/eu/europeana/processing/indexing/processor/IndexingOperator.java index 732b7e6..492e3c5 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/IndexingOperator.java +++ b/indexing/src/main/java/eu/europeana/processing/indexing/processor/IndexingOperator.java @@ -1,15 +1,14 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.indexing.processor; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; - -import eu.europeana.cloud.tool.IndexingSettingsGenerator; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; import eu.europeana.indexing.Indexer; import eu.europeana.indexing.IndexerFactory; import eu.europeana.indexing.IndexingSettings; import eu.europeana.indexing.exception.IndexingException; +import eu.europeana.processing.indexing.tool.IndexingSettingsGenerator; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/IndexingSettingsGenerator.java 
b/indexing/src/main/java/eu/europeana/processing/indexing/tool/IndexingSettingsGenerator.java similarity index 97% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/tool/IndexingSettingsGenerator.java rename to indexing/src/main/java/eu/europeana/processing/indexing/tool/IndexingSettingsGenerator.java index 0067879..a38c5d1 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/tool/IndexingSettingsGenerator.java +++ b/indexing/src/main/java/eu/europeana/processing/indexing/tool/IndexingSettingsGenerator.java @@ -1,9 +1,10 @@ -package eu.europeana.cloud.tool; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +package eu.europeana.processing.indexing.tool; + import eu.europeana.indexing.IndexingSettings; import eu.europeana.indexing.exception.IndexingException; import eu.europeana.indexing.exception.SetupRelatedIndexingException; +import eu.europeana.processing.job.JobParamName; import org.apache.flink.api.java.utils.ParameterTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/media/pom.xml b/media/pom.xml new file mode 100644 index 0000000..e551b24 --- /dev/null +++ b/media/pom.xml @@ -0,0 +1,82 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-media + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-media-service + ${metis.version} + + + org.apache.maven.surefire + * + + + org.springframework + * + + + org.springframework.security + * + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + + maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.media.MediaJob + + + + + package + + single + + + + + + + + diff --git a/media/src/main/java/eu/europeana/processing/media/MediaJob.java 
b/media/src/main/java/eu/europeana/processing/media/MediaJob.java new file mode 100644 index 0000000..204ab7d --- /dev/null +++ b/media/src/main/java/eu/europeana/processing/media/MediaJob.java @@ -0,0 +1,31 @@ +package eu.europeana.processing.media; + + +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.media.processor.MediaOperator; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MediaJob extends MetisJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(MediaJob.class); + + protected MediaJob(String[] args) { + super(args, JobName.MEDIA); + } + + public static void main(String[] args) throws Exception { + LOGGER.info("Starting {}...", MediaJob.class.getSimpleName()); + new MediaJob(args).execute(); + } + + @Override + public ProcessFunction getMainOperator() { + return new MediaOperator(); + } + +} diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/MediaOperator.java b/media/src/main/java/eu/europeana/processing/media/processor/MediaOperator.java similarity index 91% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/MediaOperator.java rename to media/src/main/java/eu/europeana/processing/media/processor/MediaOperator.java index 81179da..8ef111a 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/MediaOperator.java +++ b/media/src/main/java/eu/europeana/processing/media/processor/MediaOperator.java @@ -1,10 +1,10 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.media.processor; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import 
eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.metis.mediaprocessing.*; +import eu.europeana.metis.mediaprocessing.MediaExtractor; +import eu.europeana.metis.mediaprocessing.MediaProcessorFactory; +import eu.europeana.metis.mediaprocessing.RdfConverterFactory; +import eu.europeana.metis.mediaprocessing.RdfDeserializer; +import eu.europeana.metis.mediaprocessing.RdfSerializer; import eu.europeana.metis.mediaprocessing.exception.MediaExtractionException; import eu.europeana.metis.mediaprocessing.exception.RdfDeserializationException; import eu.europeana.metis.mediaprocessing.exception.RdfSerializationException; @@ -12,6 +12,10 @@ import eu.europeana.metis.mediaprocessing.model.RdfResourceEntry; import eu.europeana.metis.mediaprocessing.model.ResourceExtractionResult; import eu.europeana.metis.mediaprocessing.model.Thumbnail; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; diff --git a/normalization/pom.xml b/normalization/pom.xml new file mode 100644 index 0000000..e48826c --- /dev/null +++ b/normalization/pom.xml @@ -0,0 +1,90 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-normalization + + + 21 + 21 + UTF-8 + + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-normalization + ${metis.version} + + + org.apache.maven.surefire + * + + + org.springframework + * + + + org.springframework.security + * + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + + + 
maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.normalization.NormalizationJob + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/normalization/NormalizationJobWithPostgresMultiThreadedOperation.java b/normalization/src/main/java/eu/europeana/processing/normalization/NormalizationJob.java similarity index 55% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/job/normalization/NormalizationJobWithPostgresMultiThreadedOperation.java rename to normalization/src/main/java/eu/europeana/processing/normalization/NormalizationJob.java index dfb64ec..00d211c 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/normalization/NormalizationJobWithPostgresMultiThreadedOperation.java +++ b/normalization/src/main/java/eu/europeana/processing/normalization/NormalizationJob.java @@ -1,10 +1,10 @@ -package eu.europeana.cloud.job.normalization; +package eu.europeana.processing.normalization; -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.NormalizationOperator; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.normalization.processor.NormalizationOperator; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,17 +22,17 @@ *
    --datasource.password admin
*

*/ -public class NormalizationJobWithPostgresMultiThreadedOperation extends MetisJob { +public class NormalizationJob extends MetisJob { - private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationJobWithPostgresMultiThreadedOperation.class); + private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationJob.class); - protected NormalizationJobWithPostgresMultiThreadedOperation(String[] args) { + protected NormalizationJob(String[] args) { super(args, JobName.NORMALIZATION); } public static void main(String[] args) throws Exception { - LOGGER.info("Starting {}...", NormalizationJobWithPostgresMultiThreadedOperation.class.getSimpleName()); - new NormalizationJobWithPostgresMultiThreadedOperation(args).execute(); + LOGGER.info("Starting {}...", NormalizationJob.class.getSimpleName()); + new NormalizationJob(args).execute(); } @Override diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/NormalizationOperator.java b/normalization/src/main/java/eu/europeana/processing/normalization/processor/NormalizationOperator.java similarity index 89% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/NormalizationOperator.java rename to normalization/src/main/java/eu/europeana/processing/normalization/processor/NormalizationOperator.java index 6d7f43a..ffbde35 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/NormalizationOperator.java +++ b/normalization/src/main/java/eu/europeana/processing/normalization/processor/NormalizationOperator.java @@ -1,12 +1,12 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.normalization.processor; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; import eu.europeana.normalization.Normalizer; import eu.europeana.normalization.NormalizerFactory; import 
eu.europeana.normalization.model.NormalizationResult; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; diff --git a/flinkPoCPG/src/test/java/eu/europeana/cloud/operator/NormalizerFactoryTest.java b/normalization/src/test/java/eu/europeana/cloud/operator/NormalizerFactoryTest.java similarity index 99% rename from flinkPoCPG/src/test/java/eu/europeana/cloud/operator/NormalizerFactoryTest.java rename to normalization/src/test/java/eu/europeana/cloud/operator/NormalizerFactoryTest.java index e2bf0df..fa2ebb5 100644 --- a/flinkPoCPG/src/test/java/eu/europeana/cloud/operator/NormalizerFactoryTest.java +++ b/normalization/src/test/java/eu/europeana/cloud/operator/NormalizerFactoryTest.java @@ -5,15 +5,16 @@ import eu.europeana.normalization.model.NormalizationResult; import eu.europeana.normalization.util.NormalizationConfigurationException; import eu.europeana.normalization.util.NormalizationException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.time.StopWatch; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** * Test for normalization factory, the test could be copied to spring batch project, batch-normalization module to compare * behaviour on the spring batch POC library set. 
There were some small differences in result xml, printed in testNormalization() diff --git a/oai/pom.xml b/oai/pom.xml new file mode 100644 index 0000000..66a620a --- /dev/null +++ b/oai/pom.xml @@ -0,0 +1,85 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-oai + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-transformation-service + ${metis.version} + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.22.1 + + + + org.projectlombok + lombok + 1.18.30 + provided + + + + eu.europeana.metis + metis-harvesting + ${metis.version} + + + + + + + + maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.oai.OAIJob + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/oai/OAIJob.java b/oai/src/main/java/eu/europeana/processing/oai/OAIJob.java similarity index 76% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/job/oai/OAIJob.java rename to oai/src/main/java/eu/europeana/processing/oai/OAIJob.java index dddd803..ab9054e 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/oai/OAIJob.java +++ b/oai/src/main/java/eu/europeana/processing/oai/OAIJob.java @@ -1,16 +1,15 @@ -package eu.europeana.cloud.job.oai; - -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.DeletedRecordFilter; -import eu.europeana.cloud.operator.IdAssigningOperator; -import eu.europeana.cloud.operator.RecordHarvestingOperator; -import eu.europeana.cloud.source.oai.OAIHeadersSource; - -import eu.europeana.cloud.sink.DbSinkFunction; -import 
eu.europeana.cloud.flink.client.constants.postgres.JobName; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; +package eu.europeana.processing.oai; + +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.oai.processor.DeletedRecordFilter; +import eu.europeana.processing.oai.processor.IdAssigningOperator; +import eu.europeana.processing.oai.processor.RecordHarvestingOperator; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.sink.DbSinkFunction; +import eu.europeana.processing.oai.reader.OAIHeadersSource; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/DeletedRecordFilter.java b/oai/src/main/java/eu/europeana/processing/oai/processor/DeletedRecordFilter.java similarity index 87% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/DeletedRecordFilter.java rename to oai/src/main/java/eu/europeana/processing/oai/processor/DeletedRecordFilter.java index 763d31c..26dd18c 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/DeletedRecordFilter.java +++ b/oai/src/main/java/eu/europeana/processing/oai/processor/DeletedRecordFilter.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.oai.processor; import eu.europeana.metis.harvesting.oaipmh.OaiRecordHeader; import org.apache.flink.api.common.functions.FilterFunction; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/IdAssigningOperator.java b/oai/src/main/java/eu/europeana/processing/oai/processor/IdAssigningOperator.java similarity index 92% rename from 
flinkPoCPG/src/main/java/eu/europeana/cloud/operator/IdAssigningOperator.java rename to oai/src/main/java/eu/europeana/processing/oai/processor/IdAssigningOperator.java index 8ffe950..67fa253 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/IdAssigningOperator.java +++ b/oai/src/main/java/eu/europeana/processing/oai/processor/IdAssigningOperator.java @@ -1,11 +1,11 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.oai.processor; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordKey; -import eu.europeana.cloud.model.ExecutionRecordResult; import eu.europeana.metis.transformation.service.EuropeanaGeneratedIdsMap; import eu.europeana.metis.transformation.service.EuropeanaIdCreator; import eu.europeana.metis.transformation.service.EuropeanaIdException; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordKey; +import eu.europeana.processing.model.ExecutionRecordResult; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/RecordHarvestingOperator.java b/oai/src/main/java/eu/europeana/processing/oai/processor/RecordHarvestingOperator.java similarity index 81% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/RecordHarvestingOperator.java rename to oai/src/main/java/eu/europeana/processing/oai/processor/RecordHarvestingOperator.java index 49092e9..b5e3b4a 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/RecordHarvestingOperator.java +++ b/oai/src/main/java/eu/europeana/processing/oai/processor/RecordHarvestingOperator.java @@ -1,24 +1,22 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.oai.processor; -import static eu.europeana.cloud.flink.client.constants.postgres.JobName.OAI_HARVEST; -import static 
eu.europeana.cloud.flink.client.constants.postgres.JobParamName.DATASET_ID; -import static eu.europeana.cloud.flink.client.constants.postgres.JobParamName.METADATA_PREFIX; -import static eu.europeana.cloud.flink.client.constants.postgres.JobParamName.OAI_REPOSITORY_URL; +import static eu.europeana.processing.job.JobName.OAI_HARVEST; +import static eu.europeana.processing.job.JobParamName.DATASET_ID; +import static eu.europeana.processing.job.JobParamName.METADATA_PREFIX; +import static eu.europeana.processing.job.JobParamName.OAI_REPOSITORY_URL; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecord.ExecutionRecordBuilder; -import eu.europeana.cloud.model.ExecutionRecordKey; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.model.ExecutionRecordResult.ExecutionRecordResultBuilder; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; import eu.europeana.metis.harvesting.HarvesterFactory; import eu.europeana.metis.harvesting.oaipmh.OaiHarvester; import eu.europeana.metis.harvesting.oaipmh.OaiRecord; import eu.europeana.metis.harvesting.oaipmh.OaiRecordHeader; import eu.europeana.metis.harvesting.oaipmh.OaiRepository; -import java.nio.charset.StandardCharsets; -import java.time.Instant; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecord.ExecutionRecordBuilder; +import eu.europeana.processing.model.ExecutionRecordKey; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.model.ExecutionRecordResult.ExecutionRecordResultBuilder; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; @@ -27,6 +25,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; +import 
java.time.Instant; + + public class RecordHarvestingOperator extends ProcessFunction { private static final Logger LOGGER = LoggerFactory.getLogger(RecordHarvestingOperator.class); diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIEnumeratorState.java b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIEnumeratorState.java similarity index 90% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIEnumeratorState.java rename to oai/src/main/java/eu/europeana/processing/oai/reader/OAIEnumeratorState.java index 80eb10b..593e17b 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIEnumeratorState.java +++ b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIEnumeratorState.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.source.oai; +package eu.europeana.processing.oai.reader; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIEnumeratorStateSerializer.java b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIEnumeratorStateSerializer.java similarity index 96% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIEnumeratorStateSerializer.java rename to oai/src/main/java/eu/europeana/processing/oai/reader/OAIEnumeratorStateSerializer.java index 164d8bb..741db81 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIEnumeratorStateSerializer.java +++ b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIEnumeratorStateSerializer.java @@ -1,11 +1,12 @@ -package eu.europeana.cloud.source.oai; +package eu.europeana.processing.oai.reader; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + public class OAIEnumeratorStateSerializer implements SimpleVersionedSerializer { private static 
final Logger LOGGER = LoggerFactory.getLogger(OAIEnumeratorStateSerializer.class); diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersReader.java b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersReader.java similarity index 91% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersReader.java rename to oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersReader.java index 27ad2b8..9eb1b9e 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersReader.java +++ b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersReader.java @@ -1,8 +1,4 @@ -package eu.europeana.cloud.source.oai; - -import static eu.europeana.cloud.flink.client.constants.postgres.JobParamName.METADATA_PREFIX; -import static eu.europeana.cloud.flink.client.constants.postgres.JobParamName.OAI_REPOSITORY_URL; -import static eu.europeana.cloud.flink.client.constants.postgres.JobParamName.SET_SPEC; +package eu.europeana.processing.oai.reader; import eu.europeana.metis.harvesting.HarvesterFactory; import eu.europeana.metis.harvesting.HarvestingIterator; @@ -10,8 +6,6 @@ import eu.europeana.metis.harvesting.oaipmh.OaiHarvest; import eu.europeana.metis.harvesting.oaipmh.OaiHarvester; import eu.europeana.metis.harvesting.oaipmh.OaiRecordHeader; -import java.util.List; -import java.util.concurrent.CompletableFuture; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -20,6 +14,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static eu.europeana.processing.job.JobParamName.METADATA_PREFIX; +import static eu.europeana.processing.job.JobParamName.OAI_REPOSITORY_URL; +import static eu.europeana.processing.job.JobParamName.SET_SPEC; + public class OAIHeadersReader implements 
SourceReader { private static final Logger LOGGER = LoggerFactory.getLogger(OAIHeadersReader.class); diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersSource.java b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersSource.java similarity index 98% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersSource.java rename to oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersSource.java index c359e0a..639a3a4 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersSource.java +++ b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersSource.java @@ -1,7 +1,6 @@ -package eu.europeana.cloud.source.oai; +package eu.europeana.processing.oai.reader; import eu.europeana.metis.harvesting.oaipmh.OaiRecordHeader; -import java.io.IOException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -13,10 +12,11 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.core.io.SimpleVersionedSerializer; +import java.io.IOException; + public class OAIHeadersSource implements Source, ResultTypeQueryable { - private final ParameterTool parameterTool; public OAIHeadersSource(ParameterTool parameterTool) { diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersSplitEnumerator.java b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersSplitEnumerator.java similarity index 92% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersSplitEnumerator.java rename to oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersSplitEnumerator.java index bfb6a21..df94abb 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAIHeadersSplitEnumerator.java +++ b/oai/src/main/java/eu/europeana/processing/oai/reader/OAIHeadersSplitEnumerator.java @@ -1,14 +1,14 @@ -package 
eu.europeana.cloud.source.oai; +package eu.europeana.processing.oai.reader; -import java.io.IOException; -import java.util.List; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; + public class OAIHeadersSplitEnumerator implements SplitEnumerator { private static final Logger LOGGER = LoggerFactory.getLogger(OAIHeadersSplitEnumerator.class); @@ -27,7 +27,7 @@ public void start() { } @Override - public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + public void handleSplitRequest(int subtaskId, String requesterHostname) { LOGGER.info("Handling split request, subtaskId: {}, host: {}", subtaskId, requesterHostname); if (!state.isSplitAssigned()) { context.assignSplit(new OAISplit(), subtaskId); diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAISplit.java b/oai/src/main/java/eu/europeana/processing/oai/reader/OAISplit.java similarity index 79% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAISplit.java rename to oai/src/main/java/eu/europeana/processing/oai/reader/OAISplit.java index 0c49d7d..9abad63 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/source/oai/OAISplit.java +++ b/oai/src/main/java/eu/europeana/processing/oai/reader/OAISplit.java @@ -1,4 +1,4 @@ -package eu.europeana.cloud.source.oai; +package eu.europeana.processing.oai.reader; import org.apache.flink.api.connector.source.SourceSplit; diff --git a/flinkPoCPG/src/main/resources/log4j2.xml b/oai/src/main/resources/log4j2.xml similarity index 100% rename from flinkPoCPG/src/main/resources/log4j2.xml rename to oai/src/main/resources/log4j2.xml diff --git a/paas/jobmanager-session-deployment.yaml b/paas/jobmanager-session-deployment.yaml 
index 9f8678e..4c75c03 100644 --- a/paas/jobmanager-session-deployment.yaml +++ b/paas/jobmanager-session-deployment.yaml @@ -19,7 +19,7 @@ spec: - name: image-registry-pull containers: - name: jobmanager - image: registry.paas.psnc.pl/ecloud-poc/flink-rich-node-poc:latest + image: registry.paas.psnc.pl/ecloud-poc/metis-processing-engine-flink:1.20-java21 imagePullPolicy: Always args: ["jobmanager"] ports: diff --git a/paas/taskmanager-session-deployment.yaml b/paas/taskmanager-session-deployment.yaml index 67bab12..83f0008 100644 --- a/paas/taskmanager-session-deployment.yaml +++ b/paas/taskmanager-session-deployment.yaml @@ -19,7 +19,7 @@ spec: - name: image-registry-pull containers: - name: taskmanager - image: registry.paas.psnc.pl/ecloud-poc/flink-rich-node-poc:latest + image: registry.paas.psnc.pl/ecloud-poc/metis-processing-engine-flink:1.20-java21 imagePullPolicy: Always args: ["taskmanager"] ports: diff --git a/pom.xml b/pom.xml index 47a401a..c153676 100644 --- a/pom.xml +++ b/pom.xml @@ -2,20 +2,59 @@ - 4.0.0 + 4.0.0 - org.example - flinkPoC - pom - 1.0-SNAPSHOT + eu.europeana.metis-processing-engine + metis-processing-engine-parent + pom + 1.0-SNAPSHOT - - 21 - 21 - UTF-8 - - - flink-poc-client - flinkPoCPG - + + 21 + 21 + UTF-8 + + 1.20.0 + 1.14.19 + 13-SNAPSHOT + + 2.16.1 + + + + commons + media + client + validation + oai + transformation + normalization + enrichment + indexing + + + + + libs-release + libs-release + https://artifactory.eanadev.org/artifactory/libs-release + + true + + + false + + + + libs-snapshot + libs-snapshot + https://artifactory.eanadev.org/artifactory/libs-snapshot + + false + + + true + + + diff --git a/readme.md b/readme.md index 7dacdf3..1a89905 100644 --- a/readme.md +++ b/readme.md @@ -18,6 +18,10 @@ Info about running the POC on the openshift cluster: 2.3. flink-node-with-application - flin-node image with application contained in it. To only update application code only the last image need to be built. 
+``docker tag metis-processing-engine-flink:1.20-java21 registry.paas.psnc.pl/ecloud-poc/metis-processing-engine-flink:1.20-java21`` + +``docker push registry.paas.psnc.pl/ecloud-poc/metis-processing-engine-flink:1.20-java21`` + 3. Deploying on the openshift server: 3.1. paas/update-jobs-config.sh - copy config files from the config directory to the openshift secrets 3.2. paas/apache/deploy-apache.sh -deploys apache server used for web dashboard on a openshift cluster it needs to have file: config/apache/.htpasswd generated diff --git a/transformation/pom.xml b/transformation/pom.xml new file mode 100644 index 0000000..853a7ea --- /dev/null +++ b/transformation/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-transformation + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-transformation-service + ${metis.version} + + + + + + + + maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.transformation.TransformationJob + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/transformation/TransformationJobWithPostgresMultiThreadedOperation.java b/transformation/src/main/java/eu/europeana/processing/transformation/TransformationJob.java similarity index 63% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/job/transformation/TransformationJobWithPostgresMultiThreadedOperation.java rename to transformation/src/main/java/eu/europeana/processing/transformation/TransformationJob.java index 019f049..402e37d 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/transformation/TransformationJobWithPostgresMultiThreadedOperation.java +++ 
b/transformation/src/main/java/eu/europeana/processing/transformation/TransformationJob.java @@ -1,10 +1,11 @@ -package eu.europeana.cloud.job.transformation; +package eu.europeana.processing.transformation; -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.TransformationOperator; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; + +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.transformation.processor.TransformationOperator; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,17 +30,17 @@ *

* */ -public class TransformationJobWithPostgresMultiThreadedOperation extends MetisJob { +public class TransformationJob extends MetisJob { - private static final Logger LOGGER = LoggerFactory.getLogger(TransformationJobWithPostgresMultiThreadedOperation.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TransformationJob.class); - protected TransformationJobWithPostgresMultiThreadedOperation(String[] args) { + protected TransformationJob(String[] args) { super(args, JobName.TRANSFORMATION); } public static void main(String[] args) throws Exception { - LOGGER.info("Starting {}...", TransformationJobWithPostgresMultiThreadedOperation.class.getSimpleName()); - new TransformationJobWithPostgresMultiThreadedOperation(args).execute(); + LOGGER.info("Starting {}...", TransformationJob.class.getSimpleName()); + new TransformationJob(args).execute(); } @Override diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/TransformationOperator.java b/transformation/src/main/java/eu/europeana/processing/transformation/processor/TransformationOperator.java similarity index 84% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/TransformationOperator.java rename to transformation/src/main/java/eu/europeana/processing/transformation/processor/TransformationOperator.java index b0dd3f0..21aba80 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/TransformationOperator.java +++ b/transformation/src/main/java/eu/europeana/processing/transformation/processor/TransformationOperator.java @@ -1,10 +1,14 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.transformation.processor; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.flink.client.constants.postgres.JobName; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.metis.transformation.service.*; +import 
eu.europeana.metis.transformation.service.EuropeanaGeneratedIdsMap; +import eu.europeana.metis.transformation.service.EuropeanaIdCreator; +import eu.europeana.metis.transformation.service.EuropeanaIdException; +import eu.europeana.metis.transformation.service.TransformationException; +import eu.europeana.metis.transformation.service.XsltTransformer; +import eu.europeana.processing.job.JobName; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; diff --git a/validation/pom.xml b/validation/pom.xml new file mode 100644 index 0000000..156c148 --- /dev/null +++ b/validation/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + eu.europeana.metis-processing-engine + metis-processing-engine-parent + 1.0-SNAPSHOT + + + metis-processing-engine-validation + + + + org.apache.flink + flink-streaming-java + ${flink-version} + provided + + + + eu.europeana.metis-processing-engine + metis-processing-engine-commons + 1.0-SNAPSHOT + + + + eu.europeana.metis + metis-transformation-service + ${metis.version} + + + + eu.europeana.metis + metis-validation-service + ${metis.version} + + + + + + + + maven-assembly-plugin + + + make-assembly + + + jar-with-dependencies + + false + + + + eu.europeana.processing.validation.ValidationJob + + + + + package + + single + + + + + + + + \ No newline at end of file diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/validation/ValidationJobWithPostgresMultiThreadedOperation.java b/validation/src/main/java/eu/europeana/processing/validation/ValidationJob.java similarity index 77% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/job/validation/ValidationJobWithPostgresMultiThreadedOperation.java rename to validation/src/main/java/eu/europeana/processing/validation/ValidationJob.java 
index 876b6a6..a1dd757 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/job/validation/ValidationJobWithPostgresMultiThreadedOperation.java +++ b/validation/src/main/java/eu/europeana/processing/validation/ValidationJob.java @@ -1,10 +1,10 @@ -package eu.europeana.cloud.job.validation; +package eu.europeana.processing.validation; -import eu.europeana.cloud.common.MetisJob; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.operator.ValidationOperator; +import eu.europeana.processing.MetisJob; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; +import eu.europeana.processing.validation.processor.ValidationOperator; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.slf4j.Logger; @@ -33,8 +33,8 @@ * * *

Progress tracking:

- *

In this implementation progress tracking were moved to {@link eu.europeana.cloud.source.DbReaderWithProgressHandling} - * During job startup {@link eu.europeana.cloud.source.DbEnumerator} + *

In this implementation progress tracking was moved to {@link eu.europeana.processing.source.DbReaderWithProgressHandling}. + * During job startup {@link eu.europeana.processing.source.DbEnumerator} * will read the status from the DB and resume the job from the first chunk that was not fully processed. * In this approach it is possible that some records will be reprocessed, but in the worse case there will be * Chunk_size records that have to be reprocessed. @@ -59,17 +59,17 @@ *

    --datasource.password admin
*

*/ -public class ValidationJobWithPostgresMultiThreadedOperation extends MetisJob { +public class ValidationJob extends MetisJob { - private static final Logger LOGGER = LoggerFactory.getLogger(ValidationJobWithPostgresMultiThreadedOperation.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ValidationJob.class); - protected ValidationJobWithPostgresMultiThreadedOperation(String[] args) { + protected ValidationJob(String[] args) { super(args, ParameterTool.fromArgs(args).getRequired(JobParamName.VALIDATION_TYPE)); } public static void main(String[] args) throws Exception { - LOGGER.info("Starting {}...", ValidationJobWithPostgresMultiThreadedOperation.class.getSimpleName()); - new ValidationJobWithPostgresMultiThreadedOperation(args).execute(); + LOGGER.info("Starting {}...", ValidationJob.class.getSimpleName()); + new ValidationJob(args).execute(); } @Override diff --git a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/ValidationOperator.java b/validation/src/main/java/eu/europeana/processing/validation/processor/ValidationOperator.java similarity index 95% rename from flinkPoCPG/src/main/java/eu/europeana/cloud/operator/ValidationOperator.java rename to validation/src/main/java/eu/europeana/processing/validation/processor/ValidationOperator.java index 9a68164..64167ca 100644 --- a/flinkPoCPG/src/main/java/eu/europeana/cloud/operator/ValidationOperator.java +++ b/validation/src/main/java/eu/europeana/processing/validation/processor/ValidationOperator.java @@ -1,11 +1,11 @@ -package eu.europeana.cloud.operator; +package eu.europeana.processing.validation.processor; -import eu.europeana.cloud.model.ExecutionRecord; -import eu.europeana.cloud.model.ExecutionRecordResult; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamName; -import eu.europeana.cloud.flink.client.constants.postgres.JobParamValue; import eu.europeana.metis.transformation.service.TransformationException; import 
eu.europeana.metis.transformation.service.XsltTransformer; +import eu.europeana.processing.job.JobParamName; +import eu.europeana.processing.job.JobParamValue; +import eu.europeana.processing.model.ExecutionRecord; +import eu.europeana.processing.model.ExecutionRecordResult; import eu.europeana.validation.model.ValidationResult; import eu.europeana.validation.service.ValidationExecutionService; import org.apache.flink.api.java.utils.ParameterTool;