From a171648a91c295820271ceddd56d1f45d2551ef4 Mon Sep 17 00:00:00 2001 From: Jinil Sung Date: Tue, 19 Sep 2023 11:57:29 -0700 Subject: [PATCH] GRAD2-2334: task is completed GRAD2-2334: task is completed --- .../scheduler/JetStreamEventScheduler.java | 33 ++++++++++++------- .../EducGradDataConversionApiConstants.java | 13 ++++++++ api/src/main/resources/application.yaml | 1 + api/src/test/resources/application.yaml | 1 + 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java index 4dcdd207..126df85c 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java +++ b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java @@ -1,15 +1,15 @@ package ca.bc.gov.educ.api.dataconversion.scheduler; import ca.bc.gov.educ.api.dataconversion.choreographer.ChoreographEventHandler; +import ca.bc.gov.educ.api.dataconversion.entity.Event; import ca.bc.gov.educ.api.dataconversion.repository.EventRepository; +import ca.bc.gov.educ.api.dataconversion.util.EducGradDataConversionApiConstants; import lombok.extern.slf4j.Slf4j; import net.javacrumbs.shedlock.core.LockAssert; import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; - import static ca.bc.gov.educ.api.dataconversion.constant.EventStatus.DB_COMMITTED; /** @@ -27,6 +27,8 @@ public class JetStreamEventScheduler { private final ChoreographEventHandler choreographer; + private final EducGradDataConversionApiConstants constants; + /** * Instantiates a new Stan event scheduler. * @@ -34,26 +36,33 @@ public class JetStreamEventScheduler { * @param choreographer the choreographer */ public JetStreamEventScheduler(final EventRepository eventRepository, - final ChoreographEventHandler choreographer) { + final ChoreographEventHandler choreographer, + final EducGradDataConversionApiConstants constants) { this.eventRepository = eventRepository; this.choreographer = choreographer; + this.constants = constants; } @Scheduled(cron = "${cron.scheduled.process.events.stan.run}") // minimum = every 5 minute @SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.stan.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.stan.lockAtMostFor}") public void findAndProcessEvents() { + log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor()); LockAssert.assertLocked(); final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString()); if (!results.isEmpty()) { - results.stream() - .filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))) - .forEach(el -> { - try { - choreographer.handleEvent(el); - } catch (final Exception ex) { - log.error("Exception while trying to handle message", ex); - } - }); + int cnt = 0; + for (Event e : results) { + if (cnt++ >= constants.getTraxToGradProcessingThreshold()) { + log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold()); + break; + } + try { + choreographer.handleEvent(e); + } catch (final Exception ex) { + log.error("Exception while trying to handle message", ex); + } + } + log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: processing is completed"); } } diff --git a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java index a4665daf..b2f9274a 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java +++ b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java @@ -206,4 +206,17 @@ public class EducGradDataConversionApiConstants { // Splunk LogHelper Enabled @Value("${splunk.log-helper.enabled}") private boolean splunkLogHelperEnabled; + + // Scheduler: ongoing updates from TRAX to GRAD + @Value("${cron.scheduled.process.events.stan.run}") + private String traxToGradCronRun; + + @Value("${cron.scheduled.process.events.stan.lockAtLeastFor}") + private String traxToGradLockAtLeastFor; + + @Value("${cron.scheduled.process.events.stan.lockAtMostFor}") + private String traxToGradLockAtMostFor; + + @Value("${cron.scheduled.process.events.stan.threshold}") + private int traxToGradProcessingThreshold; } diff --git a/api/src/main/resources/application.yaml b/api/src/main/resources/application.yaml index 89fa2551..a7179628 100644 --- a/api/src/main/resources/application.yaml +++ b/api/src/main/resources/application.yaml @@ -169,6 +169,7 @@ cron: run: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN} lockAtLeastFor: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_LOCK_AT_LEAST_FOR} lockAtMostFor: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_LOCK_AT_MOST_FOR} + threshold: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_THRESHOLD} #Endpoints endpoint: diff --git a/api/src/test/resources/application.yaml b/api/src/test/resources/application.yaml index 7af8e011..95a6fffe 100644 --- a/api/src/test/resources/application.yaml +++ b/api/src/test/resources/application.yaml @@ -89,6 +89,7 @@ cron: run: 0 0/5 * * * * lockAtLeastFor: 800ms lockAtMostFor: 900ms + threshold: 100 #Endpoints endpoint: