Skip to content

Commit

Permalink
Merge pull request #3 from europeana/feat/MET-6224-modules-refactor
Browse files Browse the repository at this point in the history
Feat/met 6224 modules refactor
  • Loading branch information
pWoz authored Dec 13, 2024
2 parents aac6a06 + 7562d5d commit be9fe11
Show file tree
Hide file tree
Showing 90 changed files with 1,125 additions and 671 deletions.
18 changes: 18 additions & 0 deletions .run/OAIJob.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="OAIJob" type="Application" factoryName="Application" nameIsGenerated="true">
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="eu.europeana.processing.oai.OAIJob" />
<module name="metis-processing-engine-oai" />
<option name="PROGRAM_PARAMETERS" value="--datasetId 101 --datasource.url jdbc:postgresql://localhost:5432/batch-framework --datasource.username admin --datasource.password admin --PARALLELISM 100 --oaiRepositoryUrl http://panic.image.ntua.gr:9000/efg/oai --metadataPrefix rdf --setSpec 1076" />
<option name="VM_PARAMETERS" value="--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="eu.europeana.processing.oai.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
18 changes: 18 additions & 0 deletions .run/ValidationJob.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ValidationJob" type="Application" factoryName="Application">
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="eu.europeana.processing.validation.ValidationJob" />
<module name="metis-processing-engine-validation" />
<option name="PROGRAM_PARAMETERS" value="--datasetId 12 --executionId 1755776870684870440 --validationType VALIDATION_INTERNAL --chunkSize 12 --datasource.url jdbc:postgresql://localhost:5432/spring-batch-metis-poc --datasource.username admin --datasource.password admin" />
<option name="VM_PARAMETERS" value="--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="eu.europeana.metis.processing.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
13 changes: 6 additions & 7 deletions flink-poc-client/pom.xml → client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>eu.europeana.cloud.flink</groupId>
<artifactId>flink-poc-client</artifactId>
<version>1-SNAPSHOT</version>

<artifactId>metis-processing-engine-client</artifactId>

<parent>
<groupId>eu.europeana.metis-processing-engine</groupId>
<artifactId>metis-processing-engine-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.spring-framework>6.1.4</version.spring-framework>
</properties>

Expand Down
71 changes: 71 additions & 0 deletions commons/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>eu.europeana.metis-processing-engine</groupId>
<artifactId>metis-processing-engine-parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>metis-processing-engine-commons</artifactId>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink-version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.1.0</version>
</dependency>

<dependency>
<artifactId>byte-buddy</artifactId>
<groupId>net.bytebuddy</groupId>
<version>${version.bytebuddy}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.22.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.2</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package eu.europeana.cloud.tool;
package eu.europeana.processing;


import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import eu.europeana.cloud.flink.client.constants.postgres.JobParamName;
import java.sql.SQLException;
import eu.europeana.processing.job.JobParamName;
import org.apache.flink.api.java.utils.ParameterTool;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;

