diff --git a/api/pom.xml b/api/pom.xml index 77e75ea4..3e589d8e 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -6,7 +6,7 @@ ca.bc.gov.educ educ-grad-data-conversion-api - 1.8.45 + 1.8.46 educ-grad-data-conversion-api Ministry of Education GRAD DATA CONVERSION API diff --git a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java index 4dcdd207..126df85c 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java +++ b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/scheduler/JetStreamEventScheduler.java @@ -1,15 +1,15 @@ package ca.bc.gov.educ.api.dataconversion.scheduler; import ca.bc.gov.educ.api.dataconversion.choreographer.ChoreographEventHandler; +import ca.bc.gov.educ.api.dataconversion.entity.Event; import ca.bc.gov.educ.api.dataconversion.repository.EventRepository; +import ca.bc.gov.educ.api.dataconversion.util.EducGradDataConversionApiConstants; import lombok.extern.slf4j.Slf4j; import net.javacrumbs.shedlock.core.LockAssert; import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; - import static ca.bc.gov.educ.api.dataconversion.constant.EventStatus.DB_COMMITTED; /** @@ -27,6 +27,8 @@ public class JetStreamEventScheduler { private final ChoreographEventHandler choreographer; + private final EducGradDataConversionApiConstants constants; + /** * Instantiates a new Stan event scheduler. * @@ -34,26 +36,33 @@ public class JetStreamEventScheduler { * @param choreographer the choreographer */ public JetStreamEventScheduler(final EventRepository eventRepository, - final ChoreographEventHandler choreographer) { + final ChoreographEventHandler choreographer, + final EducGradDataConversionApiConstants constants) { this.eventRepository = eventRepository; this.choreographer = choreographer; + this.constants = constants; } @Scheduled(cron = "${cron.scheduled.process.events.stan.run}") // minimum = every 5 minute @SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.stan.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.stan.lockAtMostFor}") public void findAndProcessEvents() { + log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor()); LockAssert.assertLocked(); final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString()); if (!results.isEmpty()) { - results.stream() - .filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))) - .forEach(el -> { - try { - choreographer.handleEvent(el); - } catch (final Exception ex) { - log.error("Exception while trying to handle message", ex); - } - }); + int cnt = 0; + for (Event e : results) { + if (cnt++ >= constants.getTraxToGradProcessingThreshold()) { + log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold()); + break; + } + try { + choreographer.handleEvent(e); + } catch (final Exception ex) { + log.error("Exception while trying to handle message", ex); + } + } + log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: processing is completed"); } } diff --git a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java index a4665daf..b2f9274a 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java +++ b/api/src/main/java/ca/bc/gov/educ/api/dataconversion/util/EducGradDataConversionApiConstants.java @@ -206,4 +206,17 @@ public class EducGradDataConversionApiConstants { // Splunk LogHelper Enabled @Value("${splunk.log-helper.enabled}") private boolean splunkLogHelperEnabled; + + // Scheduler: ongoing updates from TRAX to GRAD + @Value("${cron.scheduled.process.events.stan.run}") + private String traxToGradCronRun; + + @Value("${cron.scheduled.process.events.stan.lockAtLeastFor}") + private String traxToGradLockAtLeastFor; + + @Value("${cron.scheduled.process.events.stan.lockAtMostFor}") + private String traxToGradLockAtMostFor; + + @Value("${cron.scheduled.process.events.stan.threshold}") + private int traxToGradProcessingThreshold; } diff --git a/api/src/main/resources/application.yaml b/api/src/main/resources/application.yaml index 89fa2551..a7179628 100644 --- a/api/src/main/resources/application.yaml +++ b/api/src/main/resources/application.yaml @@ -169,6 +169,7 @@ cron: run: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN} lockAtLeastFor: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_LOCK_AT_LEAST_FOR} lockAtMostFor: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_LOCK_AT_MOST_FOR} + threshold: ${CRON_SCHEDULED_PROCESS_EVENTS_STAN_THRESHOLD} #Endpoints endpoint: diff --git a/api/src/test/resources/application.yaml b/api/src/test/resources/application.yaml index 7af8e011..95a6fffe 100644 --- a/api/src/test/resources/application.yaml +++ b/api/src/test/resources/application.yaml @@ -89,6 +89,7 @@ cron: run: 0 0/5 * * * * lockAtLeastFor: 800ms lockAtMostFor: 900ms + threshold: 100 #Endpoints endpoint: