Skip to content

Commit

Permalink
Deduplication Job (#6)
Browse files Browse the repository at this point in the history
Deduplication job will run as a Spring application.
It'll be triggered every 5 min (configurable) and will do the following:

Get state of last run
Check if there is new data to be deduplicated
Check if there are duplicates in new data
Deduplicate
Save new state
Changes:

Adds metrics to track deduplication job
Adds new column 'dedupe' to transactions table
Adds integration test
Adds create-tables.sh script to create all tables needed by hedera-etl
Rename hedera-etl-dataflow to hedera-etl-bigquery

Signed-off-by: Apekshit Sharma [email protected]
  • Loading branch information
apeksharma authored May 19, 2020
1 parent 4ea98c4 commit 45a6e26
Show file tree
Hide file tree
Showing 32 changed files with 1,039 additions and 50 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Duplicates, if inserted, are removed using a deduplication task.
### BigQuery

Schema for BigQuery table to store Hedera transactions is in
[schema.json](hedera-etl-dataflow/src/main/resources/schema.json) file. Please refer corresponding fields'
[schema.json](hedera-etl-bigquery/src/main/resources/schema.json) file. Please refer corresponding fields'
documentation in [hedera-protobuf](https://github.com/hashgraph/hedera-protobuf/tree/master/src/main/proto) for more
info about columns.

Expand All @@ -45,7 +45,7 @@ bq mk \
--time_partitioning_type DAY \
--clustering_fields transactionType \
project_id:dataset.transactions \
hedera-etl-dataflow/src/main/resources/schema.json
hedera-etl-bigquery/src/main/resources/schema.json
```

###### Errors table
Expand All @@ -59,7 +59,7 @@ bq mk \
--table \
--description "Hedera ETL Errors" \
project_id:dataset.errors \
hedera-etl-dataflow/src/main/resources/errors_schema.json
hedera-etl-bigquery/src/main/resources/errors_schema.json
```

##### Deduplication state table
Expand Down Expand Up @@ -99,7 +99,7 @@ ERRORS_TABLE=${PROJECT_ID}:dataset.errors
#### Running locally

```bash
cd hedera-etl-dataflow
cd hedera-etl-bigquery

mvn compile exec:java -PdirectRunner -Dexec.args=" \
--inputSubscription=${SUBSCRIPTION}, \
Expand All @@ -119,7 +119,7 @@ PIPELINE_FOLDER=gs://${BUCKET_NAME}/pipelines/etl-pipeline
2. Build and upload template to GCS bucket

```bash
cd hedera-etl-dataflow
cd hedera-etl-bigquery

mvn compile exec:java \
-Dexec.args=" \
Expand Down
159 changes: 159 additions & 0 deletions hedera-deduplication-bigquery/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!-- Maven module for the BigQuery deduplication job: a Spring Boot application that
     periodically removes duplicate rows from the transactions table. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>hedera-deduplication-bigquery</artifactId>
    <description>Removes duplicate rows in BigQuery table</description>
    <modelVersion>4.0.0</modelVersion>
    <name>Hedera BigQuery Deduplication</name>
    <packaging>jar</packaging>

    <!-- Inherits common configuration (plugin/dependency versions) from the hedera-etl parent. -->
    <parent>
        <groupId>com.hedera</groupId>
        <artifactId>hedera-etl</artifactId>
        <version>0.0.1</version>
    </parent>

    <properties>
        <lombok.version>1.18.12</lombok.version>
        <micrometer.version>1.5.0</micrometer.version>
    </properties>

    <dependencies>
        <!-- Exports dedupe job metrics to Google Cloud Monitoring (Stackdriver). -->
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-stackdriver</artifactId>
            <version>${micrometer.version}</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator-annotation-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- Default Spring Boot logging (Logback) is excluded below so that
             spring-boot-starter-log4j2 is the single logging backend. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-gcp-starter-bigquery</artifactId>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud GCP BOM so GCP starter versions stay consistent. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-gcp-dependencies</artifactId>
                <version>1.2.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!-- Tests tagged 'gcpBigquery' hit real GCP resources. The default 'dev' profile
         skips them; activate -PgcpBigquery to run only those integration tests. -->
    <profiles>
        <profile>
            <id>dev</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <executions>
                            <execution>
                                <id>default-test</id>
                                <configuration>
                                    <excludedGroups>gcpBigquery</excludedGroups>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>gcpBigquery</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-surefire-plugin</artifactId>
                        <executions>
                            <execution>
                                <id>default-test</id>
                                <configuration>
                                    <groups>gcpBigquery</groups>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <plugins>
            <!-- build-info: exposes build metadata via actuator; repackage with 'exec'
                 classifier: produces an executable fat jar alongside the plain jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>build-info</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <classifier>exec</classifier>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.hedera.dedupe;

/*-
* ‌
* Hedera ETL
* ​
* Copyright (C) 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import com.google.cloud.bigquery.*;
import java.time.Instant;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@RequiredArgsConstructor
public class BigQueryHelper {
    @Getter
    private final BigQuery bigQuery;
    private final DedupeProperties properties;
    private final DedupeMetrics dedupeMetrics;

    /**
     * Executes the given SQL as a synchronous BigQuery job and records its statistics.
     *
     * @param query   SQL to execute
     * @param jobName logical name used in the job id and for metric tagging
     * @return result rows of the completed query
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    public TableResult runQuery(String query, String jobName) throws InterruptedException {
        // Epoch-second suffix keeps job ids unique across runs of the same jobName.
        String uniqueJobName = "dedupe_" + jobName + "_" + Instant.now().getEpochSecond();
        JobId jobId = JobId.of(properties.getProjectId(), uniqueJobName);
        log.info("### Starting job {}", jobId.getJob());
        log.info("Query: {}", query);
        QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder(query).build();
        TableResult tableResult = bigQuery.query(jobConfiguration, jobId);
        // Fetch the completed job to expose its execution statistics as metrics.
        Job completedJob = bigQuery.getJob(jobId);
        dedupeMetrics.recordMetrics(jobName, completedJob.getStatistics());
        return tableResult;
    }

    /**
     * Fails fast when the given table is missing.
     *
     * @throws IllegalArgumentException if the table does not exist in the dataset
     */
    public void ensureTableExists(String dataset, String tableName) {
        if (bigQuery.getTable(dataset, tableName) == null) {
            throw new IllegalArgumentException("Table does not exist : " + dataset + "." + tableName);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.hedera.dedupe;

/*-
* ‌
* Hedera ETL
* ​
* Copyright (C) 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

/**
 * Entry point for the deduplication job. Boots a Spring application that scans for
 * {@code @ConfigurationProperties} classes (e.g. dedupe settings) and, via
 * {@code DedupeConfiguration}, schedules periodic deduplication runs.
 */
@ConfigurationPropertiesScan
@SpringBootApplication
public class DedupeApplication {
    public static void main(String[] args) {
        SpringApplication.run(DedupeApplication.class, args);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.hedera.dedupe;

/*-
* ‌
* Hedera ETL
* ​
* Copyright (C) 2020 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import com.google.api.gax.core.CredentialsProvider;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.stackdriver.StackdriverConfig;
import io.micrometer.stackdriver.StackdriverMeterRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Wires up metrics export to Stackdriver and (conditionally) task scheduling for the
 * deduplication job. Project id and credentials are supplied by Spring Cloud GCP.
 */
@EnableAsync
@Configuration
@RequiredArgsConstructor
public class DedupeConfiguration {

    private final GcpProjectIdProvider projectIdProvider;
    private final CredentialsProvider credentialsProvider;
    private final DedupeProperties properties;

    /** Micrometer Stackdriver settings backed by GCP auto-configuration beans. */
    @Bean
    StackdriverConfig stackdriverConfig() {
        return new StackdriverConfig() {
            @Override
            public String projectId() {
                return projectIdProvider.getProjectId();
            }

            // Setting "management.metrics.export.stackdriver: false" is not working,
            // so the toggle is driven by our own property instead.
            @Override
            public boolean enabled() {
                return properties.isMetricsEnabled();
            }

            // Returning null for every key makes Micrometer fall back to its defaults
            // for all settings other than the overrides above.
            @Override
            public String get(String key) {
                return null;
            }

            @Override
            public CredentialsProvider credentials() {
                return credentialsProvider;
            }
        };
    }

    /** Registry that pushes collected metrics to Stackdriver using the config above. */
    @Bean
    MeterRegistry stackdriverMeterRegistry(StackdriverConfig stackdriverConfig) {
        return StackdriverMeterRegistry.builder(stackdriverConfig).build();
    }

    // Scheduling is disabled for testing: tests set hedera.dedupe.scheduling.enabled=false
    // to keep this nested configuration (and @EnableScheduling) from loading.
    @Configuration
    @EnableScheduling
    @ConditionalOnProperty(prefix = "hedera.dedupe.scheduling", name = "enabled", matchIfMissing = true)
    protected static class SchedulingConfiguration {
    }
}
Loading

0 comments on commit 45a6e26

Please sign in to comment.