From 0197311f5aa2becb222c6de2b228d68cb8cbf5f7 Mon Sep 17 00:00:00 2001 From: Jinil Sung Date: Thu, 19 Sep 2024 14:22:47 -0700 Subject: [PATCH] GRAD2-2968: enable multi-partitioning to regenerate school reports in parallel. (#531) GRAD2-2968: enable multi-partitioning to regenerate school reports in parallel. --- .../config/BatchJobConfig.java | 5 +- ...ReportsCompletionNotificationListener.java | 35 ++++---- .../model/SchoolReportsRegenSummaryDTO.java | 5 +- .../RegenerateSchoolReportsProcessor.java | 31 +++---- .../reader/BaseSchoolReader.java | 11 --- .../reader/EDWSnapshotSchoolReader.java | 5 +- .../RegenerateSchoolReportsPartitioner.java | 81 +++++++++++-------- .../reader/RegenerateSchoolReportsReader.java | 32 +++----- .../writer/RegenerateSchoolReportsWriter.java | 11 +-- 9 files changed, 102 insertions(+), 114 deletions(-) diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/config/BatchJobConfig.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/config/BatchJobConfig.java index f9e0da7c..ab792dd0 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/config/BatchJobConfig.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/config/BatchJobConfig.java @@ -1074,7 +1074,7 @@ public RegenerateSchoolReportsWriter itemWriterSchoolReportsRegen() { @Bean public Step schoolReportsRegenJobStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, SkipSQLTransactionExceptionsListener skipListener) { return new StepBuilder("schoolReportsRegenJobStep", jobRepository) - ., List>chunk(1, transactionManager) + .chunk(1, transactionManager) .reader(itemReaderSchoolReportsRegen()) .processor(itemProcessorSchoolReportsRegen()) .writer(itemWriterSchoolReportsRegen()) @@ -1086,10 +1086,11 @@ public Step schoolReportsRegenJobStep(JobRepository jobRepository, PlatformTrans @Bean public Step masterStepSchoolReportsRegen(JobRepository jobRepository, PlatformTransactionManager transactionManager, EducGradBatchGraduationApiConstants constants, SkipSQLTransactionExceptionsListener skipListener) { + int partitionSize = constants.getNumberOfPartitions() / 2; return new StepBuilder("masterStepSchoolReportsRegen", jobRepository) .partitioner(schoolReportsRegenJobStep(jobRepository, transactionManager, skipListener).getName(), partitionerSchoolReportsRegen()) .step(schoolReportsRegenJobStep(jobRepository, transactionManager, skipListener)) - .gridSize(constants.getNumberOfPartitions()) + .gridSize(partitionSize != 0? partitionSize : 1) .taskExecutor(taskExecutor()) .build(); } diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/listener/RegenSchoolReportsCompletionNotificationListener.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/listener/RegenSchoolReportsCompletionNotificationListener.java index 0093e29c..bcd2f39f 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/listener/RegenSchoolReportsCompletionNotificationListener.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/listener/RegenSchoolReportsCompletionNotificationListener.java @@ -1,18 +1,17 @@ package ca.bc.gov.educ.api.batchgraduation.listener; +import ca.bc.gov.educ.api.batchgraduation.entity.BatchGradAlgorithmJobHistoryEntity; +import ca.bc.gov.educ.api.batchgraduation.model.BaseSummaryDTO; import ca.bc.gov.educ.api.batchgraduation.model.SchoolReportsRegenSummaryDTO; import ca.bc.gov.educ.api.batchgraduation.util.DateUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; -import org.springframework.batch.core.JobParameters; import org.springframework.batch.item.ExecutionContext; import org.springframework.stereotype.Component; import java.util.Date; -import static ca.bc.gov.educ.api.batchgraduation.util.EducGradBatchGraduationApiConstants.SEARCH_REQUEST; - @Slf4j @Component public class RegenSchoolReportsCompletionNotificationListener extends BaseRegenSchoolReportsCompletionNotificationListener { @@ -22,34 +21,38 @@ public void afterJob(JobExecution jobExecution) { if (jobExecution.getStatus() == BatchStatus.COMPLETED) { long elapsedTimeMillis = getElapsedTimeMillis(jobExecution); log.info("======================================================================================="); - JobParameters jobParameters = jobExecution.getJobParameters(); ExecutionContext jobContext = jobExecution.getExecutionContext(); Long jobExecutionId = jobExecution.getId(); - String jobType = jobParameters.getString("jobType"); - log.info("{} Regen School Reports Job {} completed in {} s with jobExecution status {}", jobType, jobExecutionId, elapsedTimeMillis / 1000, jobExecution.getStatus()); + log.info("Regen School Reports Job {} completed in {} s with jobExecution status {}", jobExecutionId, elapsedTimeMillis / 1000, jobExecution.getStatus()); String status = jobExecution.getStatus().toString(); - Date startTime = DateUtils.toDate(jobExecution.getStartTime()); Date endTime = DateUtils.toDate(jobExecution.getEndTime()); - String jobTrigger = jobParameters.getString("jobTrigger"); SchoolReportsRegenSummaryDTO summaryDTO = (SchoolReportsRegenSummaryDTO) jobContext.get("schoolReportsRegenSummaryDTO"); - String studentSearchRequest = jobParameters.getString(SEARCH_REQUEST, "{}"); // display Summary Details - log.info("Records read : {}", summaryDTO.getReadCount()); + assert summaryDTO != null; + log.info("Records read : {}", summaryDTO.getReadCount()); log.info("Processed count: {}", summaryDTO.getProcessedCount()); log.info(" --------------------------------------------------------------------------------------"); log.info("Errors:{}", summaryDTO.getErrors().size()); + log.info(" --------------------------------------------------------------------------------------"); + summaryDTO.getSchools().forEach(value -> log.debug("School Report regenerated for {}", value.getMincode())); + // save batch job & error history + saveBatchJobHistory(summaryDTO, jobExecutionId, status, endTime); - updateUserSchedulingJobs(jobParameters); + } + } - String jobParametersDTO = buildJobParametersDTO(jobType, studentSearchRequest, null, null); - // save batch job & error history - processBatchJobHistory(summaryDTO, jobExecutionId, status, jobTrigger, jobType, startTime, endTime, jobParametersDTO); - log.info(" --------------------------------------------------------------------------------------"); - summaryDTO.getSchools().forEach((value) -> log.info("School {} number of Regen School Reports : {}", value.getMincode(), value.getNumberOfSchoolReports())); + private void saveBatchJobHistory(BaseSummaryDTO summaryDTO, Long jobExecutionId, String status, Date endTime) { + BatchGradAlgorithmJobHistoryEntity ent = gradBatchHistoryService.getGradAlgorithmJobHistory(jobExecutionId); + if (ent != null) { + ent.setActualStudentsProcessed(summaryDTO.getProcessedCount()); + ent.setFailedStudentsProcessed((int) summaryDTO.getErroredCount()); + ent.setEndTime(DateUtils.toLocalDateTime(endTime)); + ent.setStatus(status); + gradBatchHistoryService.saveGradAlgorithmJobHistory(ent); } } } diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/model/SchoolReportsRegenSummaryDTO.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/model/SchoolReportsRegenSummaryDTO.java index b2acaa3b..8d498e1f 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/model/SchoolReportsRegenSummaryDTO.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/model/SchoolReportsRegenSummaryDTO.java @@ -2,7 +2,6 @@ import lombok.Data; import lombok.NoArgsConstructor; -import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; @@ -11,9 +10,11 @@ @NoArgsConstructor public class SchoolReportsRegenSummaryDTO extends BaseSummaryDTO { + private String reportBatchType; // REGALG or TVRRUN + private List errors = new ArrayList<>(); private List globalList = new ArrayList<>(); private List schools = new ArrayList<>(); - private StudentSearchRequest studentSearchRequest; } + diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/processor/RegenerateSchoolReportsProcessor.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/processor/RegenerateSchoolReportsProcessor.java index ddae55f7..77b3f10d 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/processor/RegenerateSchoolReportsProcessor.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/processor/RegenerateSchoolReportsProcessor.java @@ -7,11 +7,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import java.util.List; -import java.util.stream.Collectors; - @Slf4j -public class RegenerateSchoolReportsProcessor implements ItemProcessor, List> { +public class RegenerateSchoolReportsProcessor implements ItemProcessor { @Autowired RestUtils restUtils; @@ -19,27 +16,19 @@ public class RegenerateSchoolReportsProcessor implements ItemProcessor process(List minCodes) throws Exception { - Long batchId = summaryDTO.getBatchId(); - StudentSearchRequest searchRequest = summaryDTO.getStudentSearchRequest(); - long countRegeneratedSchoolReports = 0l; - List reportTypes = searchRequest.getReportTypes(); + public String process(String minCode) throws Exception { + summaryDTO.setBatchId(batchId); if(log.isDebugEnabled()) { - log.debug("Process Schools: {}", !minCodes.isEmpty() ? String.join(",", minCodes) : summaryDTO.getSchools().stream().map(School::getMincode).collect(Collectors.joining(","))); + log.debug("Processing {} School Report: {} ", summaryDTO.getReportBatchType(), minCode); } - String reportType; - if(reportTypes != null && !reportTypes.isEmpty() && "NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) - reportType = "TVRRUN"; - else - reportType = "REGALG"; - - for (String minCode : minCodes) { - countRegeneratedSchoolReports += restUtils.createAndStoreSchoolReports(minCode, reportType, summaryDTO); - } + long countRegeneratedSchoolReports = restUtils.createAndStoreSchoolReports(minCode, summaryDTO.getReportBatchType(), summaryDTO); - summaryDTO.setProcessedCount(countRegeneratedSchoolReports); - return minCodes; + summaryDTO.setProcessedCount(summaryDTO.getProcessedCount() + countRegeneratedSchoolReports); + return minCode; } } diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/BaseSchoolReader.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/BaseSchoolReader.java index 705bb1d8..f5646549 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/BaseSchoolReader.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/BaseSchoolReader.java @@ -1,7 +1,5 @@ package ca.bc.gov.educ.api.batchgraduation.reader; -import ca.bc.gov.educ.api.batchgraduation.model.EdwSnapshotSchoolSummaryDTO; -import ca.bc.gov.educ.api.batchgraduation.model.ResponseObj; import ca.bc.gov.educ.api.batchgraduation.rest.RestUtils; import org.springframework.batch.core.JobExecution; import org.springframework.batch.item.ItemReader; @@ -22,16 +20,7 @@ public abstract class BaseSchoolReader implements ItemReader { @Value("#{stepExecutionContext['data']}") List schools; - @Value("#{stepExecutionContext['summary']}") - EdwSnapshotSchoolSummaryDTO summaryDTO; - @Value("#{stepExecution.jobExecution}") JobExecution jobExecution; - protected void fetchAccessToken() { - ResponseObj res = restUtils.getTokenResponseObject(); - if (res != null) { - summaryDTO.setAccessToken(res.getAccess_token()); - } - } } \ No newline at end of file diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/EDWSnapshotSchoolReader.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/EDWSnapshotSchoolReader.java index 7177e06c..afc398ea 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/EDWSnapshotSchoolReader.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/EDWSnapshotSchoolReader.java @@ -4,16 +4,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemReader; +import org.springframework.beans.factory.annotation.Value; public class EDWSnapshotSchoolReader extends BaseSchoolReader implements ItemReader { private static final Logger LOGGER = LoggerFactory.getLogger(EDWSnapshotSchoolReader.class); + @Value("#{stepExecutionContext['summary']}") + EdwSnapshotSchoolSummaryDTO summaryDTO; + @Override public String read() throws Exception { String nextSchool = null; if (nextSchoolForProcessing < schools.size()) { - fetchAccessToken(); nextSchool = schools.get(nextSchoolForProcessing); LOGGER.info("School: {} - {} of {}", nextSchool, nextSchoolForProcessing + 1, summaryDTO.getReadCount()); nextSchoolForProcessing++; diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsPartitioner.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsPartitioner.java index 2c8adc80..f712d01b 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsPartitioner.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsPartitioner.java @@ -11,8 +11,6 @@ import java.util.*; import java.util.stream.Collectors; -import static ca.bc.gov.educ.api.batchgraduation.util.EducGradBatchGraduationApiConstants.SEARCH_REQUEST; - @Slf4j public class RegenerateSchoolReportsPartitioner extends BasePartitioner { @@ -51,22 +49,19 @@ public Map partition(int gridSize) { } summaryDTO.setBatchId(jobExecution.getId()); - summaryDTO.setStudentSearchRequest(searchRequest); + summaryDTO.setReportBatchType(determineReportBatchType(searchRequest.getReportTypes())); Long totalSchoolReportsCount = 0L; - List reportTypes = searchRequest.getReportTypes(); Long schoolReportsCount = 0L; List finalSchoolDistricts = new ArrayList<>(); - List schoolReportsLite = new ArrayList<>(); + List schoolReportsLite; if(processAllReports) { - if (reportTypes != null && !reportTypes.isEmpty()) { - if ("NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) { - schoolReportsLite = restUtils.getSchoolReportsLiteByReportType("NONGRADPRJ", summaryDTO); - } else { - schoolReportsLite = restUtils.getSchoolReportsLiteByReportType( "GRADREG", summaryDTO); - } + if ("TVRRUN".compareToIgnoreCase(summaryDTO.getReportBatchType()) == 0) { + schoolReportsLite = restUtils.getSchoolReportsLiteByReportType("NONGRADPRJ", summaryDTO); + } else { + schoolReportsLite = restUtils.getSchoolReportsLiteByReportType( "GRADREG", summaryDTO); } if (schoolReportsLite != null && !schoolReportsLite.isEmpty()) { @@ -81,21 +76,19 @@ public Map partition(int gridSize) { totalSchoolReportsCount += finalSchoolDistricts.size(); } else { for (String schoolOfRecord : schoolDistricts) { - if (reportTypes != null && !reportTypes.isEmpty()) { - if ("NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) { - schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "NONGRADPRJ", summaryDTO); - } else { - schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "GRADREG", summaryDTO); - } - if (schoolReportsCount > 0) { - finalSchoolDistricts.add(schoolOfRecord); - School school = new School(schoolOfRecord); - school.setNumberOfSchoolReports(schoolReportsCount); - summaryDTO.getSchools().add(school); - totalSchoolReportsCount += schoolReportsCount; - } - schoolReportsCount = 0L; + if ("TVRRUN".compareToIgnoreCase(summaryDTO.getReportBatchType()) == 0) { + schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "NONGRADPRJ", summaryDTO); + } else { + schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "GRADREG", summaryDTO); + } + if (schoolReportsCount > 0) { + finalSchoolDistricts.add(schoolOfRecord); + School school = new School(schoolOfRecord); + school.setNumberOfSchoolReports(schoolReportsCount); + summaryDTO.getSchools().add(school); + totalSchoolReportsCount += schoolReportsCount; } + schoolReportsCount = 0L; } } @@ -103,19 +96,37 @@ public Map partition(int gridSize) { long diff = (endTime - startTime)/1000; log.debug("Total {} schools after filters in {} sec", finalSchoolDistricts.size(), diff); - updateBatchJobHistory(createBatchJobHistory(), totalSchoolReportsCount); summaryDTO.setReadCount(totalSchoolReportsCount); summaryDTO.setProcessedCount(0); - Map map = new HashMap<>(); - ExecutionContext executionContext = new ExecutionContext(); - executionContext.put(SEARCH_REQUEST, searchRequest); - executionContext.put("data", finalSchoolDistricts); - executionContext.put("summary", summaryDTO); - executionContext.put("readCount", 0); - map.put("partition0", executionContext); + if (!finalSchoolDistricts.isEmpty()) { + updateBatchJobHistory(createBatchJobHistory(), totalSchoolReportsCount); + int partitionSize = finalSchoolDistricts.size()/gridSize + 1; + List> partitions = new LinkedList<>(); + for (int i = 0; i < finalSchoolDistricts.size(); i += partitionSize) { + partitions.add(finalSchoolDistricts.subList(i, Math.min(i + partitionSize, finalSchoolDistricts.size()))); + } + Map map = new HashMap<>(partitions.size()); + for (int i = 0; i < partitions.size(); i++) { + ExecutionContext executionContext = new ExecutionContext(); + SchoolReportsRegenSummaryDTO partitionSummaryDTO = new SchoolReportsRegenSummaryDTO(); + partitionSummaryDTO.setReportBatchType(summaryDTO.getReportBatchType()); + List data = partitions.get(i); + executionContext.put("data", data); + partitionSummaryDTO.setReadCount(data.size()); + executionContext.put("summary", partitionSummaryDTO); + executionContext.put("index",0); + String key = "partition" + i; + map.put(key, executionContext); + } + log.info("Found {} in total running on {} partitions",finalSchoolDistricts.size(),map.size()); + return map; + } + log.info("No Schools Found for School Reports Regeneration"); + return new HashMap<>(); + } - log.info("Found {} in total running on 1 partitions", totalSchoolReportsCount); - return map; + private String determineReportBatchType(List reportTypes) { + return reportTypes != null && !reportTypes.isEmpty() && "NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0 ? "TVRRUN" : "REGALG"; } } diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsReader.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsReader.java index 4b6016c1..b7eb1976 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsReader.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/reader/RegenerateSchoolReportsReader.java @@ -2,36 +2,26 @@ import ca.bc.gov.educ.api.batchgraduation.model.SchoolReportsRegenSummaryDTO; import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.JobExecution; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.annotation.Value; -import java.util.List; - @Slf4j -public class RegenerateSchoolReportsReader implements ItemReader> { - - @Value("#{stepExecutionContext['data']}") - List schools; +public class RegenerateSchoolReportsReader extends BaseSchoolReader implements ItemReader { @Value("#{stepExecutionContext['summary']}") SchoolReportsRegenSummaryDTO summaryDTO; - @Value("#{stepExecution.jobExecution}") - JobExecution jobExecution; - - @Value("#{stepExecutionContext['readCount']}") - Long readCount; - @Override - public List read() throws Exception { - if(readCount > 0) return null; - readCount++; - if(log.isDebugEnabled()) { - log.info("Read schools Codes -> {} of {} schools", readCount, schools.size()); + public String read() throws Exception { + String nextSchool = null; + if (nextSchoolForProcessing < schools.size()) { + nextSchool = schools.get(nextSchoolForProcessing); + log.info("School: {} - {} of {}", nextSchool, nextSchoolForProcessing + 1, summaryDTO.getReadCount()); + nextSchoolForProcessing++; + } else { + aggregate("schoolReportsRegenSummaryDTO"); } - aggregate("schoolReportsRegenSummaryDTO"); - return schools; + return nextSchool; } private void aggregate(String summaryContextName) { @@ -41,7 +31,7 @@ private void aggregate(String summaryContextName) { jobExecution.getExecutionContext().put(summaryContextName, totalSummaryDTO); } totalSummaryDTO.setBatchId(summaryDTO.getBatchId()); - totalSummaryDTO.setReadCount(summaryDTO.getReadCount()); + totalSummaryDTO.setProcessedCount(totalSummaryDTO.getProcessedCount() + summaryDTO.getProcessedCount()); totalSummaryDTO.getGlobalList().addAll(summaryDTO.getGlobalList()); } } diff --git a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/writer/RegenerateSchoolReportsWriter.java b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/writer/RegenerateSchoolReportsWriter.java index 9e4d076f..03a31fdc 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/writer/RegenerateSchoolReportsWriter.java +++ b/api/src/main/java/ca/bc/gov/educ/api/batchgraduation/writer/RegenerateSchoolReportsWriter.java @@ -3,15 +3,16 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; - -import java.util.List; +import org.springframework.lang.NonNull; @Slf4j -public class RegenerateSchoolReportsWriter implements ItemWriter> { +public class RegenerateSchoolReportsWriter implements ItemWriter { @Override - public void write(Chunk> chunk) throws Exception { - log.info("Regenerate School Reports Writer"); + public void write(@NonNull Chunk list) throws Exception { + if(log.isDebugEnabled()) { + log.info("Regenerate School Reports Writer: chunk size = {}", list.size()); + } } }