Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#4079 - Queue Monitoring - Schedulers Refactor (Part 6) #4177

Merged
merged 7 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { createMock } from "@golevelup/ts-jest";
import { INestApplication } from "@nestjs/common";
import { addDays, formatDate } from "@sims/utilities";
import { createTestingAppModule } from "../../../../../test/helpers";
import {
createTestingAppModule,
mockBullJob,
} from "../../../../../test/helpers";
import { ATBCResponseIntegrationScheduler } from "../atbc-response-integration.scheduler";
import {
E2EDataSources,
createE2EDataSources,
saveFakeStudent,
} from "@sims/test-utils";
import { Job } from "bull";
import { ProcessSummaryResult } from "@sims/integrations/models";
import {
ATBCDisabilityStatusResponse,
ATBCService,
Expand Down Expand Up @@ -38,7 +38,7 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
// Arrange

// Queued job.
const job = createMock<Job<void>>();
const mockedJob = mockBullJob<void>();

// Create students who applied for disability status.
const [
Expand Down Expand Up @@ -75,15 +75,17 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
};

// Act
const processResult = await processor.processPendingPDRequests(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
const expectedResult: ProcessSummaryResult = new ProcessSummaryResult();
expectedResult.summary = [
"Total disability status requests processed: 3",
"Students updated with disability status: 2",
];
expect(processResult).toStrictEqual(expectedResult);
expect(result).toStrictEqual(["Completed processing disability status."]);
expect(
mockedJob.containLogMessages([
"Total disability status requests processed: 3",
"Students updated with disability status: 2",
]),
).toBe(true);

// Validate the disability status and status updated date.
const pdReceivedStudent = await getStudentDisabilityStatusDetails(
db,
Expand All @@ -108,7 +110,7 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
// Arrange

// Queued job.
const job = createMock<Job<void>>();
const mockedJob = mockBullJob<void>();

// Create student who applied and received disability status.
// If the ATBC response is received again with same disability status, no updated should happen.
Expand All @@ -132,15 +134,16 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", (
};

// Act
const processResult = await processor.processPendingPDRequests(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
const expectedResult: ProcessSummaryResult = new ProcessSummaryResult();
expectedResult.summary = [
"Total disability status requests processed: 1",
"Students updated with disability status: 0",
];
expect(processResult).toStrictEqual(expectedResult);
expect(result).toStrictEqual(["Completed processing disability status."]);
expect(
mockedJob.containLogMessages([
"Total disability status requests processed: 1",
"Students updated with disability status: 0",
]),
).toBe(true);
const pdReceivedStudent = await getStudentDisabilityStatusDetails(
db,
studentAppliedDisabilityAndUpdatedAlready.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Process } from "@nestjs/bull";
import { Job, Queue } from "bull";
import { BaseScheduler } from "../base-scheduler";
import { QueueService } from "@sims/services/queue";
import { ATBCIntegrationProcessingService } from "@sims/integrations/atbc-integration";
import {
QueueProcessSummary,
QueueProcessSummaryResult,
} from "../../models/processors.models";
InjectLogger,
LoggerService,
ProcessSummary,
} from "@sims/utilities/logger";

/**
* Process all the applied PD requests to verify the status with ATBC.
Expand All @@ -21,38 +21,27 @@ export class ATBCResponseIntegrationScheduler extends BaseScheduler<void> {
}

/**
* To be removed once the method {@link process} is implemented.
* This method "hides" the {@link Process} decorator from the base class.
*/
async processQueue(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* When implemented in a derived class, process the queue job.
* To be implemented.
* Process all the applied disability status requests by students.
* @param _job process job.
* @param processSummary process summary for logging.
* @returns processing result.
*/
protected async process(): Promise<string | string[]> {
throw new Error("Method not implemented.");
protected async process(
_job: Job<void>,
processSummary: ProcessSummary,
): Promise<string> {
await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests(
processSummary,
);
return "Completed processing disability status.";
}

/**
* Process all the applied disability status requests by students.
* @param job ATBC response integration job.
* @returns processing result.
* Setting the logger here allows the correct context to be set
* during the property injection.
* Even if the logger is not used, it is required to be set, to
* allow the base classes to write logs using the correct context.
*/
@Process()
async processPendingPDRequests(
job: Job<void>,
): Promise<QueueProcessSummaryResult> {
const summary = new QueueProcessSummary({
appLogger: this.logger,
jobLogger: job,
});
await summary.info("Processing disability status for students.");
const processingResult =
await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests();
await summary.info("Completed processing disability status.");
return processingResult;
}
@InjectLogger()
logger: LoggerService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import {
ApplicationChangesReportHeaders,
APPLICATION_CHANGES_DATE_TIME_FORMAT,
} from "@sims/integrations/esdc-integration";
import MockDate from "mockdate";
import * as dayjs from "dayjs";

describe(
describeProcessorRootTest(QueueNames.ApplicationChangesReportIntegration),
Expand All @@ -49,6 +51,7 @@ describe(
let sftpClientMock: DeepMocked<Client>;
let auditUser: User;
let sharedStudent: Student;
let expectedFileName: string;

beforeAll(async () => {
const { nestApplication, dataSource, sshClientMock } =
Expand All @@ -70,6 +73,10 @@ describe(
},
{ previousDateChangedReportedAssessment: { id: null } },
);
// Mock the current date.
const now = new Date();
MockDate.set(now);
expectedFileName = `DPBC.EDU.APPCHANGES.${formatFileNameDate(now)}.csv`;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

});

it("Should generate application changes report and update the reported date when there is one or more application changes which are not reported.", async () => {
Expand All @@ -89,16 +96,14 @@ describe(
const mockedJob = mockBullJob<void>();

// Act
const processingResult = await processor.processApplicationChanges(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);

// Assert
// Assert process result.
expect(processingResult).toContain("Process finalized with success.");
expect(processingResult).toContain("Applications reported: 2");
expect(
mockedJob.containLogMessages(["Found 2 application changes."]),
).toBe(true);
expect(result).toStrictEqual([
"Applications reported: 2",
`Uploaded file name: ${expectedFileName}`,
]);
const uploadedFile = getUploadedFile(sftpClientMock);
const expectedFirstRecord = getExpectedApplicationChangesCSVRecord(
firstApplication,
Expand Down Expand Up @@ -176,16 +181,13 @@ describe(
const mockedJob = mockBullJob<void>();

// Act
const processingResult = await processor.processApplicationChanges(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);
// Assert
// Assert process result.
expect(processingResult).toContain("Process finalized with success.");
expect(processingResult).toContain("Applications reported: 1");
expect(
mockedJob.containLogMessages(["Found 1 application changes."]),
).toBe(true);
expect(result).toStrictEqual([
"Applications reported: 1",
`Uploaded file name: ${expectedFileName}`,
]);
const uploadedFile = getUploadedFile(sftpClientMock);
const expectedFirstRecord = getExpectedApplicationChangesCSVRecord(
application,
Expand Down Expand Up @@ -226,16 +228,19 @@ describe(
const mockedJob = mockBullJob<void>();

// Act
const processingResult = await processor.processApplicationChanges(
mockedJob.job,
);
const result = await processor.processQueue(mockedJob.job);

// Assert
// Assert process result.
expect(processingResult).toContain("Process finalized with success.");
expect(processingResult).toContain("Applications reported: 0");
expect(result).toStrictEqual([
"Applications reported: 0",
`Uploaded file name: ${expectedFileName}`,
]);
expect(
mockedJob.containLogMessages([
"Retrieving all application changes which were not reported already.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor: maybe it could be rephrased to: "Retrieving all application changes that have not yet been reported."

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated as suggested.

"Found 0 application changes.",
`Application changes report with file name ${expectedFileName} has been uploaded successfully.`,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"Report date update not required as no application changes are reported.",
]),
).toBe(true);
Expand Down Expand Up @@ -397,5 +402,14 @@ describe(
newOffering.studyEndDate,
].join(",");
}

/**
* Format date to be used in file name.
* @param date date.
* @returns file name date format.
*/
function formatFileNameDate(date: Date): string {
return dayjs(date).format("YYYY-MM-DD_HH.mm.ss");
}
},
);
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { InjectQueue, Processor } from "@nestjs/bull";
import { QueueService } from "@sims/services/queue";
import { Job, Queue } from "bull";
import { BaseScheduler } from "../../base-scheduler";
Expand All @@ -7,10 +7,6 @@ import {
LoggerService,
ProcessSummary,
} from "@sims/utilities/logger";
import {
getSuccessMessageWithAttentionCheck,
logProcessSummaryToJobLogger,
} from "../../../../utilities";
import { QueueNames } from "@sims/utilities";
import { ApplicationChangesReportProcessingService } from "@sims/integrations/esdc-integration";

Expand All @@ -25,65 +21,36 @@ export class ApplicationChangesReportIntegrationScheduler extends BaseScheduler<
super(schedulerQueue, queueService);
}

/**
* To be removed once the method {@link process} is implemented.
* This method "hides" the {@link Process} decorator from the base class.
*/
async processQueue(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* When implemented in a derived class, process the queue job.
* To be implemented.
*/
protected async process(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* Generate application changes report for the applications which has at least one e-Cert sent
* and the application study dates have changed after the first e-Cert
* or after the last time the application was reported for study dates change
* through application changes report. Once generated upload the report to the ESDC directory
* in SFTP server.
* @param job job.
* @param _job process job.
* @param processSummary process summary for logging.
* @returns process summary.
*/
@Process()
async processApplicationChanges(job: Job<void>): Promise<string[]> {
const processSummary = new ProcessSummary();

try {
processSummary.info(
`Processing application changes report integration job. Job id: ${job.id} and Job name: ${job.name}.`,
);
const integrationProcessSummary = new ProcessSummary();
processSummary.children(integrationProcessSummary);
const { applicationsReported, uploadedFileName } =
await this.applicationChangesReportProcessingService.processApplicationChanges(
integrationProcessSummary,
);
return getSuccessMessageWithAttentionCheck(
[
"Process finalized with success.",
`Applications reported: ${applicationsReported}`,
`Uploaded file name: ${uploadedFileName}`,
],
protected async process(
_job: Job<void>,
processSummary: ProcessSummary,
): Promise<string[]> {
const { applicationsReported, uploadedFileName } =
await this.applicationChangesReportProcessingService.processApplicationChanges(
processSummary,
);
} catch (error: unknown) {
// Translate to friendly error message.
const errorMessage =
"Unexpected error while executing the job to process application changes.";
processSummary.error(errorMessage, error);
throw new Error(errorMessage, { cause: error });
} finally {
this.logger.logProcessSummary(processSummary);
await logProcessSummaryToJobLogger(processSummary, job);
}
return [
`Applications reported: ${applicationsReported}`,
`Uploaded file name: ${uploadedFileName}`,
];
}

/**
* Setting the logger here allows the correct context to be set
* during the property injection.
* Even if the logger is not used, it is required to be set, to
* allow the base classes to write logs using the correct context.
*/
@InjectLogger()
logger: LoggerService;
}
Loading
Loading