Skip to content

Commit

Permalink
#4076 - Queue Monitoring - Schedulers Refactor (Fed Restrictions and …
Browse files Browse the repository at this point in the history
…Receipts) (#4135)

- Refactored schedulers `disbursement-receipts-file-integration` and
`federal-restrictions-integration`.
- Adjusted existing E2Es.
- As agreed in previous PRs
  - moving the audit user to the service instead of the processor.
- removing the friendly start/end logs from the processor since the
`BaseQueue` is already logging similar start/end logs.

### E2E `containLogMessages` changes
To allow the below check to happen the method `containLogMessages` was
changed to check if the string "contains" a value instead of "endsWith"
it. The `endsWith` was used to have a more precise check and remove the
log initial information (e.g. context and date). Using the "contains"
will still give enough assertion precision and allow inspecting errors
currently serialized to a JSON.
```ts
 // Act/Assert
await expect(processor.processQueue(mockedJob.job)).rejects.toThrow(
  "One or more errors were reported during the process, please see logs for details.",
);
expect(
  mockedJob.containLogMessages([
    `Error downloading file ${expectedFileName}.`,
    "Invalid file footer.",
  ]),
).toBe(true);
```
  • Loading branch information
andrewsignori-aot authored Dec 19, 2024
1 parent 272e503 commit 77f6d41
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,23 @@ describe(
return createFileFromStructuredRecords(file);
},
);
// Queued job.
const { job } = mockBullJob<void>();

// Act
const [result] = await processor.processDisbursementReceipts(job);

// Assert
const downloadedFile = path.join(
const expectedFileName = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_ONLY_FULL_TIME_FILE,
);
expect(result.errorsSummary).toContain(
`Error downloading file ${downloadedFile}. Error: Error: Invalid file header.`,
);
// Queued job.
const mockedJob = mockBullJob<void>();

// Act/Assert
await expect(processor.processQueue(mockedJob.job)).rejects.toThrowError(
"One or more errors were reported during the process, please see logs for details.",
);
expect(
mockedJob.containLogMessages([
`Error downloading file ${expectedFileName}.`,
"Invalid file header.",
]),
).toBe(true);
});

it("Should log 'SIN Hash validation failed' error when the footer has an invalid SIN total hash.", async () => {
Expand All @@ -167,20 +170,24 @@ describe(
return createFileFromStructuredRecords(file);
},
);
// Queued job.
const { job } = mockBullJob<void>();

// Act
const [result] = await processor.processDisbursementReceipts(job);

// Assert
const downloadedFile = path.join(
// Expected file name.
const expectedFileName = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_ONLY_FULL_TIME_FILE,
);
expect(result.errorsSummary).toContain(
`Error downloading file ${downloadedFile}. Error: Error: SIN Hash validation failed.`,
);
// Queued job.
const mockedJob = mockBullJob<void>();

// Act/Assert
await expect(processor.processQueue(mockedJob.job)).rejects.toThrowError(
"One or more errors were reported during the process, please see logs for details.",
);
expect(
mockedJob.containLogMessages([
`Error downloading file ${expectedFileName}.`,
"SIN Hash validation failed.",
]),
).toBe(true);
});

it("Should log 'Invalid file footer' error when the footer has a record code different than 'T'.", async () => {
Expand All @@ -195,20 +202,23 @@ describe(
return createFileFromStructuredRecords(file);
},
);
// Queued job.
const { job } = mockBullJob<void>();

// Act
const [result] = await processor.processDisbursementReceipts(job);

// Assert
const downloadedFile = path.join(
const expectedFileName = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_ONLY_FULL_TIME_FILE,
);
expect(result.errorsSummary).toContain(
`Error downloading file ${downloadedFile}. Error: Error: Invalid file footer.`,
);
// Queued job.
const mockedJob = mockBullJob<void>();

// Act/Assert
await expect(processor.processQueue(mockedJob.job)).rejects.toThrow(
"One or more errors were reported during the process, please see logs for details.",
);
expect(
mockedJob.containLogMessages([
`Error downloading file ${expectedFileName}.`,
"Invalid file footer.",
]),
).toBe(true);
});

it("Should import disbursement receipt file and create federal and provincial awards receipts with proper awards code mappings for a full-time application when the file contains federal and provincial receipts.", async () => {
Expand All @@ -222,29 +232,29 @@ describe(
});
mockDownloadFiles(sftpClientMock, [FEDERAL_PROVINCIAL_FULL_TIME_FILE]);
// Queued job.
const { job } = mockBullJob<void>();
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processDisbursementReceipts(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Completed disbursement receipts integration.",
]);
const downloadedFile = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_PROVINCIAL_FULL_TIME_FILE,
);
expect(result).toStrictEqual([
{
processSummary: [
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 3 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
"Provincial daily disbursement CSV report generated.",
],
errorsSummary: [],
},
]);
expect(
mockedJob.containLogMessages([
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 3 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
"Provincial daily disbursement CSV report generated.",
]),
).toBe(true);
// Assert imported receipts.
const { bcReceipt, feReceipt } = await getReceiptsForAssert(
FILE_DATE,
Expand Down Expand Up @@ -353,27 +363,27 @@ describe(
});
mockDownloadFiles(sftpClientMock, [FEDERAL_ONLY_FULL_TIME_FILE]);
// Queued job.
const { job } = mockBullJob<void>();
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processDisbursementReceipts(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Completed disbursement receipts integration.",
]);
const downloadedFile = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_ONLY_FULL_TIME_FILE,
);
expect(result).toStrictEqual([
{
processSummary: [
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
],
errorsSummary: [],
},
]);
expect(
mockedJob.containLogMessages([
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
]),
).toBe(true);
// Assert imported receipts.
const { feReceipt, bcReceipt } = await getReceiptsForAssert(
FILE_DATE,
Expand Down Expand Up @@ -430,29 +440,29 @@ describe(
});
mockDownloadFiles(sftpClientMock, [FEDERAL_PROVINCIAL_PART_TIME_FILE]);
// Queued job.
const { job } = mockBullJob<void>();
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processDisbursementReceipts(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
const downloadedFile = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_PROVINCIAL_PART_TIME_FILE,
);
expect(result).toStrictEqual([
{
processSummary: [
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 3 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
"Provincial daily disbursement CSV report generated.",
],
errorsSummary: [],
},
"Completed disbursement receipts integration.",
]);
expect(
mockedJob.containLogMessages([
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 3 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
"Provincial daily disbursement CSV report generated.",
]),
).toBe(true);
// Assert imported receipts.
const { bpReceipt, feReceipt } = await getReceiptsForAssert(
FILE_DATE,
Expand Down Expand Up @@ -563,30 +573,31 @@ describe(
FEDERAL_PROVINCIAL_FULL_TIME_PART_TIME_FILE,
]);
// Queued job.
const { job } = mockBullJob<void>();
const mockedJob = mockBullJob<void>();

