Skip to content

Commit

Permalink
GRAD2-2968: enable multi-partitioning to regenerate school reports in…
Browse files Browse the repository at this point in the history
… parallel. (#531)

GRAD2-2968: enable multi-partitioning to regenerate school reports in parallel.
  • Loading branch information
infstar authored Sep 19, 2024
1 parent f6c9119 commit 0197311
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ public RegenerateSchoolReportsWriter itemWriterSchoolReportsRegen() {
@Bean
public Step schoolReportsRegenJobStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, SkipSQLTransactionExceptionsListener skipListener) {
return new StepBuilder("schoolReportsRegenJobStep", jobRepository)
.<List<String>, List<String>>chunk(1, transactionManager)
.<String, String>chunk(1, transactionManager)
.reader(itemReaderSchoolReportsRegen())
.processor(itemProcessorSchoolReportsRegen())
.writer(itemWriterSchoolReportsRegen())
Expand All @@ -1086,10 +1086,11 @@ public Step schoolReportsRegenJobStep(JobRepository jobRepository, PlatformTrans

/**
 * Builds the master (partitioning) step of the school reports regeneration job.
 *
 * <p>The worker step {@code schoolReportsRegenJobStep} is distributed through
 * {@code partitionerSchoolReportsRegen()} and executed on {@code taskExecutor()}.
 * The grid size handed to the partitioner is half of
 * {@code constants.getNumberOfPartitions()}, never less than 1.
 *
 * @param jobRepository      repository used by the step builder
 * @param transactionManager transaction manager passed on to the worker step
 * @param constants          source of the configured number of partitions
 * @param skipListener       skip listener passed on to the worker step
 * @return the partitioned master step
 */
@Bean
public Step masterStepSchoolReportsRegen(JobRepository jobRepository, PlatformTransactionManager transactionManager, EducGradBatchGraduationApiConstants constants, SkipSQLTransactionExceptionsListener skipListener) {
    // Resolve the worker step once and reuse it for both its name and the step reference.
    Step workerStep = schoolReportsRegenJobStep(jobRepository, transactionManager, skipListener);
    // Half of the configured partitions; clamp so a zero or negative result never reaches the partitioner.
    int gridSize = Math.max(1, constants.getNumberOfPartitions() / 2);
    return new StepBuilder("masterStepSchoolReportsRegen", jobRepository)
            .partitioner(workerStep.getName(), partitionerSchoolReportsRegen())
            .step(workerStep)
            .gridSize(gridSize)
            .taskExecutor(taskExecutor())
            .build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package ca.bc.gov.educ.api.batchgraduation.listener;

import ca.bc.gov.educ.api.batchgraduation.entity.BatchGradAlgorithmJobHistoryEntity;
import ca.bc.gov.educ.api.batchgraduation.model.BaseSummaryDTO;
import ca.bc.gov.educ.api.batchgraduation.model.SchoolReportsRegenSummaryDTO;
import ca.bc.gov.educ.api.batchgraduation.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import java.util.Date;

import static ca.bc.gov.educ.api.batchgraduation.util.EducGradBatchGraduationApiConstants.SEARCH_REQUEST;

@Slf4j
@Component
public class RegenSchoolReportsCompletionNotificationListener extends BaseRegenSchoolReportsCompletionNotificationListener {
Expand All @@ -22,34 +21,38 @@ public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
long elapsedTimeMillis = getElapsedTimeMillis(jobExecution);
log.info("=======================================================================================");
JobParameters jobParameters = jobExecution.getJobParameters();
ExecutionContext jobContext = jobExecution.getExecutionContext();
Long jobExecutionId = jobExecution.getId();
String jobType = jobParameters.getString("jobType");
log.info("{} Regen School Reports Job {} completed in {} s with jobExecution status {}", jobType, jobExecutionId, elapsedTimeMillis / 1000, jobExecution.getStatus());
log.info("Regen School Reports Job {} completed in {} s with jobExecution status {}", jobExecutionId, elapsedTimeMillis / 1000, jobExecution.getStatus());

String status = jobExecution.getStatus().toString();
Date startTime = DateUtils.toDate(jobExecution.getStartTime());
Date endTime = DateUtils.toDate(jobExecution.getEndTime());
String jobTrigger = jobParameters.getString("jobTrigger");

SchoolReportsRegenSummaryDTO summaryDTO = (SchoolReportsRegenSummaryDTO) jobContext.get("schoolReportsRegenSummaryDTO");

String studentSearchRequest = jobParameters.getString(SEARCH_REQUEST, "{}");
// display Summary Details
log.info("Records read : {}", summaryDTO.getReadCount());
assert summaryDTO != null;
log.info("Records read : {}", summaryDTO.getReadCount());
log.info("Processed count: {}", summaryDTO.getProcessedCount());
log.info(" --------------------------------------------------------------------------------------");
log.info("Errors:{}", summaryDTO.getErrors().size());
log.info(" --------------------------------------------------------------------------------------");
summaryDTO.getSchools().forEach(value -> log.debug("School Report regenerated for {}", value.getMincode()));
// save batch job & error history
saveBatchJobHistory(summaryDTO, jobExecutionId, status, endTime);

updateUserSchedulingJobs(jobParameters);
}
}

String jobParametersDTO = buildJobParametersDTO(jobType, studentSearchRequest, null, null);
// save batch job & error history
processBatchJobHistory(summaryDTO, jobExecutionId, status, jobTrigger, jobType, startTime, endTime, jobParametersDTO);
log.info(" --------------------------------------------------------------------------------------");
summaryDTO.getSchools().forEach((value) -> log.info("School {} number of Regen School Reports : {}", value.getMincode(), value.getNumberOfSchoolReports()));
/**
 * Writes the final counts, end time and status of a job onto its stored history record.
 *
 * <p>Nothing is saved when {@code gradBatchHistoryService} returns no history record
 * for the given job execution id.
 *
 * @param summaryDTO     summary providing the processed and errored counts
 * @param jobExecutionId id used to look up the history record
 * @param status         status value to store on the record
 * @param endTime        job end time, converted with {@code DateUtils.toLocalDateTime}
 */
private void saveBatchJobHistory(BaseSummaryDTO summaryDTO, Long jobExecutionId, String status, Date endTime) {
    BatchGradAlgorithmJobHistoryEntity history = gradBatchHistoryService.getGradAlgorithmJobHistory(jobExecutionId);
    if (history == null) {
        return;
    }

    history.setActualStudentsProcessed(summaryDTO.getProcessedCount());
    history.setFailedStudentsProcessed((int) summaryDTO.getErroredCount());
    history.setEndTime(DateUtils.toLocalDateTime(endTime));
    history.setStatus(status);

    gradBatchHistoryService.saveGradAlgorithmJobHistory(history);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -11,9 +10,11 @@
/**
 * Summary state carried through the school reports regeneration batch job.
 * Extends {@link BaseSummaryDTO} with the report batch type, collected errors
 * and the schools involved in the run.
 */
@NoArgsConstructor
public class SchoolReportsRegenSummaryDTO extends BaseSummaryDTO {

// Batch type the school reports are regenerated for.
private String reportBatchType; // REGALG or TVRRUN

// Errors collected during the run; starts empty.
private List<ProcessError> errors = new ArrayList<>();
// NOTE(review): purpose of globalList is not evident from this class - confirm against its callers.
private List<School> globalList = new ArrayList<>();
// Schools taking part in the regeneration run; starts empty.
private List<School> schools = new ArrayList<>();
// Search request associated with this run.
// NOTE(review): appears superseded by reportBatchType - confirm whether it is still needed.
private StudentSearchRequest studentSearchRequest;

}

Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,28 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class RegenerateSchoolReportsProcessor implements ItemProcessor<List<String>, List<String>> {
public class RegenerateSchoolReportsProcessor implements ItemProcessor<String, String> {

@Autowired
RestUtils restUtils;

@Value("#{stepExecutionContext['summary']}")
SchoolReportsRegenSummaryDTO summaryDTO;

@Value("#{stepExecution.jobExecution.id}")
Long batchId;

@Override
public List<String> process(List<String> minCodes) throws Exception {
Long batchId = summaryDTO.getBatchId();
StudentSearchRequest searchRequest = summaryDTO.getStudentSearchRequest();
long countRegeneratedSchoolReports = 0l;
List<String> reportTypes = searchRequest.getReportTypes();
public String process(String minCode) throws Exception {
summaryDTO.setBatchId(batchId);
if(log.isDebugEnabled()) {
log.debug("Process Schools: {}", !minCodes.isEmpty() ? String.join(",", minCodes) : summaryDTO.getSchools().stream().map(School::getMincode).collect(Collectors.joining(",")));
log.debug("Processing {} School Report: {} ", summaryDTO.getReportBatchType(), minCode);
}

String reportType;
if(reportTypes != null && !reportTypes.isEmpty() && "NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0)
reportType = "TVRRUN";
else
reportType = "REGALG";

for (String minCode : minCodes) {
countRegeneratedSchoolReports += restUtils.createAndStoreSchoolReports(minCode, reportType, summaryDTO);
}
long countRegeneratedSchoolReports = restUtils.createAndStoreSchoolReports(minCode, summaryDTO.getReportBatchType(), summaryDTO);

summaryDTO.setProcessedCount(countRegeneratedSchoolReports);
return minCodes;
summaryDTO.setProcessedCount(summaryDTO.getProcessedCount() + countRegeneratedSchoolReports);
return minCode;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ca.bc.gov.educ.api.batchgraduation.reader;

import ca.bc.gov.educ.api.batchgraduation.model.EdwSnapshotSchoolSummaryDTO;
import ca.bc.gov.educ.api.batchgraduation.model.ResponseObj;
import ca.bc.gov.educ.api.batchgraduation.rest.RestUtils;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemReader;
Expand All @@ -22,16 +20,7 @@ public abstract class BaseSchoolReader implements ItemReader<String> {
@Value("#{stepExecutionContext['data']}")
List<String> schools;

@Value("#{stepExecutionContext['summary']}")
EdwSnapshotSchoolSummaryDTO summaryDTO;

@Value("#{stepExecution.jobExecution}")
JobExecution jobExecution;

protected void fetchAccessToken() {
ResponseObj res = restUtils.getTokenResponseObject();
if (res != null) {
summaryDTO.setAccessToken(res.getAccess_token());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;

public class EDWSnapshotSchoolReader extends BaseSchoolReader implements ItemReader<String> {

private static final Logger LOGGER = LoggerFactory.getLogger(EDWSnapshotSchoolReader.class);

@Value("#{stepExecutionContext['summary']}")
EdwSnapshotSchoolSummaryDTO summaryDTO;

@Override
public String read() throws Exception {
String nextSchool = null;
if (nextSchoolForProcessing < schools.size()) {
fetchAccessToken();
nextSchool = schools.get(nextSchoolForProcessing);
LOGGER.info("School: {} - {} of {}", nextSchool, nextSchoolForProcessing + 1, summaryDTO.getReadCount());
nextSchoolForProcessing++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import java.util.*;
import java.util.stream.Collectors;

import static ca.bc.gov.educ.api.batchgraduation.util.EducGradBatchGraduationApiConstants.SEARCH_REQUEST;

@Slf4j
public class RegenerateSchoolReportsPartitioner extends BasePartitioner {

Expand Down Expand Up @@ -51,22 +49,19 @@ public Map<String, ExecutionContext> partition(int gridSize) {
}

summaryDTO.setBatchId(jobExecution.getId());
summaryDTO.setStudentSearchRequest(searchRequest);
summaryDTO.setReportBatchType(determineReportBatchType(searchRequest.getReportTypes()));

Long totalSchoolReportsCount = 0L;
List<String> reportTypes = searchRequest.getReportTypes();
Long schoolReportsCount = 0L;

List<String> finalSchoolDistricts = new ArrayList<>();
List<SchoolReport> schoolReportsLite = new ArrayList<>();
List<SchoolReport> schoolReportsLite;

if(processAllReports) {
if (reportTypes != null && !reportTypes.isEmpty()) {
if ("NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) {
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType("NONGRADPRJ", summaryDTO);
} else {
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType( "GRADREG", summaryDTO);
}
if ("TVRRUN".compareToIgnoreCase(summaryDTO.getReportBatchType()) == 0) {
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType("NONGRADPRJ", summaryDTO);
} else {
schoolReportsLite = restUtils.getSchoolReportsLiteByReportType( "GRADREG", summaryDTO);
}

if (schoolReportsLite != null && !schoolReportsLite.isEmpty()) {
Expand All @@ -81,41 +76,57 @@ public Map<String, ExecutionContext> partition(int gridSize) {
totalSchoolReportsCount += finalSchoolDistricts.size();
} else {
for (String schoolOfRecord : schoolDistricts) {
if (reportTypes != null && !reportTypes.isEmpty()) {
if ("NONGRADPRJ".compareToIgnoreCase(reportTypes.get(0)) == 0) {
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "NONGRADPRJ", summaryDTO);
} else {
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "GRADREG", summaryDTO);
}
if (schoolReportsCount > 0) {
finalSchoolDistricts.add(schoolOfRecord);
School school = new School(schoolOfRecord);
school.setNumberOfSchoolReports(schoolReportsCount);
summaryDTO.getSchools().add(school);
totalSchoolReportsCount += schoolReportsCount;
}
schoolReportsCount = 0L;
if ("TVRRUN".compareToIgnoreCase(summaryDTO.getReportBatchType()) == 0) {
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "NONGRADPRJ", summaryDTO);
} else {
schoolReportsCount += restUtils.getTotalReportsForProcessing(List.of(schoolOfRecord), "GRADREG", summaryDTO);
}
if (schoolReportsCount > 0) {
finalSchoolDistricts.add(schoolOfRecord);
School school = new School(schoolOfRecord);
school.setNumberOfSchoolReports(schoolReportsCount);
summaryDTO.getSchools().add(school);
totalSchoolReportsCount += schoolReportsCount;
}
schoolReportsCount = 0L;
}
}

long endTime = System.currentTimeMillis();
long diff = (endTime - startTime)/1000;
log.debug("Total {} schools after filters in {} sec", finalSchoolDistricts.size(), diff);

updateBatchJobHistory(createBatchJobHistory(), totalSchoolReportsCount);
summaryDTO.setReadCount(totalSchoolReportsCount);
summaryDTO.setProcessedCount(0);

Map<String, ExecutionContext> map = new HashMap<>();
ExecutionContext executionContext = new ExecutionContext();
executionContext.put(SEARCH_REQUEST, searchRequest);
executionContext.put("data", finalSchoolDistricts);
executionContext.put("summary", summaryDTO);
executionContext.put("readCount", 0);
map.put("partition0", executionContext);
if (!finalSchoolDistricts.isEmpty()) {
updateBatchJobHistory(createBatchJobHistory(), totalSchoolReportsCount);
int partitionSize = finalSchoolDistricts.size()/gridSize + 1;
List<List<String>> partitions = new LinkedList<>();
for (int i = 0; i < finalSchoolDistricts.size(); i += partitionSize) {
partitions.add(finalSchoolDistricts.subList(i, Math.min(i + partitionSize, finalSchoolDistricts.size())));
}
Map<String, ExecutionContext> map = new HashMap<>(partitions.size());
for (int i = 0; i < partitions.size(); i++) {
ExecutionContext executionContext = new ExecutionContext();
SchoolReportsRegenSummaryDTO partitionSummaryDTO = new SchoolReportsRegenSummaryDTO();
partitionSummaryDTO.setReportBatchType(summaryDTO.getReportBatchType());
List<String> data = partitions.get(i);
executionContext.put("data", data);
partitionSummaryDTO.setReadCount(data.size());
executionContext.put("summary", partitionSummaryDTO);
executionContext.put("index",0);
String key = "partition" + i;
map.put(key, executionContext);
}
log.info("Found {} in total running on {} partitions",finalSchoolDistricts.size(),map.size());
return map;
}
log.info("No Schools Found for School Reports Regeneration");
return new HashMap<>();
}

log.info("Found {} in total running on 1 partitions", totalSchoolReportsCount);
return map;
/**
 * Derives the report batch type from the requested report types.
 *
 * @param reportTypes requested report types; may be {@code null} or empty
 * @return {@code "TVRRUN"} when the first entry equals {@code "NONGRADPRJ"} ignoring case,
 *         otherwise {@code "REGALG"}
 */
private String determineReportBatchType(List<String> reportTypes) {
    if (reportTypes == null || reportTypes.isEmpty()) {
        return "REGALG";
    }
    // equalsIgnoreCase accepts a null first element, whereas compareToIgnoreCase would throw.
    return "NONGRADPRJ".equalsIgnoreCase(reportTypes.get(0)) ? "TVRRUN" : "REGALG";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,26 @@

import ca.bc.gov.educ.api.batchgraduation.model.SchoolReportsRegenSummaryDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;

import java.util.List;

@Slf4j
public class RegenerateSchoolReportsReader implements ItemReader<List<String>> {

@Value("#{stepExecutionContext['data']}")
List<String> schools;
public class RegenerateSchoolReportsReader extends BaseSchoolReader implements ItemReader<String> {

@Value("#{stepExecutionContext['summary']}")
SchoolReportsRegenSummaryDTO summaryDTO;

@Value("#{stepExecution.jobExecution}")
JobExecution jobExecution;

@Value("#{stepExecutionContext['readCount']}")
Long readCount;

@Override
public List<String> read() throws Exception {
if(readCount > 0) return null;
readCount++;
if(log.isDebugEnabled()) {
log.info("Read schools Codes -> {} of {} schools", readCount, schools.size());
public String read() throws Exception {
String nextSchool = null;
if (nextSchoolForProcessing < schools.size()) {
nextSchool = schools.get(nextSchoolForProcessing);
log.info("School: {} - {} of {}", nextSchool, nextSchoolForProcessing + 1, summaryDTO.getReadCount());
nextSchoolForProcessing++;
} else {
aggregate("schoolReportsRegenSummaryDTO");
}
aggregate("schoolReportsRegenSummaryDTO");
return schools;
return nextSchool;
}

private void aggregate(String summaryContextName) {
Expand All @@ -41,7 +31,7 @@ private void aggregate(String summaryContextName) {
jobExecution.getExecutionContext().put(summaryContextName, totalSummaryDTO);
}
totalSummaryDTO.setBatchId(summaryDTO.getBatchId());
totalSummaryDTO.setReadCount(summaryDTO.getReadCount());
totalSummaryDTO.setProcessedCount(totalSummaryDTO.getProcessedCount() + summaryDTO.getProcessedCount());
totalSummaryDTO.getGlobalList().addAll(summaryDTO.getGlobalList());
}
}
Loading

0 comments on commit 0197311

Please sign in to comment.