public class DbConnectionProvider implements Serializable, AutoCloseable {

Expand All @@ -17,6 +17,7 @@ public class DbConnectionProvider implements Serializable, AutoCloseable {

public DbConnectionProvider(ParameterTool parameterTool) {
HikariConfig config=new HikariConfig();
config.setDriverClassName("org.postgresql.Driver");
config.setJdbcUrl(parameterTool.getRequired(JobParamName.DATASOURCE_URL));
config.setUsername(parameterTool.get(JobParamName.DATASOURCE_USERNAME));
config.setPassword(parameterTool.get(JobParamName.DATASOURCE_PASSWORD));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package eu.europeana.cloud.common;

import eu.europeana.cloud.flink.client.constants.postgres.JobParam;
import eu.europeana.cloud.flink.client.constants.postgres.JobParamName;
import eu.europeana.cloud.model.ExecutionRecord;
import eu.europeana.cloud.model.ExecutionRecordResult;
import eu.europeana.cloud.model.TaskInfo;
import eu.europeana.cloud.repository.TaskInfoRepository;
import eu.europeana.cloud.retryable.RetryableMethodExecutor;
import eu.europeana.cloud.sink.DbSinkFunction;
import eu.europeana.cloud.source.DbSourceWithProgressHandling;
import eu.europeana.cloud.tool.DbConnectionProvider;
import eu.europeana.cloud.tool.validation.JobParamValidatorFactory;
package eu.europeana.processing;

import eu.europeana.processing.job.JobParam;
import eu.europeana.processing.job.JobParamName;
import eu.europeana.processing.model.ExecutionRecord;
import eu.europeana.processing.model.ExecutionRecordResult;
import eu.europeana.processing.model.TaskInfo;
import eu.europeana.processing.repository.TaskInfoRepository;
import eu.europeana.processing.retryable.RetryableMethodExecutor;
import eu.europeana.processing.sink.DbSinkFunction;
import eu.europeana.processing.source.DbSourceWithProgressHandling;
import eu.europeana.processing.validation.JobParamValidatorFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.flink.client.constants.postgres;
package eu.europeana.processing.job;

public class JobName {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.flink.client.constants.postgres;
package eu.europeana.processing.job;

public class JobParam {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.flink.client.constants.postgres;
package eu.europeana.processing.job;

public class JobParamName {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.flink.client.constants.postgres;
package eu.europeana.processing.job;

public class JobParamValue {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.model;
package eu.europeana.processing.model;

import org.apache.flink.api.connector.source.SourceSplit;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.model;
package eu.europeana.processing.model;

import lombok.Builder;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.model;
package eu.europeana.processing.model;

import lombok.Builder;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.model;
package eu.europeana.processing.model;

import lombok.Builder;
import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.model;
package eu.europeana.processing.model;

public record TaskInfo(long taskId, long commitCount, long writeCount) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package eu.europeana.processing.repository;

public interface DbRepository {

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package eu.europeana.cloud.repository;
package eu.europeana.processing.repository;

import eu.europeana.cloud.model.ExecutionRecord;
import eu.europeana.cloud.model.ExecutionRecordResult;
import eu.europeana.cloud.retryable.Retryable;
import eu.europeana.cloud.tool.DbConnectionProvider;
import eu.europeana.processing.DbConnectionProvider;
import eu.europeana.processing.retryable.Retryable;
import eu.europeana.processing.model.ExecutionRecord;
import eu.europeana.processing.model.ExecutionRecordResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Retryable(delay = 5000, maxAttempts = 5)
public class ExecutionRecordExceptionLogRepository implements DbRepository, Serializable {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package eu.europeana.cloud.repository;
package eu.europeana.processing.repository;

import eu.europeana.cloud.model.ExecutionRecord;
import eu.europeana.cloud.model.ExecutionRecordKey;
import eu.europeana.cloud.model.ExecutionRecordResult;
import eu.europeana.cloud.retryable.Retryable;
import eu.europeana.cloud.tool.DbConnectionProvider;
import eu.europeana.processing.DbConnectionProvider;
import eu.europeana.processing.model.ExecutionRecord;
import eu.europeana.processing.model.ExecutionRecordKey;
import eu.europeana.processing.model.ExecutionRecordResult;
import eu.europeana.processing.retryable.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -14,8 +16,6 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Retryable(delay = 5000, maxAttempts = 5)
public class ExecutionRecordRepository implements DbRepository, Serializable {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package eu.europeana.cloud.repository;
package eu.europeana.processing.repository;

import eu.europeana.cloud.model.TaskInfo;
import eu.europeana.cloud.retryable.Retryable;
import eu.europeana.cloud.tool.DbConnectionProvider;
import eu.europeana.processing.DbConnectionProvider;
import eu.europeana.processing.model.TaskInfo;
import eu.europeana.processing.retryable.Retryable;

import java.io.Serializable;
import java.sql.Connection;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.retryable;
package eu.europeana.processing.retryable;

public class RetryInterruptedException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.retryable;
package eu.europeana.processing.retryable;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package eu.europeana.cloud.retryable;
package eu.europeana.processing.retryable;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.DynamicType.Unloaded;
import net.bytebuddy.implementation.InvocationHandlerAdapter;
import net.bytebuddy.matcher.ElementMatchers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;

//TODO This class was copied from the eCloud, we should dependencies and share the code if we decide to
//use Flink on production
public class RetryableMethodExecutor {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.europeana.cloud.retryable;
package eu.europeana.processing.retryable;

public class RetryableProxyCreateException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package eu.europeana.cloud.sink;
package eu.europeana.processing.sink;

import eu.europeana.cloud.model.ExecutionRecordResult;
import eu.europeana.cloud.repository.ExecutionRecordExceptionLogRepository;
import eu.europeana.cloud.repository.ExecutionRecordRepository;
import eu.europeana.cloud.retryable.RetryableMethodExecutor;
import eu.europeana.cloud.tool.DbConnectionProvider;
import eu.europeana.processing.DbConnectionProvider;
import eu.europeana.processing.model.ExecutionRecordResult;
import eu.europeana.processing.repository.ExecutionRecordExceptionLogRepository;
import eu.europeana.processing.repository.ExecutionRecordRepository;
import eu.europeana.processing.retryable.RetryableMethodExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
Expand Down
Loading

0 comments on commit be9fe11

Please sign in to comment.