From f546f3db2f51931198a17245a246bfc0a2319210 Mon Sep 17 00:00:00 2001 From: Andrew Boni Signori <61259237+andrewsignori-aot@users.noreply.github.com> Date: Thu, 26 Dec 2024 12:04:37 -0800 Subject: [PATCH] #4079 - Queue Monitoring - Schedulers Refactor (Part 6) (#4177) Refactored the below schedulers and associated E2E tests. - assessment-workflow-enqueuer - assessment-workflow-queue-retry - application-changes-report-integration - process-notifications - student-application-notifications - atbc --- ...response-integration.scheduler.e2e-spec.ts | 43 ++++---- .../atbc-response-integration.scheduler.ts | 55 ++++----- ...s-report-integration.scheduler.e2e-spec.ts | 56 ++++++---- ...on-changes-report-integration.scheduler.ts | 71 ++++-------- .../process-notifications.scheduler.ts | 59 +++++----- ...cation-notifications.scheduler.e2e-spec.ts | 6 +- ...ent-application-notifications.scheduler.ts | 104 +++++++----------- ...nt-workflow-enqueuer.scheduler.e2e-spec.ts | 28 ++--- ...workflow-queue-retry.scheduler.e2e-spec.ts | 32 +++--- .../assessment-workflow-enqueuer.scheduler.ts | 83 +++++--------- ...sessment-workflow-queue-retry.scheduler.ts | 84 +++++--------- .../atbc-integration.processing.service.ts | 22 ++-- ...ation-changes-report.processing.service.ts | 2 +- 13 files changed, 274 insertions(+), 371 deletions(-) diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/_tests_/atbc-response-integration.scheduler.e2e-spec.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/_tests_/atbc-response-integration.scheduler.e2e-spec.ts index 7eb1554778..cefe025d50 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/_tests_/atbc-response-integration.scheduler.e2e-spec.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/_tests_/atbc-response-integration.scheduler.e2e-spec.ts @@ -1,15 +1,15 @@ -import { createMock } from "@golevelup/ts-jest"; import { INestApplication } from "@nestjs/common"; import { addDays, formatDate } from "@sims/utilities"; -import { createTestingAppModule } from "../../../../../test/helpers"; +import { + createTestingAppModule, + mockBullJob, +} from "../../../../../test/helpers"; import { ATBCResponseIntegrationScheduler } from "../atbc-response-integration.scheduler"; import { E2EDataSources, createE2EDataSources, saveFakeStudent, } from "@sims/test-utils"; -import { Job } from "bull"; -import { ProcessSummaryResult } from "@sims/integrations/models"; import { ATBCDisabilityStatusResponse, ATBCService, @@ -38,7 +38,7 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", ( // Arrange // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Create students who applied for disability status. const [ @@ -75,15 +75,17 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", ( }; // Act - const processResult = await processor.processPendingPDRequests(job); + const result = await processor.processQueue(mockedJob.job); // Assert - const expectedResult: ProcessSummaryResult = new ProcessSummaryResult(); - expectedResult.summary = [ - "Total disability status requests processed: 3", - "Students updated with disability status: 2", - ]; - expect(processResult).toStrictEqual(expectedResult); + expect(result).toStrictEqual(["Completed processing disability status."]); + expect( + mockedJob.containLogMessages([ + "Total disability status requests processed: 3", + "Students updated with disability status: 2", + ]), + ).toBe(true); + // Validate the disability status and status updated date. const pdReceivedStudent = await getStudentDisabilityStatusDetails( db, @@ -108,7 +110,7 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", ( // Arrange // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Create student who applied and received disability status. // If the ATBC response is received again with same disability status, no updated should happen. @@ -132,15 +134,16 @@ describe.skip("describeProcessorRootTest(QueueNames.ATBCResponseIntegration)", ( }; // Act - const processResult = await processor.processPendingPDRequests(job); + const result = await processor.processQueue(mockedJob.job); // Assert - const expectedResult: ProcessSummaryResult = new ProcessSummaryResult(); - expectedResult.summary = [ - "Total disability status requests processed: 1", - "Students updated with disability status: 0", - ]; - expect(processResult).toStrictEqual(expectedResult); + expect(result).toStrictEqual(["Completed processing disability status."]); + expect( + mockedJob.containLogMessages([ + "Total disability status requests processed: 1", + "Students updated with disability status: 0", + ]), + ).toBe(true); const pdReceivedStudent = await getStudentDisabilityStatusDetails( db, studentAppliedDisabilityAndUpdatedAlready.id, diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/atbc-response-integration.scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/atbc-response-integration.scheduler.ts index 7a602e3d04..e643285c64 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/atbc-response-integration.scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/atbc-respone-integration/atbc-response-integration.scheduler.ts @@ -1,12 +1,12 @@ -import { Process } from "@nestjs/bull"; import { Job, Queue } from "bull"; import { BaseScheduler } from "../base-scheduler"; import { QueueService } from "@sims/services/queue"; import { ATBCIntegrationProcessingService } from "@sims/integrations/atbc-integration"; import { - QueueProcessSummary, - QueueProcessSummaryResult, -} from "../../models/processors.models"; + InjectLogger, + LoggerService, + ProcessSummary, +} from "@sims/utilities/logger"; /** * Process all the applied PD requests to verify the status with ATBC. @@ -21,38 +21,27 @@ export class ATBCResponseIntegrationScheduler extends BaseScheduler { } /** - * To be removed once the method {@link process} is implemented. - * This method "hides" the {@link Process} decorator from the base class. - */ - async processQueue(): Promise { - throw new Error("Method not implemented."); - } - - /** - * When implemented in a derived class, process the queue job. - * To be implemented. + * Process all the applied disability status requests by students. + * @param _job process job. + * @param processSummary process summary for logging. + * @returns processing result. */ - protected async process(): Promise { - throw new Error("Method not implemented."); + protected async process( + _job: Job, + processSummary: ProcessSummary, + ): Promise { + await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests( + processSummary, + ); + return "Completed processing disability status."; } /** - * Process all the applied disability status requests by students. - * @param job ATBC response integration job. - * @returns processing result. + * Setting the logger here allows the correct context to be set + * during the property injection. + * Even if the logger is not used, it is required to be set, to + * allow the base classes to write logs using the correct context. */ - @Process() - async processPendingPDRequests( - job: Job, - ): Promise { - const summary = new QueueProcessSummary({ - appLogger: this.logger, - jobLogger: job, - }); - await summary.info("Processing disability status for students."); - const processingResult = - await this.atbcIntegrationProcessingService.processAppliedDisabilityRequests(); - await summary.info("Completed processing disability status."); - return processingResult; - } + @InjectLogger() + logger: LoggerService; } diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/_tests_/application-changes-report-integration.scheduler.e2e-spec.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/_tests_/application-changes-report-integration.scheduler.e2e-spec.ts index 7ddce83bc9..d37cd8329d 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/_tests_/application-changes-report-integration.scheduler.e2e-spec.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/_tests_/application-changes-report-integration.scheduler.e2e-spec.ts @@ -39,6 +39,8 @@ import { ApplicationChangesReportHeaders, APPLICATION_CHANGES_DATE_TIME_FORMAT, } from "@sims/integrations/esdc-integration"; +import MockDate from "mockdate"; +import * as dayjs from "dayjs"; describe( describeProcessorRootTest(QueueNames.ApplicationChangesReportIntegration), @@ -49,6 +51,7 @@ describe( let sftpClientMock: DeepMocked; let auditUser: User; let sharedStudent: Student; + let expectedFileName: string; beforeAll(async () => { const { nestApplication, dataSource, sshClientMock } = @@ -70,6 +73,10 @@ describe( }, { previousDateChangedReportedAssessment: { id: null } }, ); + // Mock the current date. + const now = new Date(); + MockDate.set(now); + expectedFileName = `DPBC.EDU.APPCHANGES.${formatFileNameDate(now)}.csv`; }); it("Should generate application changes report and update the reported date when there is one or more application changes which are not reported.", async () => { @@ -89,16 +96,14 @@ describe( const mockedJob = mockBullJob(); // Act - const processingResult = await processor.processApplicationChanges( - mockedJob.job, - ); + const result = await processor.processQueue(mockedJob.job); + // Assert // Assert process result. - expect(processingResult).toContain("Process finalized with success."); - expect(processingResult).toContain("Applications reported: 2"); - expect( - mockedJob.containLogMessages(["Found 2 application changes."]), - ).toBe(true); + expect(result).toStrictEqual([ + "Applications reported: 2", + `Uploaded file name: ${expectedFileName}`, + ]); const uploadedFile = getUploadedFile(sftpClientMock); const expectedFirstRecord = getExpectedApplicationChangesCSVRecord( firstApplication, @@ -176,16 +181,13 @@ describe( const mockedJob = mockBullJob(); // Act - const processingResult = await processor.processApplicationChanges( - mockedJob.job, - ); + const result = await processor.processQueue(mockedJob.job); // Assert // Assert process result. - expect(processingResult).toContain("Process finalized with success."); - expect(processingResult).toContain("Applications reported: 1"); - expect( - mockedJob.containLogMessages(["Found 1 application changes."]), - ).toBe(true); + expect(result).toStrictEqual([ + "Applications reported: 1", + `Uploaded file name: ${expectedFileName}`, + ]); const uploadedFile = getUploadedFile(sftpClientMock); const expectedFirstRecord = getExpectedApplicationChangesCSVRecord( application, @@ -226,16 +228,19 @@ describe( const mockedJob = mockBullJob(); // Act - const processingResult = await processor.processApplicationChanges( - mockedJob.job, - ); + const result = await processor.processQueue(mockedJob.job); + // Assert // Assert process result. - expect(processingResult).toContain("Process finalized with success."); - expect(processingResult).toContain("Applications reported: 0"); + expect(result).toStrictEqual([ + "Applications reported: 0", + `Uploaded file name: ${expectedFileName}`, + ]); expect( mockedJob.containLogMessages([ + "Retrieving all application changes that have not yet been reported.", "Found 0 application changes.", + `Application changes report with file name ${expectedFileName} has been uploaded successfully.`, "Report date update not required as no application changes are reported.", ]), ).toBe(true); @@ -397,5 +402,14 @@ describe( newOffering.studyEndDate, ].join(","); } + + /** + * Format date to be used in file name. + * @param date date. + * @returns file name date format. + */ + function formatFileNameDate(date: Date): string { + return dayjs(date).format("YYYY-MM-DD_HH.mm.ss"); + } }, ); diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/application-changes-report-integration.scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/application-changes-report-integration.scheduler.ts index 5f09581111..0f906cfdb6 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/application-changes-report-integration.scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/esdc-integration/application-changes-report-integration/application-changes-report-integration.scheduler.ts @@ -1,4 +1,4 @@ -import { InjectQueue, Process, Processor } from "@nestjs/bull"; +import { InjectQueue, Processor } from "@nestjs/bull"; import { QueueService } from "@sims/services/queue"; import { Job, Queue } from "bull"; import { BaseScheduler } from "../../base-scheduler"; @@ -7,10 +7,6 @@ import { LoggerService, ProcessSummary, } from "@sims/utilities/logger"; -import { - getSuccessMessageWithAttentionCheck, - logProcessSummaryToJobLogger, -} from "../../../../utilities"; import { QueueNames } from "@sims/utilities"; import { ApplicationChangesReportProcessingService } from "@sims/integrations/esdc-integration"; @@ -25,65 +21,36 @@ export class ApplicationChangesReportIntegrationScheduler extends BaseScheduler< super(schedulerQueue, queueService); } - /** - * To be removed once the method {@link process} is implemented. - * This method "hides" the {@link Process} decorator from the base class. - */ - async processQueue(): Promise { - throw new Error("Method not implemented."); - } - - /** - * When implemented in a derived class, process the queue job. - * To be implemented. - */ - protected async process(): Promise { - throw new Error("Method not implemented."); - } - /** * Generate application changes report for the applications which has at least one e-Cert sent * and the application study dates have changed after the first e-Cert * or after the last time the application was reported for study dates change * through application changes report. Once generated upload the report to the ESDC directory * in SFTP server. - * @param job job. + * @param _job process job. + * @param processSummary process summary for logging. * @returns process summary. */ - @Process() - async processApplicationChanges(job: Job): Promise { - const processSummary = new ProcessSummary(); - - try { - processSummary.info( - `Processing application changes report integration job. Job id: ${job.id} and Job name: ${job.name}.`, - ); - const integrationProcessSummary = new ProcessSummary(); - processSummary.children(integrationProcessSummary); - const { applicationsReported, uploadedFileName } = - await this.applicationChangesReportProcessingService.processApplicationChanges( - integrationProcessSummary, - ); - return getSuccessMessageWithAttentionCheck( - [ - "Process finalized with success.", - `Applications reported: ${applicationsReported}`, - `Uploaded file name: ${uploadedFileName}`, - ], + protected async process( + _job: Job, + processSummary: ProcessSummary, + ): Promise { + const { applicationsReported, uploadedFileName } = + await this.applicationChangesReportProcessingService.processApplicationChanges( processSummary, ); - } catch (error: unknown) { - // Translate to friendly error message. - const errorMessage = - "Unexpected error while executing the job to process application changes."; - processSummary.error(errorMessage, error); - throw new Error(errorMessage, { cause: error }); - } finally { - this.logger.logProcessSummary(processSummary); - await logProcessSummaryToJobLogger(processSummary, job); - } + return [ + `Applications reported: ${applicationsReported}`, + `Uploaded file name: ${uploadedFileName}`, + ]; } + /** + * Setting the logger here allows the correct context to be set + * during the property injection. + * Even if the logger is not used, it is required to be set, to + * allow the base classes to write logs using the correct context. + */ @InjectLogger() logger: LoggerService; } diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/notification/process-notifications.scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/notification/process-notifications.scheduler.ts index 03c0ec376b..9e1df5a364 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/notification/process-notifications.scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/notification/process-notifications.scheduler.ts @@ -1,11 +1,15 @@ -import { InjectQueue, Process, Processor } from "@nestjs/bull"; +import { InjectQueue, Processor } from "@nestjs/bull"; import { Job, Queue } from "bull"; import { NotificationService } from "@sims/services/notifications"; import { ProcessNotificationsQueueInDTO } from "./models/notification.dto"; import { BaseScheduler } from "../base-scheduler"; import { QueueNames } from "@sims/utilities"; import { QueueService } from "@sims/services/queue"; -import { QueueProcessSummaryResult } from "../../models/processors.models"; +import { + InjectLogger, + LoggerService, + ProcessSummary, +} from "@sims/utilities/logger"; /** * Process notifications which are unsent. @@ -21,22 +25,6 @@ export class ProcessNotificationScheduler extends BaseScheduler { - throw new Error("Method not implemented."); - } - - /** - * When implemented in a derived class, process the queue job. - * To be implemented. - */ - protected async process(): Promise { - throw new Error("Method not implemented."); - } - protected async payload(): Promise { const queuePollingRecordsLimit = await this.queueService.getQueuePollingRecordLimit( @@ -48,21 +36,40 @@ export class ProcessNotificationScheduler extends BaseScheduler, - ): Promise { + processSummary: ProcessSummary, + ): Promise { const processNotificationResponse = await this.notificationService.processUnsentNotifications( job.data.pollingRecordsLimit, ); - const processSummaryResult: string[] = [ - `Total notifications processed ${processNotificationResponse.notificationsProcessed}`, - `Total notifications successfully processed ${processNotificationResponse.notificationsSuccessfullyProcessed}`, + if ( + processNotificationResponse.notificationsProcessed !== + processNotificationResponse.notificationsSuccessfullyProcessed + ) { + processSummary.warn( + "Not all pending notifications were successfully processed.", + ); + } + const processedLogResult = [ + `Total notifications processed ${processNotificationResponse.notificationsProcessed}.`, + `Total notifications successfully processed ${processNotificationResponse.notificationsSuccessfullyProcessed}.`, ]; - return { summary: processSummaryResult } as QueueProcessSummaryResult; + processedLogResult.forEach((log) => processSummary.info(log)); + return processedLogResult; } + + /** + * Setting the logger here allows the correct context to be set + * during the property injection. + * Even if the logger is not used, it is required to be set, to + * allow the base classes to write logs using the correct context. + */ + @InjectLogger() + logger: LoggerService; } diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/_tests_/student-application-notifications.scheduler.e2e-spec.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/_tests_/student-application-notifications.scheduler.e2e-spec.ts index c41f327438..2db351376c 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/_tests_/student-application-notifications.scheduler.e2e-spec.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/_tests_/student-application-notifications.scheduler.e2e-spec.ts @@ -86,7 +86,7 @@ describe( const mockedJob = mockBullJob(); // Act - await processor.studentApplicationNotifications(mockedJob.job); + await processor.processQueue(mockedJob.job); // Assert expect( @@ -172,7 +172,7 @@ describe( const mockedJob = mockBullJob(); // Act - await processor.studentApplicationNotifications(mockedJob.job); + await processor.processQueue(mockedJob.job); // Assert expect( @@ -271,7 +271,7 @@ describe( const mockedJob = mockBullJob(); // Act - await processor.studentApplicationNotifications(mockedJob.job); + await processor.processQueue(mockedJob.job); // Assert expect( diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/student-application-notifications.scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/student-application-notifications.scheduler.ts index 4b68402b8b..6ab838da6a 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/student-application-notifications.scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/student-application-notifications/student-application-notifications.scheduler.ts @@ -1,4 +1,4 @@ -import { InjectQueue, Process, Processor } from "@nestjs/bull"; +import { InjectQueue, Processor } from "@nestjs/bull"; import { QueueService } from "@sims/services/queue"; import { Job, Queue } from "bull"; import { BaseScheduler } from "../base-scheduler"; @@ -7,10 +7,6 @@ import { LoggerService, ProcessSummary, } from "@sims/utilities/logger"; -import { - getSuccessMessageWithAttentionCheck, - logProcessSummaryToJobLogger, -} from "../../../utilities"; import { QueueNames } from "@sims/utilities"; import { ApplicationService } from "../../../services"; import { StudentPDPPDNotification } from "@sims/services/notifications"; @@ -28,78 +24,54 @@ export class StudentApplicationNotificationsScheduler extends BaseScheduler { - throw new Error("Method not implemented."); - } - - /** - * When implemented in a derived class, process the queue job. - * To be implemented. - */ - protected async process(): Promise { - throw new Error("Method not implemented."); - } - /** * Process Student for Email notification - PD/PPD Student reminder email 8 weeks before end date. + * @param _job process job. + * @param processSummary process summary for logging. + * @returns processing result. */ - @Process() - async studentApplicationNotifications(job: Job): Promise { - const processSummary = new ProcessSummary(); - - try { - this.logger.log( - `Processing student application notifications job. Job id: ${job.id} and Job name: ${job.name}.`, - ); + protected async process( + _job: Job, + processSummary: ProcessSummary, + ): Promise { + const eligibleApplications = + await this.applicationService.getApplicationWithPDPPStatusMismatch(); - const eligibleApplications = - await this.applicationService.getApplicationWithPDPPStatusMismatch(); + const notifications = eligibleApplications.map( + (application) => ({ + userId: application.student.user.id, + givenNames: application.student.user.firstName, + lastName: application.student.user.lastName, + email: application.student.user.email, + applicationNumber: application.applicationNumber, + assessmentId: application.currentAssessment.id, + }), + ); - const notifications = eligibleApplications.map( - (application) => ({ - userId: application.student.user.id, - givenNames: application.student.user.firstName, - lastName: application.student.user.lastName, - email: application.student.user.email, - applicationNumber: application.applicationNumber, - assessmentId: application.currentAssessment.id, - }), - ); + await this.notificationActionsService.saveStudentApplicationPDPPDNotification( + notifications, + ); - await this.notificationActionsService.saveStudentApplicationPDPPDNotification( - notifications, + if (eligibleApplications.length) { + processSummary.info( + `PD/PPD mismatch assessments that generated notifications: ${eligibleApplications + .map((app) => app.currentAssessment.id) + .join(", ")}`, ); - - if (eligibleApplications.length) { - processSummary.info( - `PD/PPD mismatch assessments that generated notifications: ${eligibleApplications - .map((app) => app.currentAssessment.id) - .join(", ")}`, - ); - } else { - processSummary.info( - `No assessments found to generate PD/PPD mismatch notifications.`, - ); - } - - return getSuccessMessageWithAttentionCheck( - ["Process finalized with success."], - processSummary, + } else { + processSummary.info( + `No assessments found to generate PD/PPD mismatch notifications.`, ); - } catch (error: unknown) { - const errorMessage = "Unexpected error while executing the job."; - processSummary.error(errorMessage, error); - throw new Error(errorMessage, { cause: error }); - } finally { - this.logger.logProcessSummary(processSummary); - await logProcessSummaryToJobLogger(processSummary, job); } + return "Process finalized with success."; } + /** + * Setting the logger here allows the correct context to be set + * during the property injection. + * Even if the logger is not used, it is required to be set, to + * allow the base classes to write logs using the correct context. + */ @InjectLogger() logger: LoggerService; } diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-enqueuer.scheduler.e2e-spec.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-enqueuer.scheduler.e2e-spec.ts index c4f5609997..3920ad4309 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-enqueuer.scheduler.e2e-spec.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-enqueuer.scheduler.e2e-spec.ts @@ -1,10 +1,10 @@ -import { Job, Queue } from "bull"; -import { createMock } from "@golevelup/ts-jest"; +import { Queue } from "bull"; import { INestApplication } from "@nestjs/common"; import { QueueNames, addDays } from "@sims/utilities"; import { createTestingAppModule, describeProcessorRootTest, + mockBullJob, } from "../../../../../test/helpers"; import { AssessmentWorkflowEnqueuerScheduler } from "../assessment-workflow-enqueuer.scheduler"; import { @@ -72,10 +72,10 @@ describe( // Application submitted with original assessment. const application = await createDefaultApplication(); // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Act - await processor.enqueueAssessmentOperations(job); + await processor.processQueue(mockedJob.job); // Assert // Load application and related assessments for assertion. @@ -122,10 +122,10 @@ describe( application.currentAssessment = newestAssessment; await db.application.save(application); // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Act - await processor.enqueueAssessmentOperations(job); + await processor.processQueue(mockedJob.job); // Assert // Load application and related assessments for assertion. @@ -166,10 +166,10 @@ describe( await db.studentAssessment.save(submittedAssessment); // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Act - await processor.enqueueAssessmentOperations(job); + await processor.processQueue(mockedJob.job); // Assert expect(startApplicationAssessmentQueueMock.add).not.toBeCalled(); @@ -186,10 +186,10 @@ describe( await db.application.save(application); // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Act - await processor.enqueueAssessmentOperations(job); + await processor.processQueue(mockedJob.job); // Assert expect(startApplicationAssessmentQueueMock.add).not.toBeCalled(); @@ -206,10 +206,10 @@ describe( await db.application.save(application); // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Act - await processor.enqueueAssessmentOperations(job); + await processor.processQueue(mockedJob.job); // Assert // Load application and related assessments for assertion. @@ -248,10 +248,10 @@ describe( await db.application.save(application); // Queued job. - const job = createMock>(); + const mockedJob = mockBullJob(); // Act - await processor.enqueueAssessmentOperations(job); + await processor.processQueue(mockedJob.job); // Assert expect(cancelApplicationAssessmentQueueMock.add).not.toBeCalled(); diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-queue-retry.scheduler.e2e-spec.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-queue-retry.scheduler.e2e-spec.ts index 3143690694..0f3484bf7d 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-queue-retry.scheduler.e2e-spec.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/_tests_/assessment-workflow-queue-retry.scheduler.e2e-spec.ts @@ -1,10 +1,10 @@ -import { Job, Queue } from "bull"; -import { createMock } from "@golevelup/ts-jest"; +import { Queue } from "bull"; import { INestApplication } from "@nestjs/common"; import { QueueNames, addHours } from "@sims/utilities"; import { createTestingAppModule, describeProcessorRootTest, + mockBullJob, } from "../../../../../test/helpers"; import { E2EDataSources, @@ -78,11 +78,12 @@ describe( await db.studentAssessment.save(currentAssessment); // Retry job. - const job = createMock>(); - job.data.amountHoursAssessmentRetry = 6; + const mockedJob = mockBullJob({ + amountHoursAssessmentRetry: 6, + }); // Act - await retryProcessor.enqueueAssessmentRetryOperations(job); + await retryProcessor.processQueue(mockedJob.job); // Assert // Assert item was added to the queue. @@ -104,11 +105,12 @@ describe( ); await db.studentAssessment.save(currentAssessment); // Retry job. - const job = createMock>(); - job.data.amountHoursAssessmentRetry = 6; + const mockedJob = mockBullJob({ + amountHoursAssessmentRetry: 6, + }); // Act - await retryProcessor.enqueueAssessmentRetryOperations(job); + await retryProcessor.processQueue(mockedJob.job); // Assert // Assert item was added to the queue. @@ -133,11 +135,12 @@ describe( await db.studentAssessment.save(currentAssessment); // Retry job. - const job = createMock>(); - job.data.amountHoursAssessmentRetry = 6; + const mockedJob = mockBullJob({ + amountHoursAssessmentRetry: 6, + }); // Act - await retryProcessor.enqueueAssessmentRetryOperations(job); + await retryProcessor.processQueue(mockedJob.job); // Assert // Assert item was added to the queue. @@ -162,11 +165,12 @@ describe( await db.studentAssessment.save(currentAssessment); // Retry job. - const job = createMock>(); - job.data.amountHoursAssessmentRetry = 6; + const mockedJob = mockBullJob({ + amountHoursAssessmentRetry: 6, + }); // Act - await retryProcessor.enqueueAssessmentRetryOperations(job); + await retryProcessor.processQueue(mockedJob.job); // Assert // Assert item was added to the queue. diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-enqueuer.scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-enqueuer.scheduler.ts index 34c591599b..f9ba48c1be 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-enqueuer.scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-enqueuer.scheduler.ts @@ -1,4 +1,4 @@ -import { InjectQueue, Process, Processor } from "@nestjs/bull"; +import { InjectQueue, Processor } from "@nestjs/bull"; import { Job, Queue } from "bull"; import { BaseScheduler } from "../base-scheduler"; import { QueueNames } from "@sims/utilities"; @@ -9,10 +9,6 @@ import { LoggerService, ProcessSummary, } from "@sims/utilities/logger"; -import { - getSuccessMessageWithAttentionCheck, - logProcessSummaryToJobLogger, -} from "../../../utilities"; /** * Search for assessments that have some pending operation, for instance, @@ -29,60 +25,31 @@ export class AssessmentWorkflowEnqueuerScheduler extends BaseScheduler { super(schedulerQueue, queueService); } - /** - * To be removed once the method {@link process} is implemented. - * This method "hides" the {@link Process} decorator from the base class. - */ - async processQueue(): Promise { - throw new Error("Method not implemented."); - } - - /** - * When implemented in a derived class, process the queue job. - * To be implemented. - */ - protected async process(): Promise { - throw new Error("Method not implemented."); - } - /** * Process all applications with pending assessments to be calculated. - * @param job job information. + * @param _job process job. + * @param processSummary process summary for logging. * @returns processing result. */ - @Process() - async enqueueAssessmentOperations(job: Job): Promise { - const processSummary = new ProcessSummary(); - try { - processSummary.info( - "Checking application assessments to be queued for start.", - ); - // Check for applications with assessments to be cancelled. - await this.executeEnqueueProcess( - processSummary, - this.workflowEnqueuerService.enqueueCancelAssessmentWorkflows.bind( - this.workflowEnqueuerService, - ), - ); - // Check for applications with assessments to be started. - await this.executeEnqueueProcess( - processSummary, - this.workflowEnqueuerService.enqueueStartAssessmentWorkflows.bind( - this.workflowEnqueuerService, - ), - ); - return getSuccessMessageWithAttentionCheck( - ["Process finalized with success."], - processSummary, - ); - } catch (error: unknown) { - const errorMessage = "Unexpected error while executing the job."; - processSummary.error(errorMessage, error); - return [errorMessage]; - } finally { - this.logger.logProcessSummary(processSummary); - await logProcessSummaryToJobLogger(processSummary, job); - } + protected async process( + _job: Job, + processSummary: ProcessSummary, + ): Promise { + // Check for applications with assessments to be cancelled. + await this.executeEnqueueProcess( + processSummary, + this.workflowEnqueuerService.enqueueCancelAssessmentWorkflows.bind( + this.workflowEnqueuerService, + ), + ); + // Check for applications with assessments to be started. + await this.executeEnqueueProcess( + processSummary, + this.workflowEnqueuerService.enqueueStartAssessmentWorkflows.bind( + this.workflowEnqueuerService, + ), + ); + return "Process finalized with success."; } /** @@ -108,6 +75,12 @@ export class AssessmentWorkflowEnqueuerScheduler extends BaseScheduler { } } + /** + * Setting the logger here allows the correct context to be set + * during the property injection. + * Even if the logger is not used, it is required to be set, to + * allow the base classes to write logs using the correct context. + */ @InjectLogger() logger: LoggerService; } diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-queue-retry.scheduler.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-queue-retry.scheduler.ts index 0ba7701353..e3dbbc7f7e 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-queue-retry.scheduler.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/workflow/assessment-workflow-queue-retry.scheduler.ts @@ -1,4 +1,4 @@ -import { InjectQueue, Process, Processor } from "@nestjs/bull"; +import { InjectQueue, Processor } from "@nestjs/bull"; import { Job, Queue } from "bull"; import { BaseScheduler } from "../base-scheduler"; import { addHours, QueueNames } from "@sims/utilities"; @@ -10,10 +10,6 @@ import { LoggerService, ProcessSummary, } from "@sims/utilities/logger"; -import { - getSuccessMessageWithAttentionCheck, - logProcessSummaryToJobLogger, -} from "../../../utilities"; import { AssessmentWorkflowQueueRetryInDTO } from "./models/assessment-workflow-queue-retry.dto"; /** @@ -30,61 +26,33 @@ export class WorkflowQueueRetryScheduler extends BaseScheduler { - throw new Error("Method not implemented."); - } - - /** - * When implemented in a derived class, process the queue job. - * To be implemented. - */ - protected async process(): Promise { - throw new Error("Method not implemented."); - } - /** * Process all assessments that were not processed in a period of time. - * @param job job information. + * @param job process job. + * @param processSummary process summary for logging. * @returns processing result. */ - @Process() - async enqueueAssessmentRetryOperations( + protected async process( job: Job, - ): Promise { - const processSummary = new ProcessSummary(); - try { - processSummary.info("Checking assessments to be queued for retry."); - // Check for assessments cancellations to be retried. - await this.executeEnqueueProcess( - job.data.amountHoursAssessmentRetry, - processSummary, - this.workflowEnqueuerService.enqueueCancelAssessmentRetryWorkflows.bind( - this.workflowEnqueuerService, - ), - ); - // Check for assessments to be started. - await this.executeEnqueueProcess( - job.data.amountHoursAssessmentRetry, - processSummary, - this.workflowEnqueuerService.enqueueStartAssessmentRetryWorkflows.bind( - this.workflowEnqueuerService, - ), - ); - return getSuccessMessageWithAttentionCheck( - ["Process finalized with success."], - processSummary, - ); - } catch (error: unknown) { - const errorMessage = "Unexpected error while executing the job."; - processSummary.error(errorMessage, error); - } finally { - this.logger.logProcessSummary(processSummary); - await logProcessSummaryToJobLogger(processSummary, job); - } + processSummary: ProcessSummary, + ): Promise { + // Check for assessments cancellations to be retried. + await this.executeEnqueueProcess( + job.data.amountHoursAssessmentRetry, + processSummary, + this.workflowEnqueuerService.enqueueCancelAssessmentRetryWorkflows.bind( + this.workflowEnqueuerService, + ), + ); + // Check for assessments to be started. + await this.executeEnqueueProcess( + job.data.amountHoursAssessmentRetry, + processSummary, + this.workflowEnqueuerService.enqueueStartAssessmentRetryWorkflows.bind( + this.workflowEnqueuerService, + ), + ); + return "Process finalized with success."; } /** @@ -128,6 +96,12 @@ export class WorkflowQueueRetryScheduler extends BaseScheduler { - const processSummaryResult: ProcessSummaryResult = - new ProcessSummaryResult(); + async processAppliedDisabilityRequests( + processSummary: ProcessSummary, + ): Promise { // Students who applied for disability status and waiting for confirmation. const studentDisabilityUpdates = await this.atbcService.getStudentDisabilityStatusUpdatesByDate(); @@ -83,10 +87,7 @@ export class ATBCIntegrationProcessingService { ), })); let updatedDisabilityStatusCount = 0; - this.logger.log( - `Total disability status requests processed: ${studentsToUpdate.length}`, - ); - processSummaryResult.summary.push( + processSummary.info( `Total disability status requests processed: ${studentsToUpdate.length}`, ); if (studentsToUpdate.length) { @@ -102,10 +103,9 @@ export class ATBCIntegrationProcessingService { (result) => result, ).length; } - processSummaryResult.summary.push( + processSummary.info( `Students updated with disability status: ${updatedDisabilityStatusCount}`, ); - return processSummaryResult; } /** diff --git a/sources/packages/backend/libs/integrations/src/esdc-integration/application-changes-report/application-changes-report.processing.service.ts b/sources/packages/backend/libs/integrations/src/esdc-integration/application-changes-report/application-changes-report.processing.service.ts index 028b24c369..7147f79619 100644 --- a/sources/packages/backend/libs/integrations/src/esdc-integration/application-changes-report/application-changes-report.processing.service.ts +++ b/sources/packages/backend/libs/integrations/src/esdc-integration/application-changes-report/application-changes-report.processing.service.ts @@ -40,7 +40,7 @@ export class ApplicationChangesReportProcessingService { processSummary: ProcessSummary, ): Promise { processSummary.info( - "Retrieving all application changes which were not reported already.", + "Retrieving all application changes that have not yet been reported.", ); const applicationChanges = await this.applicationService.getDateChangeNotReportedApplications();