Skip to content

Commit

Permalink
#4079 - Queue Monitoring - Schedulers Refactor (Part 6) (#4177)
Browse files Browse the repository at this point in the history
Refactored the below schedulers and associated E2E tests.
- assessment-workflow-enqueuer
- assessment-workflow-queue-retry
- application-changes-report-integration
- process-notifications
- student-application-notifications
- atbc
  • Loading branch information
andrewsignori-aot authored Dec 26, 2024
1 parent f0253be commit f546f3d
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 371 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { createMock } from "@golevelup/ts-jest";
import { INestApplication } from "@nestjs/common";
import { addDays, formatDate } from "@sims/utilities";
import { createTestingAppModule } from "../../../../../test/helpers";
import {
createTestingAppModule,
mockBullJob,
} from "../../../../../test/helpers";
import { ATBCResponseIntegrationScheduler } from "../atbc-response-integration.scheduler";
import {
E2EDataSources,
createE2EDataSources,
saveFakeStudent,
} from "@sims/test-utils";
import { Job } from "bull";
import { ProcessSummaryResult } from "@sims/integrations/models";
import {
ATBCDisabilityStatusResponse,
ATBCService,
Expand Down Expand Up @@ -38,7 +38,7 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
// Arrange

// Queued job.
const job = createMock<Job<void>>();
const mockedJob = mockBullJob<void>();

// Create students who applied for disability status.
const [
Expand Down Expand Up @@ -75,15 +75,17 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
};

// Act
const processResult = await processor.processPendingPDRequests(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
const expectedResult: ProcessSummaryResult = new ProcessSummaryResult();
expectedResult.summary = [
"Total disability status requests processed: 3",
"Students updated with disability status: 2",
];
expect(processResult).toStrictEqual(expectedResult);
expect(result).toStrictEqual(["Completed processing disability status."]);
expect(
mockedJob.containLogMessages([
"Total disability status requests processed: 3",
"Students updated with disability status: 2",
]),
).toBe(true);

// Validate the disability status and status updated date.
const pdReceivedStudent = await getStudentDisabilityStatusDetails(
db,
Expand All @@ -108,7 +110,7 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
// Arrange

// Queued job.
const job = createMock<Job<void>>();
const mockedJob = mockBullJob<void>();

// Create student who applied and received disability status.
// If the ATBC response is received again with same disability status, no updated should happen.
Expand All @@ -132,15 +134,16 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
};

// Act
const processResult = await processor.processPendingPDRequests(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
const expectedResult: ProcessSummaryResult = new ProcessSummaryResult();
expectedResult.summary = [
"Total disability status requests processed: 1",
"Students updated with disability status: 0",
];
expect(processResult).toStrictEqual(expectedResult);
expect(result).toStrictEqual(["Completed processing disability status."]);
expect(
mockedJob.containLogMessages([
"Total disability status requests processed: 1",
"Students updated with disability status: 0",
]),
).toBe(true);
const pdReceivedStudent = await getStudentDisabilityStatusDetails(
db,
studentAppliedDisabilityAndUpdatedAlready.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Process } from "@nestjs/bull";
import { Job, Queue } from "bull";
import { BaseScheduler } from "../base-scheduler";
import { QueueService } from "@sims/services/queue";
import { ATBCIntegrationProcessingService } from "@sims/integrations/atbc-integration";
import {
QueueProcessSummary,
QueueProcessSummaryResult,
} from "../../models/processors.models";
InjectLogger,
LoggerService,
ProcessSummary,
} from "@sims/utilities/logger";

/**
* Process all the applied PD requests to verify the status with ATBC.
Expand All @@ -21,38 +21,27 @@ export class ATBCResponseIntegrationScheduler extends BaseScheduler<void> {
}

/**
* To be removed once the method {@link process} is implemented.
* This method "hides" the {@link Process} decorator from the base class.
*/
async processQueue(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* When implemented in a derived class, process the queue job.
* To be implemented.
* Process all the applied disability status requests by students.
* @param _job process job.
* @param processSummary process summary for logging.
* @returns processing result.
*/
protected async process(): Promise<string | string[]> {
throw new Error("Method not implemented.");
protected async process(
_job: Job<void>,
processSummary: ProcessSummary,
): Promise<string> {
await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests(
processSummary,
);
return "Completed processing disability status.";
}

/**
* Process all the applied disability status requests by students.
* @param job ATBC response integration job.
* @returns processing result.
* Setting the logger here allows the correct context to be set
* during the property injection.
* Even if the logger is not used, it is required to be set, to
* allow the base classes to write logs using the correct context.
*/
@Process()
async processPendingPDRequests(
job: Job<void>,
): Promise<QueueProcessSummaryResult> {
const summary = new QueueProcessSummary({
appLogger: this.logger,
jobLogger: job,
});
await summary.info("Processing disability status for students.");
const processingResult =
await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests();
await summary.info("Completed processing disability status.");
return processingResult;
}
@InjectLogger()
logger: LoggerService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import {
ApplicationChangesReportHeaders,
APPLICATION_CHANGES_DATE_TIME_FORMAT,
} from "@sims/integrations/esdc-integration";
import MockDate from "mockdate";
import * as dayjs from "dayjs";

describe(
describeProcessorRootTest(QueueNames.ApplicationChangesReportIntegration),
Expand All @@ -49,6 +51,7 @@ describe(
let sftpClientMock: DeepMocked<Client>;
let auditUser: User;
let sharedStudent: Student;
let expectedFileName: string;

beforeAll(async () => {
const { nestApplication, dataSource, sshClientMock } =
Expand All @@ -70,6 +73,10 @@ describe(
},
{ previousDateChangedReportedAssessment: { id: null } },
);
// Mock the current date.
const now = new Date();
MockDate.set(now);
expectedFileName = `DPBC.EDU.APPCHANGES.${formatFileNameDate(now)}.csv`;
});

it("Should generate application changes report and update the reported date when there is one or more application changes which are not reported.", async () => {
Expand All @@ -89,16 +96,14 @@ describe(
const mockedJob = mockBullJob<void>();

// Act
const processingResult = await processor.processApplicationChanges(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);

// Assert
// Assert process result.
expect(processingResult).toContain("Process finalized with success.");
expect(processingResult).toContain("Applications reported: 2");
expect(
mockedJob.containLogMessages(["Found 2 application changes."]),
).toBe(true);
expect(result).toStrictEqual([
"Applications reported: 2",
`Uploaded file name: ${expectedFileName}`,
]);
const uploadedFile = getUploadedFile(sftpClientMock);
const expectedFirstRecord = getExpectedApplicationChangesCSVRecord(
firstApplication,
Expand Down Expand Up @@ -176,16 +181,13 @@ describe(
const mockedJob = mockBullJob<void>();

// Act
const processingResult = await processor.processApplicationChanges(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);
// Assert
// Assert process result.
expect(processingResult).toContain("Process finalized with success.");
expect(processingResult).toContain("Applications reported: 1");
expect(
mockedJob.containLogMessages(["Found 1 application changes."]),
).toBe(true);
expect(result).toStrictEqual([
"Applications reported: 1",
`Uploaded file name: ${expectedFileName}`,
]);
const uploadedFile = getUploadedFile(sftpClientMock);
const expectedFirstRecord = getExpectedApplicationChangesCSVRecord(
application,
Expand Down Expand Up @@ -226,16 +228,19 @@ describe(
const mockedJob = mockBullJob<void>();

// Act
const processingResult = await processor.processApplicationChanges(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);

// Assert
// Assert process result.
expect(processingResult).toContain("Process finalized with success.");
expect(processingResult).toContain("Applications reported: 0");
expect(result).toStrictEqual([
"Applications reported: 0",
`Uploaded file name: ${expectedFileName}`,
]);
expect(
mockedJob.containLogMessages([
"Retrieving all application changes that have not yet been reported.",
"Found 0 application changes.",
`Application changes report with file name ${expectedFileName} has been uploaded successfully.`,
"Report date update not required as no application changes are reported.",
]),
).toBe(true);
Expand Down Expand Up @@ -397,5 +402,14 @@ describe(
newOffering.studyEndDate,
].join(",");
}

/**
* Format date to be used in file name.
* @param date date.
* @returns file name date format.
*/
function formatFileNameDate(date: Date): string {
return dayjs(date).format("YYYY-MM-DD_HH.mm.ss");
}
},
);
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { InjectQueue, Processor } from "@nestjs/bull";
import { QueueService } from "@sims/services/queue";
import { Job, Queue } from "bull";
import { BaseScheduler } from "../../base-scheduler";
Expand All @@ -7,10 +7,6 @@ import {
LoggerService,
ProcessSummary,
} from "@sims/utilities/logger";
import {
getSuccessMessageWithAttentionCheck,
logProcessSummaryToJobLogger,
} from "../../../../utilities";
import { QueueNames } from "@sims/utilities";
import { ApplicationChangesReportProcessingService } from "@sims/integrations/esdc-integration";

Expand All @@ -25,65 +21,36 @@ export class ApplicationChangesReportIntegrationScheduler extends BaseScheduler<
super(schedulerQueue, queueService);
}

/**
* To be removed once the method {@link process} is implemented.
* This method "hides" the {@link Process} decorator from the base class.
*/
async processQueue(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* When implemented in a derived class, process the queue job.
* To be implemented.
*/
protected async process(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* Generate application changes report for the applications which has at least one e-Cert sent
* and the application study dates have changed after the first e-Cert
* or after the last time the application was reported for study dates change
* through application changes report. Once generated upload the report to the ESDC directory
* in SFTP server.
* @param job job.
* @param _job process job.
* @param processSummary process summary for logging.
* @returns process summary.
*/
@Process()
async processApplicationChanges(job: Job<void>): Promise<string[]> {
const processSummary = new ProcessSummary();

try {
processSummary.info(
`Processing application changes report integration job. Job id: ${job.id} and Job name: ${job.name}.`,
);
const integrationProcessSummary = new ProcessSummary();
processSummary.children(integrationProcessSummary);
const { applicationsReported, uploadedFileName } =
await this.applicationChangesReportProcessingService.processApplicationChanges(
integrationProcessSummary,
);
return getSuccessMessageWithAttentionCheck(
[
"Process finalized with success.",
`Applications reported: ${applicationsReported}`,
`Uploaded file name: ${uploadedFileName}`,
],
protected async process(
_job: Job<void>,
processSummary: ProcessSummary,
): Promise<string[]> {
const { applicationsReported, uploadedFileName } =
await this.applicationChangesReportProcessingService.processApplicationChanges(
processSummary,
);
} catch (error: unknown) {
// Translate to friendly error message.
const errorMessage =
"Unexpected error while executing the job to process application changes.";
processSummary.error(errorMessage, error);
throw new Error(errorMessage, { cause: error });
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
}
return [
`Applications reported: ${applicationsReported}`,
`Uploaded file name: ${uploadedFileName}`,
];
}

/**
* Setting the logger here allows the correct context to be set
* during the property injection.
* Even if the logger is not used, it is required to be set, to
* allow the base classes to write logs using the correct context.
*/
@InjectLogger()
logger: LoggerService;
}
Loading

0 comments on commit f546f3d

Please sign in to comment.