// Act
const result = await processor.processDisbursementReceipts(job);
const result = await processor.processQueue(mockedJob.job);

// Assert
expect(result).toStrictEqual([
"Completed disbursement receipts integration.",
]);
const downloadedFile = path.join(
process.env.ESDC_RESPONSE_FOLDER,
FEDERAL_PROVINCIAL_FULL_TIME_PART_TIME_FILE,
);
expect(result).toEqual([
{
processSummary: [
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 3 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 4 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
"Provincial daily disbursement CSV report generated.",
],
errorsSummary: [],
},
]);
expect(
mockedJob.containLogMessages([
`Processing file ${downloadedFile}.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 2 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 3 inserted successfully.`,
`Record with document number ${SHARED_DOCUMENT_NUMBER} at line 4 inserted successfully.`,
`Processing file ${downloadedFile} completed.`,
`Processing provincial daily disbursement CSV file on ${FILE_DATE}.`,
"Provincial daily disbursement CSV report generated.",
]),
).toBe(true);

// Assert imported receipts.
const { feReceipt, bcReceipt, bpReceipt } = await getReceiptsForAssert(
FILE_DATE,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { InjectQueue, Process, Processor } from "@nestjs/bull";
import { InjectQueue, Processor } from "@nestjs/bull";
import { DisbursementReceiptProcessingService } from "@sims/integrations/esdc-integration";
import { QueueService } from "@sims/services/queue";
import { SystemUsersService } from "@sims/services/system-users";
import { QueueNames } from "@sims/utilities";
import { Job, Queue } from "bull";
import { ProcessSummary } from "@sims/utilities/logger";
import { BaseScheduler } from "../../base-scheduler";
import { ESDCFileResponse } from "../models/esdc.models";
import {
InjectLogger,
LoggerService,
ProcessSummary,
} from "@sims/utilities/logger";

@Processor(QueueNames.DisbursementReceiptsFileIntegration)
export class DisbursementReceiptsFileIntegrationScheduler extends BaseScheduler<void> {
Expand All @@ -15,49 +17,30 @@ export class DisbursementReceiptsFileIntegrationScheduler extends BaseScheduler<
schedulerQueue: Queue<void>,
queueService: QueueService,
private readonly disbursementReceiptProcessingService: DisbursementReceiptProcessingService,
private readonly systemUsersService: SystemUsersService,
) {
super(schedulerQueue, queueService);
}

/**
* To be removed once the method {@link process} is implemented.
* This method "hides" the {@link Process} decorator from the base class.
* Process all the disbursement receipt files from remote SFTP location.
* @param _job process job.
* @param processSummary process summary for logging.
* @returns processing result.
*/
async processQueue(): Promise<string | string[]> {
throw new Error("Method not implemented.");
protected async process(
_job: Job<void>,
processSummary: ProcessSummary,
): Promise<string> {
await this.disbursementReceiptProcessingService.process(processSummary);
return "Completed disbursement receipts integration.";
}

/**
* When implemented in a derived class, process the queue job.
* To be implemented.
* Setting the logger here allows the correct context to be set
* during the property injection.
* Even if the logger is not used, it is required to be set, to
* allow the base classes to write logs using the correct context.
*/
protected async process(): Promise<string | string[]> {
throw new Error("Method not implemented.");
}

/**
* Process all the disbursement receipt files from remote sftp location.
* @params job job details.
* @returns Summary details of processing.
*/
@Process()
async processDisbursementReceipts(
job: Job<void>,
): Promise<ESDCFileResponse[]> {
const processSummary = new ProcessSummary();
processSummary.info(
`Processing full time disbursement receipts integration job ${job.id} of type ${job.name}.`,
);
const auditUser = this.systemUsersService.systemUser;
const processResponse =
await this.disbursementReceiptProcessingService.process(auditUser.id);
processSummary.info(
`Completed full time disbursement receipts integration job ${job.id} of type ${job.name}.`,
);
return processResponse.map((response) => ({
processSummary: response.processSummary,
errorsSummary: response.errorsSummary,
}));
}
@InjectLogger()
logger: LoggerService;
}
Loading

0 comments on commit 77f6d41

Please sign in to comment.