diff --git a/hedera-dedupe-bigquery/pom.xml b/hedera-dedupe-bigquery/pom.xml
new file mode 100644
index 0000000..5cd0956
--- /dev/null
+++ b/hedera-dedupe-bigquery/pom.xml
@@ -0,0 +1,159 @@
+
+
+
+ hedera-bigquery-deduplication
+ Deduplicates rows in BigQuery table
+ 4.0.0
+ Hedera BigQuery Deduplication
+ jar
+
+
+ com.hedera
+ hedera-etl
+ 0.0.1
+
+
+
+ 1.18.12
+ 1.5.0
+
+
+
+
+ io.micrometer
+ micrometer-registry-stackdriver
+ ${micrometer.version}
+
+
+ org.hibernate.validator
+ hibernate-validator
+
+
+ org.hibernate.validator
+ hibernate-validator-annotation-processor
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ compile
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-log4j2
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-gcp-starter-bigquery
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-gcp-dependencies
+ 1.2.2.RELEASE
+ pom
+ import
+
+
+
+
+
+
+ dev
+
+ true
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ default-test
+
+ gcpBigquery
+
+
+
+
+
+
+
+
+ gcpBigquery
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ default-test
+
+ gcpBigquery
+
+
+
+
+
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ build-info
+
+
+
+ repackage
+
+ repackage
+
+
+ exec
+
+
+
+
+
+
+
+
+
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java
new file mode 100644
index 0000000..3413f16
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/BigQueryHelper.java
@@ -0,0 +1,56 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.google.cloud.bigquery.*;
+import java.time.Instant;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+/**
+ * Thin wrapper around the BigQuery client used by the dedupe job: runs queries as named jobs, records per-job
+ * metrics and checks that required tables exist.
+ */
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class BigQueryHelper {
+    @Getter
+    private final BigQuery bigQuery;
+    private final DedupeProperties properties;
+    private final DedupeMetrics dedupeMetrics;
+
+    /**
+     * Runs the given query as a BigQuery job and records the job's statistics under {@code jobName}.
+     *
+     * @param query   SQL text to execute
+     * @param jobName logical name of the job; becomes part of the BigQuery job id and is the key for job metrics
+     * @return the result returned by the BigQuery client for the query
+     * @throws InterruptedException propagated from the BigQuery client's query call
+     */
+    public TableResult runQuery(String query, String jobName) throws InterruptedException {
+        // timestamp is just for uniqueness
+        JobId jobId = JobId.of(properties.getProjectId(), "dedupe_" + jobName + "_" + Instant.now().getEpochSecond());
+        log.info("### Starting job {}", jobId.getJob());
+        log.info("Query: {}", query);
+        TableResult tableResult = bigQuery.query(QueryJobConfiguration.newBuilder(query).build(), jobId);
+        recordJobMetrics(jobName, jobId);
+        return tableResult;
+    }
+
+    /**
+     * Verifies that the table is present in the dataset.
+     *
+     * @param dataset   name of the dataset containing the table
+     * @param tableName name of the table to look up
+     * @throws IllegalArgumentException if the lookup finds no such table
+     */
+    public void ensureTableExists(String dataset, String tableName) {
+        Table table = bigQuery.getTable(dataset, tableName);
+        if (table == null) {
+            throw new IllegalArgumentException("Table does not exist : " + dataset + "." + tableName);
+        }
+    }
+
+    // Metrics are best-effort: the query has already completed at this point, so a job that cannot be looked up
+    // again (getJob yields null) must not turn a successful query into a failure.
+    private void recordJobMetrics(String jobName, JobId jobId) {
+        Job job = bigQuery.getJob(jobId);
+        if (job == null) {
+            log.warn("Job {} not found, skipping metrics", jobId.getJob());
+            return;
+        }
+        dedupeMetrics.recordMetrics(jobName, job.getStatistics());
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java
new file mode 100644
index 0000000..5ac0648
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeApplication.java
@@ -0,0 +1,33 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
+
+/**
+ * Spring Boot entry point of the BigQuery deduplication application.
+ */
+@ConfigurationPropertiesScan
+@SpringBootApplication
+public class DedupeApplication {
+    /**
+     * Starts the Spring application with this class as the primary source.
+     *
+     * @param args command-line arguments handed to Spring Boot
+     */
+    public static void main(String[] args) {
+        SpringApplication application = new SpringApplication(DedupeApplication.class);
+        application.run(args);
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java
new file mode 100644
index 0000000..5e851a4
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeConfiguration.java
@@ -0,0 +1,81 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.google.api.gax.core.CredentialsProvider;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.stackdriver.StackdriverConfig;
+import io.micrometer.stackdriver.StackdriverMeterRegistry;
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * Spring configuration of the dedupe application: wires a Stackdriver meter registry and, unless switched off by
+ * property, enables scheduling.
+ */
+@EnableAsync
+@Configuration
+@RequiredArgsConstructor
+public class DedupeConfiguration {
+
+    private final GcpProjectIdProvider projectIdProvider;
+    private final CredentialsProvider credentialsProvider;
+    private final DedupeProperties properties;
+
+    /**
+     * Stackdriver settings backed by the injected project id provider, credentials provider and dedupe properties.
+     */
+    @Bean
+    StackdriverConfig stackdriverConfig() {
+        return new StackdriverConfig() {
+            // No key/value backed settings: every lookup answers null.
+            @Override
+            public String get(String key) {
+                return null;
+            }
+
+            @Override
+            public CredentialsProvider credentials() {
+                return credentialsProvider;
+            }
+
+            @Override
+            public String projectId() {
+                return projectIdProvider.getProjectId();
+            }
+
+            // Setting "management.metrics.export.stackdriver: false" is not working
+            @Override
+            public boolean enabled() {
+                return properties.isMetricsEnabled();
+            }
+        };
+    }
+
+    /**
+     * Meter registry built from the given Stackdriver settings.
+     */
+    @Bean
+    MeterRegistry stackdriverMeterRegistry(StackdriverConfig stackdriverConfig) {
+        StackdriverMeterRegistry.Builder registryBuilder = StackdriverMeterRegistry.builder(stackdriverConfig);
+        return registryBuilder.build();
+    }
+
+    // Scheduling is disabled for testing
+    // (active unless "hedera.dedupe.scheduling.enabled" is set to something other than true).
+    @Configuration
+    @EnableScheduling
+    @ConditionalOnProperty(prefix = "hedera.dedupe.scheduling", name = "enabled", matchIfMissing = true)
+    protected static class SchedulingConfiguration {
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java
new file mode 100644
index 0000000..e425a6a
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeMetrics.java
@@ -0,0 +1,96 @@
+package com.hedera.dedupe;
+
+import com.google.cloud.bigquery.JobStatistics;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+/**
+ * Micrometer meters published by the dedupe job: gauges and counters for a whole dedupe run, plus per-job
+ * (per-query) gauges that are created the first time a job name is seen.
+ */
+@Log4j2
+@Getter
+@Component
+class DedupeMetrics {
+    private final MeterRegistry meterRegistry;
+    private final Map<String, JobMetrics> jobMetrics;
+    private final AtomicLong startTimestampGauge;
+    private final AtomicLong endTimestampGauge;
+    private final AtomicLong delayGauge;
+    private final AtomicLong runtimeGauge;
+    private final Counter duplicatesCounter;
+    private final Counter invocationsCounter;
+    private final Counter failuresCounter;
+
+    /**
+     * Registers all run-level gauges and counters with the given registry.
+     *
+     * @param meterRegistry registry that receives every meter created by this class
+     */
+    public DedupeMetrics(MeterRegistry meterRegistry) {
+        this.meterRegistry = meterRegistry;
+        this.jobMetrics = new HashMap<>();
+        this.startTimestampGauge = registerGauge(meterRegistry, "dedupe.start.timestamp",
+                "Start of dedupe window. Last endTimestamp + 1", "ns");
+        this.endTimestampGauge = registerGauge(meterRegistry, "dedupe.end.timestamp",
+                "consensusTimestamp of last row in dedupe window.", "ns");
+        this.runtimeGauge = registerGauge(meterRegistry, "dedupe.runtime",
+                "Total time taken by single dedupe run", "sec");
+        this.delayGauge = registerGauge(meterRegistry, "dedupe.delay",
+                "Delay in deduplication (now - startTimestamp)", "sec");
+        this.duplicatesCounter = Counter.builder("dedupe.duplicates.count")
+                .description("Count of duplicates found")
+                .register(meterRegistry);
+        this.invocationsCounter = Counter.builder("dedupe.invocations").register(meterRegistry);
+        this.failuresCounter = Counter.builder("dedupe.failures").register(meterRegistry);
+    }
+
+    /**
+     * Publishes runtime and affected-row count of a finished query job under the job's name.
+     * Does nothing (besides logging) when the statistics or their start/end times are missing.
+     *
+     * @param jobName         name of the job; used as the "name" tag of the per-job gauges
+     * @param queryStatistics statistics of the finished job, may be null
+     */
+    public void recordMetrics(String jobName, JobStatistics.QueryStatistics queryStatistics) {
+        // Start/end times are boxed; unboxing a missing value would throw, so skip instead.
+        if (queryStatistics == null || queryStatistics.getStartTime() == null
+                || queryStatistics.getEndTime() == null) {
+            log.warn("No timing statistics available for job {}, skipping metrics", jobName);
+            return;
+        }
+        long runTime = queryStatistics.getEndTime() - queryStatistics.getStartTime();
+        Long affectedRows = queryStatistics.getNumDmlAffectedRows();
+        log.info("Job stats: runtime = {}ms, affected rows = {}", runTime, affectedRows);
+        JobMetrics metrics = jobMetrics.computeIfAbsent(jobName, k -> new JobMetrics(meterRegistry, k));
+        metrics.getRuntimeGauge().set(runTime);
+        if (affectedRows != null) { // absent when the statistics carry no DML row count
+            metrics.getAffectedRowsGauge().set(affectedRows);
+        }
+    }
+
+    // Individual jobs (queries) may not run on every invocation, so reset their metrics to 0.
+    // All other metrics are always set on each invocation, so no need to reset them.
+    public void resetJobMetrics() {
+        jobMetrics.values().forEach(JobMetrics::reset);
+    }
+
+    // Creates a zero-initialized value holder and registers a gauge that reads it.
+    private static AtomicLong registerGauge(MeterRegistry meterRegistry, String name, String description,
+            String baseUnit) {
+        AtomicLong value = new AtomicLong(0L);
+        Gauge.builder(name, value, AtomicLong::get)
+                .description(description)
+                .baseUnit(baseUnit)
+                .register(meterRegistry);
+        return value;
+    }
+
+    /**
+     * Gauges of a single named job: its runtime and the number of rows it affected.
+     */
+    @Value
+    static class JobMetrics {
+        private final AtomicLong runtimeGauge;
+        private final AtomicLong affectedRowsGauge;
+
+        JobMetrics(MeterRegistry meterRegistry, String jobName) {
+            Collection<Tag> tags = List.of(Tag.of("name", jobName));
+            runtimeGauge = meterRegistry.gauge("dedupe.job.runtime", tags, new AtomicLong(0L));
+            affectedRowsGauge = meterRegistry.gauge("dedupe.job.rows", tags, new AtomicLong(0L));
+        }
+
+        void reset() {
+            runtimeGauge.set(0L);
+            affectedRowsGauge.set(0L);
+        }
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java
new file mode 100644
index 0000000..5c4977f
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeProperties.java
@@ -0,0 +1,46 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import javax.validation.constraints.NotBlank;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+/**
+ * Settings of the dedupe job, bound from properties under the {@code hedera.dedupe} prefix and validated via the
+ * constraint annotations on the fields.
+ */
+@Data
+@Validated
+@ConfigurationProperties("hedera.dedupe")
+public class DedupeProperties {
+
+    // GCP project; first component of the fully qualified table names and of BigQuery job ids.
+    @NotBlank
+    private String projectId;
+
+    // Dataset that holds both the transactions table and the state table.
+    @NotBlank
+    private String datasetName;
+
+    // Transactions table to deduplicate. Required: DedupeRunner checks on construction that this table exists.
+    @NotBlank
+    private String tableName;
+
+    // Table in which the dedupe state is kept between runs.
+    @NotBlank
+    private String stateTableName;
+
+    // Returned by the Stackdriver config's enabled(); off by default.
+    private boolean metricsEnabled = false;
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java
new file mode 100644
index 0000000..4e6c272
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeRunner.java
@@ -0,0 +1,200 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import static com.hedera.dedupe.DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP;
+import static com.hedera.dedupe.Utility.toBigQueryTimestamp;
+
+import java.time.Duration;
+import java.time.Instant;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * Dedupe runner periodically executes BigQuery jobs to deduplicate rows.
+ * State across runs is maintained using a separate BigQuery table.
+ * If a run exceeds the configured 'fixedRate', next run will be queued and started only after previous one finishes.
+ *
+ * Deduplication algorithm:
+ * (consensusTimestamp is unique for each transaction)
+ * 1. Get state: startTimestamp = lastValidTimestamp + 1 (from stateTable). 0 if not set.
+ * 2. Flag rows not in the streaming buffer: UPDATE table SET dedupe = 1 WHERE ...;
+ * 3. Get endTimestamp: SELECT MAX(consensusTimestamp) FROM table WHERE dedupe IS NOT NULL ....
+ * 4. Check for duplicates in [startTimestamp , endTimestamp] window
+ * 5. Remove duplicates (if present)
+ * 6. Save state: Set lastValidTimestamp in state table to endTimestamp
+ */
+@Log4j2
+@Component
+public class DedupeRunner {
+    private static final String JOB_NAME_FLAG_ROW = "flag_dml_accessible_rows";
+    private static final String JOB_NAME_GET_END_TIMESTAMP = "get_end_timestamp";
+    private static final String JOB_NAME_GET_DUPLICATES = "get_duplicates";
+    private static final String JOB_NAME_REMOVE_DUPLICATES = "remove_duplicates";
+
+    private final BigQueryHelper bigQueryHelper;
+    private final String transactionsTable;
+    private final DedupeState dedupeState;
+    private final DedupeMetrics metrics;
+
+    /**
+     * Checks that the configured transactions table exists and remembers its fully qualified name
+     * (project.dataset.table).
+     */
+    public DedupeRunner(DedupeProperties properties, BigQueryHelper bigQueryHelper, DedupeState dedupeState,
+            DedupeMetrics dedupeMetrics) {
+        this.bigQueryHelper = bigQueryHelper;
+        this.dedupeState = dedupeState;
+        this.metrics = dedupeMetrics;
+        String datasetName = properties.getDatasetName();
+        bigQueryHelper.ensureTableExists(datasetName, properties.getTableName());
+        transactionsTable = properties.getProjectId() + "." + datasetName + "." + properties.getTableName();
+    }
+
+    /**
+     * Executes one dedupe run. Failures are logged and counted instead of being rethrown, so a failed run does
+     * not propagate an exception to the caller.
+     */
+    @Scheduled(fixedRateString = "${hedera.dedupe.fixedRate:300000}") // default: 5 min
+    public void run() {
+        try {
+            Instant dedupeStart = Instant.now();
+            metrics.getInvocationsCounter().increment();
+            metrics.resetJobMetrics();
+            runDedupe();
+            metrics.getRuntimeGauge().set(Duration.between(dedupeStart, Instant.now()).getSeconds());
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so that whoever owns this thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            recordFailure(e);
+        } catch (Exception e) {
+            recordFailure(e);
+        }
+    }
+
+    private void recordFailure(Exception e) {
+        log.error("Failed deduplication", e);
+        metrics.getFailuresCounter().increment();
+    }
+
+    // Steps 1-6 of the algorithm described in the class documentation.
+    private void runDedupe() throws Exception {
+        var state = dedupeState.getState();
+
+        // 1. Get state: startTimestamp = lastValidTimestamp + 1 (from stateTable). 0 if not set.
+        long startTimestamp = 0; // in nanos
+        if (state.containsKey(STATE_NAME_LAST_VALID_TIMESTAMP)) {
+            startTimestamp = state.get(STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue() + 1;
+        }
+        metrics.getStartTimestampGauge().set(startTimestamp);
+
+        // 2. Flag rows not in the streaming buffer: UPDATE table SET dedupe = 1 WHERE ...;
+        setDedupeState(startTimestamp);
+
+        // 3. Get endTimestamp: SELECT MAX(consensusTimestamp) FROM table WHERE dedupe IS NOT NULL ....
+        long endTimestamp = getEndTimestamp(startTimestamp);
+        if (endTimestamp == startTimestamp) {
+            // Nothing to do; state is left untouched so the same window start is used on the next run.
+            return;
+        }
+
+        // 4. Check for duplicates in [startTimestamp , endTimestamp] window
+        boolean hasDuplicates = hasDuplicates(startTimestamp, endTimestamp);
+
+        // 5. Remove duplicates (if present)
+        if (hasDuplicates) {
+            removeDuplicates(startTimestamp, endTimestamp);
+        }
+
+        // 6. Save state: Set lastValidTimestamp in state table to endTimestamp
+        dedupeState.setState(endTimestamp);
+
+        // Delay = now - startTimestamp, in seconds (startTimestamp is nanos since epoch).
+        metrics.getDelayGauge().set(
+                Duration.between(Instant.ofEpochSecond(0L, startTimestamp), Instant.now()).getSeconds());
+    }
+
+    // Sets dedupe = 1 on all rows at or after startTimestamp.
+    private void setDedupeState(long startTimestamp) throws Exception {
+        String query = String.format("UPDATE %s SET dedupe = 1 WHERE %s",
+                transactionsTable, consensusTimestampGte(startTimestamp));
+        bigQueryHelper.runQuery(query, JOB_NAME_FLAG_ROW);
+    }
+
+    // Returns the largest consensusTimestamp among flagged rows at or after startTimestamp, or startTimestamp
+    // itself when the query yields NULL (no such rows).
+    private long getEndTimestamp(long startTimestamp) throws Exception {
+        String query = String.format("SELECT MAX(consensusTimestamp) AS ts \n" +
+                        "FROM %s \n" +
+                        "WHERE %s AND dedupe IS NOT NULL",
+                transactionsTable, consensusTimestampGte(startTimestamp));
+        var tableResult = bigQueryHelper.runQuery(query, JOB_NAME_GET_END_TIMESTAMP);
+        long endTimestamp = 0L;
+        for (var fvl : tableResult.iterateAll()) {
+            if (fvl.get("ts").getValue() == null) {
+                log.info("No new rows");
+                return startTimestamp;
+            }
+            endTimestamp = fvl.get("ts").getLongValue();
+        }
+        metrics.getEndTimestampGauge().set(endTimestamp);
+        log.info("endTimestamp = {}", endTimestamp);
+        return endTimestamp;
+    }
+
+    // SELECT count(*) AS num, consensusTimestamp FROM table
+    // WHERE consensusTimestamp BETWEEN startTimestamp AND endTimestamp
+    // GROUP BY consensusTimestamp HAVING num > 1;
+    // Logs every duplicated timestamp and adds the number of surplus rows to the duplicates counter.
+    private boolean hasDuplicates(long startTimestamp, long endTimestamp) throws Exception {
+        String query = String.format("SELECT count(*) AS num, consensusTimestamp \n" +
+                        " FROM %s \n" +
+                        " WHERE %s \n" +
+                        " GROUP BY consensusTimestamp HAVING num > 1",
+                transactionsTable, consensusTimestampBetween(startTimestamp, endTimestamp, ""));
+        var tableResult = bigQueryHelper.runQuery(query, JOB_NAME_GET_DUPLICATES);
+        if (tableResult.getTotalRows() == 0) {
+            log.info("No duplicates found");
+            return false;
+        }
+        log.info("Duplicates found");
+        log.info("consensusTimestamp, count");
+        long numDuplicates = 0;
+        for (var fvl : tableResult.iterateAll()) {
+            long num = fvl.get("num").getLongValue();
+            log.info("{}, {}", fvl.get("consensusTimestamp").getLongValue(), num);
+            numDuplicates += num - 1; // -1 for original
+        }
+        metrics.getDuplicatesCounter().increment(numDuplicates);
+        return true;
+    }
+
+    // Replaces all rows of the window with one row per consensusTimestamp: the MERGE never matches (ON FALSE),
+    // so existing rows in the window are deleted and the de-duplicated rows from the subquery are inserted.
+    private void removeDuplicates(long startTimestamp, long endTimestamp) throws Exception {
+        String query = String.format("MERGE INTO %s AS dest \n" +
+                        "USING ( \n" +
+                        " SELECT k.* \n" +
+                        " FROM ( \n" +
+                        " SELECT ARRAY_AGG(original_data LIMIT 1)[OFFSET(0)] k \n" +
+                        " FROM %s AS original_data \n" +
+                        " WHERE %s \n" +
+                        " GROUP BY consensusTimestamp \n" +
+                        " ) \n" +
+                        ") AS src \n" +
+                        "ON FALSE \n" +
+                        "WHEN NOT MATCHED BY SOURCE AND %s -- remove all data in partition range \n" +
+                        " THEN DELETE \n" +
+                        "WHEN NOT MATCHED BY TARGET THEN INSERT ROW",
+                transactionsTable, transactionsTable, consensusTimestampBetween(startTimestamp, endTimestamp, ""),
+                consensusTimestampBetween(startTimestamp, endTimestamp, "dest"));
+        bigQueryHelper.runQuery(query, JOB_NAME_REMOVE_DUPLICATES);
+    }
+
+    // Predicate on both the nanosecond column and the truncated timestamp column.
+    private String consensusTimestampGte(long timestamp) {
+        return String.format("(consensusTimestamp >= %d AND consensusTimestampTruncated >= '%s')", timestamp,
+                toBigQueryTimestamp(timestamp));
+    }
+
+    // Same as above for a closed range; a non-empty alias is prepended (followed by '.') to both column names.
+    private String consensusTimestampBetween(long startTimestamp, long endTimestamp, String alias) {
+        alias = alias.isEmpty() ? alias : alias + ".";
+        return String.format(
+                "(%sconsensusTimestamp BETWEEN %d AND %d) AND (%sconsensusTimestampTruncated BETWEEN '%s' AND '%s')",
+                alias, startTimestamp, endTimestamp,
+                alias, toBigQueryTimestamp(startTimestamp), toBigQueryTimestamp(endTimestamp));
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java
new file mode 100644
index 0000000..316e4e6
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/DedupeState.java
@@ -0,0 +1,82 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.google.cloud.bigquery.*;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Component;
+
+/**
+ * Reads/writes deduplication job's state from/to BigQuery table.
+ * The state table has two string columns, "name" and "value", one row per state variable.
+ */
+@Log4j2
+@Component
+public class DedupeState {
+    public static final String STATE_NAME_LAST_VALID_TIMESTAMP = "lastValidTimestamp";
+    public static final String JOB_NAME_SAVE_STATE = "save_state";
+
+    private final BigQueryHelper bigQueryHelper;
+    private final TableId stateTableId;
+    private final String stateTable;
+
+    /**
+     * Resolves the state table from the properties and checks that it exists.
+     */
+    public DedupeState(DedupeProperties properties, BigQueryHelper bigQueryHelper) {
+        this.bigQueryHelper = bigQueryHelper;
+        String datasetName = properties.getDatasetName();
+        String tableName = properties.getStateTableName();
+        stateTableId = TableId.of(datasetName, tableName);
+        stateTable = properties.getProjectId() + "." + datasetName + "." + tableName;
+        bigQueryHelper.ensureTableExists(datasetName, tableName);
+    }
+
+    /**
+     * @return state read from the given table. Map's key is state field's name and map's value is state field's value.
+     *         Empty map if the state table has no rows.
+     */
+    public Map<String, FieldValue> getState() {
+        // Both columns are strings; use the same (standard SQL) type enum for both fields.
+        Schema schema = Schema.of(
+                Field.of("name", StandardSQLTypeName.STRING),
+                Field.of("value", StandardSQLTypeName.STRING));
+        TableResult tableResult = bigQueryHelper.getBigQuery().listTableData(stateTableId, schema);
+        if (tableResult.getTotalRows() == 0) { // state not initialized
+            return Map.of();
+        }
+        Map<String, FieldValue> state = new HashMap<>();
+        // okay to iterateAll since state will contain couple rows at max.
+        for (FieldValueList row : tableResult.iterateAll()) {
+            state.put(row.get("name").getStringValue(), row.get("value"));
+        }
+        return state;
+    }
+
+    /**
+     * Replaces the content of the state table with a single lastValidTimestamp row.
+     *
+     * @param endTimestamp value to store as lastValidTimestamp
+     * @throws InterruptedException propagated from running the query
+     */
+    public void setState(long endTimestamp) throws InterruptedException {
+        // State variable may not be present in stateTable already (for UPDATE). For example, adding new state
+        // variables. Also, deprecating state variable would require deleting it.
+        // Although the query *looks* complicated, this is just atomically resetting state table to the given values.
+        String query = String.format("MERGE INTO %s \n" +
+                        "USING ( \n" +
+                        " SELECT '%s' AS name, '%d' AS value \n" +
+                        ") \n" +
+                        "ON FALSE \n" +
+                        "WHEN NOT MATCHED BY SOURCE THEN DELETE \n" +
+                        "WHEN NOT MATCHED BY TARGET THEN INSERT ROW \n",
+                stateTable, STATE_NAME_LAST_VALID_TIMESTAMP, endTimestamp);
+        bigQueryHelper.runQuery(query, JOB_NAME_SAVE_STATE);
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java
new file mode 100644
index 0000000..ed429c3
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/java/com/hedera/dedupe/Utility.java
@@ -0,0 +1,32 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import java.time.Instant;
+
+/**
+ * Static helper functions for the dedupe job.
+ */
+public final class Utility {
+
+    private Utility() {
+        // static helpers only, not meant to be instantiated
+    }
+
+    /**
+     * Formats a timestamp given in nanoseconds since epoch as an ISO-8601 instant string, after dropping the
+     * sub-microsecond digits (the value is cut down to a multiple of 1000 ns).
+     *
+     * @param timestamp nanoseconds since epoch
+     * @return {@link Instant#toString()} of the truncated timestamp
+     */
+    static String toBigQueryTimestamp(long timestamp) {
+        return Instant.ofEpochSecond(0L, (timestamp / 1000) * 1000).toString();
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/main/resources/application.yml b/hedera-dedupe-bigquery/src/main/resources/application.yml
new file mode 100644
index 0000000..0b6848d
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/resources/application.yml
@@ -0,0 +1,8 @@
+spring:
+  cloud:
+    gcp:
+      # No credentials location is set here on purpose: a developer-specific key file path must not ship with the
+      # application. Provide spring.cloud.gcp.credentials.location through external configuration (profile file,
+      # environment or command line), or leave it unset to fall back to Application Default Credentials.
+      bigquery:
+        projectId: ${hedera.dedupe.projectId}
+        datasetName: ${hedera.dedupe.datasetName}
\ No newline at end of file
diff --git a/hedera-dedupe-bigquery/src/main/resources/banner.txt b/hedera-dedupe-bigquery/src/main/resources/banner.txt
new file mode 100644
index 0000000..5d779ab
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/resources/banner.txt
@@ -0,0 +1,12 @@
+
+ @@@@@@@@@@@@@
+ @@@@@@@@@@@@@@@@@@@
+ @@@@@ 0@@@@@@@@ @@@@@ @@@ @@@ @@@
+ @@@@@@@ @@@@@@@@@ @@@@@@@ @@@ @@@ @@@
+ @@@@@@@@ @@@@@@@@@ @@@@@@@@ @@@ @@@ @@@@@@ @@@@@@@@@ @@@@@@ @@@@@@@@ @@@@@@
+ @@@@@@@@ @@@@@@@@ @@@@@@@@@@@@@@ @@@ @@@ @@@ @@@ @@@ @@@ @@@@@ @@@
+ @@@@@@@@ @@@@@@@@@ @@@@@@@@ @@@ @@@ @@@@@@@@@ @@@ @@@ 8@@@@@@@@ @@@ @@@@@@@@
+ @@@@@@@ @@@@@@@@@ @@@@@@@ @@@ @@@ @@@ @@@ @@@ @@@ @@@ @@@ @@
+ @@@@@ @@@@@@@@@ @@@@@ @@@ @@@ @@@@@@@ @@@@@@@@ @@@@@@@ @@@ @@@@@@@@
+ @@@@@@@@@@@@@@@@@@@
+ @@@@@@@@@@@@@ BigQuery Deduplication ${application.formatted-version}
diff --git a/hedera-dedupe-bigquery/src/main/resources/log4j2.xml b/hedera-dedupe-bigquery/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..f4f8765
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/main/resources/log4j2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java b/hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java
new file mode 100644
index 0000000..1b25533
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/test/java/com/hedera/dedupe/DedupeIntegrationTest.java
@@ -0,0 +1,131 @@
+package com.hedera.dedupe;
+
+/*-
+ *
+ * Hedera ETL
+ *
+ * Copyright (C) 2020 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import java.util.Random;
+import javax.annotation.Resource;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+/**
+ * Due to lack of any fake/mock/emulator for BigQuery, this test requires GCP BigQuery.
+ * Setup:
+ * - Create transactions and state table. See documentation for more details.
+ * - Fill the properties in resources/application-default.yml
+ *
+ * Test is not run as part of 'mvn test'. To run the test, use following command:
+ * - mvn test -PgcpBigquery
+ */
+@Log4j2
+@SpringBootTest
+@Tag("gcpBigquery")
+public class DedupeIntegrationTest {
+    static final long NUM_ROWS = 100;
+
+    @Resource
+    protected DedupeRunner dedupeRunner;
+    @Resource
+    protected BigQueryHelper bigQueryHelper;
+    @Resource
+    protected DedupeProperties properties;
+    @Resource
+    private DedupeState dedupeState;
+
+    private String transactionsTable;
+
+    // Empties the transactions table and resets the saved state to 0 before every test.
+    @BeforeEach
+    void beforeEach() throws Exception {
+        transactionsTable = properties.getProjectId() + "." + properties.getDatasetName()
+                + "." + properties.getTableName();
+        bigQueryHelper.runQuery("DELETE FROM " + transactionsTable + " WHERE 1 = 1", "reset_table");
+        dedupeState.setState(0);
+    }
+
+    @Test
+    void testDeduplication() throws Exception {
+        // add data
+        long expectedEndTimestamp = generateDuplicatedData(1, NUM_ROWS);
+
+        // run dedupe, check num rows and state.
+        dedupeRunner.run();
+        long actualNumRows = getNumRows(transactionsTable);
+        var state = dedupeState.getState();
+        assertEquals(NUM_ROWS, actualNumRows);
+        assertEquals(expectedEndTimestamp, state.get(DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue());
+
+        // add more data
+        expectedEndTimestamp = generateDuplicatedData(expectedEndTimestamp + 1, NUM_ROWS);
+
+        // run dedupe, check num rows and state.
+        dedupeRunner.run();
+        actualNumRows = getNumRows(transactionsTable);
+        state = dedupeState.getState();
+        assertEquals(2 * NUM_ROWS, actualNumRows);
+        assertEquals(expectedEndTimestamp, state.get(DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue());
+
+        // No new data.
+        // Run dedupe, check num rows and state.
+        dedupeRunner.run();
+        actualNumRows = getNumRows(transactionsTable);
+        state = dedupeState.getState();
+        assertEquals(2 * NUM_ROWS, actualNumRows);
+        assertEquals(expectedEndTimestamp, state.get(DedupeState.STATE_NAME_LAST_VALID_TIMESTAMP).getLongValue());
+    }
+
+    // Generates fake transaction rows starting with startTimestamp (in nanos). Every 5th row is duplicated.
+    // Returns timestamp of last row.
+    private long generateDuplicatedData(long startTimestamp, long numRows) throws Exception {
+        StringBuilder query = new StringBuilder("INSERT INTO ")
+                .append(transactionsTable)
+                .append(" (consensusTimestampTruncated, consensusTimestamp) VALUES ")
+                .append(makeRow(startTimestamp));
+        long timestamp = startTimestamp;
+        Random rand = new Random();
+        for (int i = 1; i < numRows; i++) {
+            // Add nanos. The step is at least 1 so that timestamps are strictly increasing: a step of 0 would
+            // create an extra, unintended duplicate and break the expected row counts.
+            timestamp += 1 + rand.nextInt(1_000_000_000);
+            String row = makeRow(timestamp);
+            query.append(", ").append(row);
+            if (i % 5 == 0) { // duplicate every 5th row
+                query.append(", ").append(row);
+            }
+        }
+        String insertQuery = query.toString();
+        log.info("Inserting duplicated data: {}", insertQuery);
+        bigQueryHelper.runQuery(insertQuery, "insert_data");
+        return timestamp;
+    }
+
+    // Formats one VALUES tuple: (truncated timestamp string, timestamp in nanos).
+    private String makeRow(long timestamp) {
+        return String.format("( '%s', %d )", Utility.toBigQueryTimestamp(timestamp), timestamp);
+    }
+
+    // Returns the value of count(*) for the given table; 0 if the query returns no row.
+    private long getNumRows(String tableName) throws Exception {
+        String query = "SELECT count(*) AS count FROM " + tableName;
+        var tableResult = bigQueryHelper.getBigQuery().query(QueryJobConfiguration.newBuilder(query).build());
+        for (var row : tableResult.iterateAll()) {
+            return row.get("count").getLongValue();
+        }
+        return 0;
+    }
+}
diff --git a/hedera-dedupe-bigquery/src/test/resources/application-default.yml b/hedera-dedupe-bigquery/src/test/resources/application-default.yml
new file mode 100644
index 0000000..1abe9dd
--- /dev/null
+++ b/hedera-dedupe-bigquery/src/test/resources/application-default.yml
@@ -0,0 +1,15 @@
+hedera:
+ dedupe:
+ projectId:
+ datasetName:
+ tableName:
+ stateTableName:
+ metricsEnabled: false
+ scheduling:
+ enabled: false # Dedupe runs are manually invoked in tests
+
+spring:
+ cloud:
+ gcp:
+ credentials:
+ location:
diff --git a/hedera-dedupe-bigquery/state-schema.json b/hedera-dedupe-bigquery/state-schema.json
new file mode 100644
index 0000000..8965ab7
--- /dev/null
+++ b/hedera-dedupe-bigquery/state-schema.json
@@ -0,0 +1,13 @@
+[
+ {
+ "name": "name",
+ "type": "STRING",
+ "mode": "REQUIRED",
+ "description": "Name of the state variable"
+ },
+ {
+ "name": "value",
+ "type": "STRING",
+ "description": "Value of the state variable"
+ }
+]
\ No newline at end of file
diff --git a/hedera-etl-dataflow/pom.xml b/hedera-etl-dataflow/pom.xml
index 02760ad..fbd86e2 100644
--- a/hedera-etl-dataflow/pom.xml
+++ b/hedera-etl-dataflow/pom.xml
@@ -20,7 +20,6 @@
2.20.0
v2-rev20181104-1.27.0
1.6.0
- RELEASE
2.13.1
1.18.12
v1-rev20181105-1.27.0
@@ -87,7 +86,7 @@
org.junit.jupiter
junit-jupiter
- ${junit.version}
+ ${junit-jupiter.version}
test
diff --git a/hedera-etl-dataflow/src/main/resources/schema.json b/hedera-etl-dataflow/src/main/resources/schema.json
index a7ac09b..1ef0a11 100644
--- a/hedera-etl-dataflow/src/main/resources/schema.json
+++ b/hedera-etl-dataflow/src/main/resources/schema.json
@@ -393,5 +393,10 @@
"type": "INTEGER"
}
]
+ },
+ {
+ "name": "dedupe",
+ "type": "INTEGER",
+ "description": "Used internally for deduplication of rows."
}
]
diff --git a/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java b/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java
index 7291e8f..f3f7704 100644
--- a/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java
+++ b/hedera-etl-dataflow/src/test/java/com/hedera/etl/TransactionJsonToTableRowTest.java
@@ -28,13 +28,13 @@
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
public class TransactionJsonToTableRowTest {
private final TransactionJsonToTableRow converter = new TransactionJsonToTableRow();
@Test
- public void testConversion() throws Exception {
+ void testConversion() throws Exception {
// Given
List jsonTransactions = readFileLines("data/TransactionJsonToTableRowTest/transactions.txt");
List expected = readFileLines("data/TransactionJsonToTableRowTest/expectedTableRows.txt");
@@ -51,7 +51,7 @@ public void testConversion() throws Exception {
@Test
- public void testThrowsExceptionForBadJson() throws Exception {
+ void testThrowsExceptionForBadJson() throws Exception {
// given
String badJson = "{\"consensusTimestamp\":1570802944412586000,\"entity\":{\"shardNum\":0,";
diff --git a/pom.xml b/pom.xml
index 1a7be37..6f887b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@
hedera-etl-dataflow
+ hedera-dedupe-bigquery
@@ -39,25 +40,54 @@
https://hedera.com
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.7.RELEASE
+
+
+
0.8.5
11
- 3.7.0
+ 3.8.1
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+
+ ${java.version}
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ ${jacoco.version}
+
+
+
+ prepare-agent
+
+
+
+ jacoco-report
+ test
+
+ report
+
+
+
+
+
+
-
- org.apache.maven.plugins
- maven-compiler-plugin
- ${maven-compiler-plugin.version}
-
-
- ${java.version}
-
-
-
org.codehaus.mojo
@@ -79,27 +109,6 @@
-
-
- org.jacoco
- jacoco-maven-plugin
- ${jacoco.version}
-
-
-
- prepare-agent
-
-
-
- jacoco-report
- test
-
- report
-
-
-
-
-
diff --git a/scripts/create-tables.sh b/scripts/create-tables.sh
new file mode 100755
index 0000000..3305bba
--- /dev/null
+++ b/scripts/create-tables.sh
@@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+# Usage: PROJECT_ID=.. DATASET_NAME=... ./create-tables.sh
+# Optionally, table names can be set using TRANSACTIONS_TABLE_NAME, ERRORS_TABLE_NAME, DEDUPE_STATE_TABLE_NAME
+
+TRANSACTIONS_TABLE_NAME=${TRANSACTIONS_TABLE_NAME:-transactions}
+ERRORS_TABLE_NAME=${ERRORS_TABLE_NAME:-errors}
+DEDUPE_STATE_TABLE_NAME=${DEDUPE_STATE_TABLE_NAME:-dedupe_state}
+
+# Ensure `bq` cli exists
+if [[ `which bq` == "" ]]; then
+ echo "Couldn't find 'bq' cli. Make sure Cloud SDK is installed. https://cloud.google.com/sdk/docs#install_the_latest_cloud_sdk_version"
+ exit 1
+fi
+
+if [[ "${PROJECT_ID}" == "" ]]; then
+ echo "PROJECT_ID is not set"
+ exit 1
+fi
+
+if [[ "${DATASET_NAME}" == "" ]]; then
+ echo "DATASET_NAME is not set"
+ exit 1
+fi
+
+BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+bq mk \
+ --table \
+ --description "Hedera network transactions" \
+ --time_partitioning_field consensusTimestampTruncated \
+ --time_partitioning_type DAY \
+ --clustering_fields transactionType \
+ ${PROJECT_ID}:${DATASET_NAME}.${TRANSACTIONS_TABLE_NAME} \
+ ${BASE_DIR}/../hedera-etl-dataflow/src/main/resources/schema.json
+
+bq mk \
+ --table \
+ --description "Hedera ETL Errors" \
+ ${PROJECT_ID}:${DATASET_NAME}.${ERRORS_TABLE_NAME} \
+ ${BASE_DIR}/../hedera-etl-dataflow/src/main/resources/errors_schema.json
+
+bq mk \
+ --table \
+ --description "BigQuery deduplication job state" \
+ ${PROJECT_ID}:${DATASET_NAME}.${DEDUPE_STATE_TABLE_NAME} \
+ ${BASE_DIR}/../hedera-dedupe-bigquery/state-schema.json
+