Skip to content

Commit

Permalink
migrate from bull to BullMQ (#984)
Browse files Browse the repository at this point in the history
migrate from bull -> BullMQ
update all local jobs to support BullMQ
update Redis configuration to work with new BullMQ
  • Loading branch information
mhuseinov authored Sep 19, 2023
1 parent 7054c96 commit e8a8f80
Show file tree
Hide file tree
Showing 17 changed files with 284 additions and 312 deletions.
20 changes: 12 additions & 8 deletions services/apps/alcs/src/queues/bullConfig.service.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import { CONFIG_TOKEN } from '@app/common/config/config.module';
import {
BullModuleOptions,
SharedBullConfigurationFactory,
} from '@nestjs/bull';
import { SharedBullConfigurationFactory } from '@nestjs/bullmq';
import { Inject, Injectable } from '@nestjs/common';
import { IConfig } from 'config';
import { QueueOptions } from 'bullmq';
import {
CONFIG_TOKEN,
IConfig,
} from '../../../../libs/common/src/config/config.module';

@Injectable()
export class BullConfigService implements SharedBullConfigurationFactory {
constructor(@Inject(CONFIG_TOKEN) private config: IConfig) {}

createSharedConfiguration(): BullModuleOptions {
createSharedConfiguration(): QueueOptions {
return {
url: this.config.get<string>('REDIS.URL'),
connection: {
host: this.config.get<string>('REDIS.HOST'),
port: this.config.get<number>('REDIS.PORT'),
password: this.config.get<string>('REDIS.PASSWORD'),
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('SchedulerConsumerService', () => {
subject: 'Applications near expiry',
};

await applicationExpiryConsumer.applicationExpiry();
await applicationExpiryConsumer.process();

expect(mockApplicationService.getAllNearExpiryDates).toBeCalledTimes(1);
expect(mockEmailService.sendEmail).toBeCalledTimes(1);
Expand All @@ -69,7 +69,7 @@ describe('SchedulerConsumerService', () => {
it('should not send email if no application near expiry', async () => {
mockApplicationService.getAllNearExpiryDates.mockResolvedValue([]);

await applicationExpiryConsumer.applicationExpiry();
await applicationExpiryConsumer.process();

expect(mockApplicationService.getAllNearExpiryDates).toBeCalledTimes(1);
expect(mockEmailService.sendEmail).toBeCalledTimes(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CONFIG_TOKEN, IConfig } from '@app/common/config/config.module';
import { Process, Processor } from '@nestjs/bull';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Inject, Logger } from '@nestjs/common';
import * as dayjs from 'dayjs';
import {
Expand All @@ -10,17 +10,18 @@ import { EmailService } from '../../../../providers/email/email.service';
import { QUEUES } from '../../scheduler.service';

@Processor(QUEUES.APP_EXPIRY)
export class ApplicationExpiryConsumer {
export class ApplicationExpiryConsumer extends WorkerHost {
private logger = new Logger(ApplicationExpiryConsumer.name);

constructor(
private applicationService: ApplicationService,
private emailService: EmailService,
@Inject(CONFIG_TOKEN) private config: IConfig,
) {}
) {
super();
}

@Process()
async applicationExpiry() {
async process() {
try {
this.logger.debug('starting applicationExpiry');

Expand Down Expand Up @@ -50,6 +51,11 @@ export class ApplicationExpiryConsumer {
}
}

@OnWorkerEvent('completed')
onCompleted() {
this.logger.debug('Completed applicationExpiry job.');
}

private getApplicationsNearExpiryDates() {
const startDate = dayjs().add(-90, 'day');
const endDate = dayjs().add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ describe('ApplicationSubmissionStatusEmailConsumer', () => {
});
mockStatusEmailService.sendApplicationStatusEmail.mockResolvedValue();

await consumer.processSubmissionStatusesAndSendEmails();
await consumer.process();

expect(
mockApplicationSubmissionStatusService.getSubmissionToSubmissionStatusForSendingEmails,
Expand Down Expand Up @@ -119,7 +119,7 @@ describe('ApplicationSubmissionStatusEmailConsumer', () => {
});
mockStatusEmailService.sendApplicationStatusEmail.mockResolvedValue();

await consumer.processSubmissionStatusesAndSendEmails();
await consumer.process();

expect(
mockApplicationSubmissionStatusService.getSubmissionToSubmissionStatusForSendingEmails,
Expand Down Expand Up @@ -169,7 +169,7 @@ describe('ApplicationSubmissionStatusEmailConsumer', () => {
});
mockStatusEmailService.sendApplicationStatusEmail.mockResolvedValue();

await consumer.processSubmissionStatusesAndSendEmails();
await consumer.process();

expect(
mockApplicationSubmissionStatusService.getSubmissionToSubmissionStatusForSendingEmails,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Process, Processor } from '@nestjs/bull';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import * as dayjs from 'dayjs';
import * as timezone from 'dayjs/plugin/timezone';
Expand All @@ -19,16 +19,17 @@ dayjs.extend(utc);
dayjs.extend(timezone);

@Processor(QUEUES.APPLICATION_STATUS_EMAILS)
export class ApplicationSubmissionStatusEmailConsumer {
export class ApplicationSubmissionStatusEmailConsumer extends WorkerHost {
private logger = new Logger(ApplicationSubmissionStatusEmailConsumer.name);

constructor(
private submissionStatusService: ApplicationSubmissionStatusService,
private statusEmailService: StatusEmailService,
) {}
) {
super();
}

@Process()
async processSubmissionStatusesAndSendEmails() {
async process() {
try {
this.logger.debug(
'Starting application submission status email consumer.',
Expand Down Expand Up @@ -71,6 +72,13 @@ export class ApplicationSubmissionStatusEmailConsumer {
}
}

@OnWorkerEvent('completed')
onCompleted() {
this.logger.debug(
'Completed ApplicationSubmissionStatusEmailConsumer job.',
);
}

private async sendEmailAndUpdateStatus(
applicationSubmission: ApplicationSubmission,
submissionGovernment: LocalGovernment | null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('SchedulerConsumerService', () => {
affected: 1,
} as DeleteResult);

await notificationCleanUpConsumer.cleanUpNotifications();
await notificationCleanUpConsumer.process();

expect(mockNotificationService.cleanUp).toHaveBeenCalledTimes(2);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Process, Processor } from '@nestjs/bull';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import * as dayjs from 'dayjs';
import { MessageService } from '../../alcs/message/message.service';
Expand All @@ -8,13 +8,14 @@ const DAYS_TO_RETAIN_READ = 30;
const DAYS_TO_RETAIN_UNREAD = 365;

@Processor(QUEUES.CLEANUP_NOTIFICATIONS)
export class CleanUpNotificationsConsumer {
export class CleanUpNotificationsConsumer extends WorkerHost {
private logger = new Logger(CleanUpNotificationsConsumer.name);

constructor(private notificationService: MessageService) {}
constructor(private notificationService: MessageService) {
super();
}

@Process()
async cleanUpNotifications() {
async process() {
try {
this.logger.debug('starting notification cleanup');

Expand Down Expand Up @@ -43,4 +44,9 @@ export class CleanUpNotificationsConsumer {
}
return;
}

@OnWorkerEvent('completed')
onCompleted() {
this.logger.debug('Completed CleanUpNotificationsConsumer job.');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ describe('NoticeOfIntentSubmissionStatusEmailConsumer', () => {
});
mockStatusEmailService.sendNoticeOfIntentStatusEmail.mockResolvedValue();

await consumer.processSubmissionStatusesAndSendEmails();
await consumer.process();

expect(
mockNoticeOfIntentSubmissionStatusService.getSubmissionToSubmissionStatusForSendingEmails,
Expand Down Expand Up @@ -119,7 +119,7 @@ describe('NoticeOfIntentSubmissionStatusEmailConsumer', () => {
});
mockStatusEmailService.sendNoticeOfIntentStatusEmail.mockResolvedValue();

await consumer.processSubmissionStatusesAndSendEmails();
await consumer.process();

expect(
mockNoticeOfIntentSubmissionStatusService.getSubmissionToSubmissionStatusForSendingEmails,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Process, Processor } from '@nestjs/bull';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import * as dayjs from 'dayjs';
import * as timezone from 'dayjs/plugin/timezone';
Expand All @@ -15,16 +15,17 @@ dayjs.extend(utc);
dayjs.extend(timezone);

@Processor(QUEUES.NOTICE_OF_INTENTS_STATUS_EMAILS)
export class NoticeOfIntentSubmissionStatusEmailConsumer {
export class NoticeOfIntentSubmissionStatusEmailConsumer extends WorkerHost {
private logger = new Logger(NoticeOfIntentSubmissionStatusEmailConsumer.name);

constructor(
private submissionStatusService: NoticeOfIntentSubmissionStatusService,
private statusEmailService: StatusEmailService,
) {}
) {
super();
}

@Process()
async processSubmissionStatusesAndSendEmails() {
async process() {
try {
const tomorrow = dayjs(new Date())
.tz('Canada/Pacific')
Expand Down Expand Up @@ -69,6 +70,13 @@ export class NoticeOfIntentSubmissionStatusEmailConsumer {
}
}

@OnWorkerEvent('completed')
onCompleted() {
this.logger.debug(
'Completed NoticeOfIntentSubmissionStatusEmailConsumer job.',
);
}

private async updateSubmissionStatus(
submissionStatus: NoticeOfIntentSubmissionToSubmissionStatus,
today: Date,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BullModule } from '@nestjs/bull';
import { BullModule } from '@nestjs/bullmq';
import { Module, OnApplicationBootstrap } from '@nestjs/common';
import { ApplicationSubmissionStatusModule } from '../../alcs/application/application-submission-status/application-submission-status.module';
import { ApplicationModule } from '../../alcs/application/application.module';
Expand Down
30 changes: 17 additions & 13 deletions services/apps/alcs/src/queues/scheduler/scheduler.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ConfigModule } from '@app/common/config/config.module';
import { BullModule, getQueueToken } from '@nestjs/bull';
import { BullModule, getQueueToken } from '@nestjs/bullmq';
import { Test, TestingModule } from '@nestjs/testing';
import { BullConfigService } from '../bullConfig.service';
import {
Expand All @@ -20,25 +20,25 @@ describe('SchedulerService', () => {
mockAppExpiryQueue = {
add: jest.fn(),
process: jest.fn(),
empty: jest.fn(),
drain: jest.fn(),
};

mockNotificationCleanUpQueue = {
add: jest.fn(),
process: jest.fn(),
empty: jest.fn(),
drain: jest.fn(),
};

mockApplicationStatusEmailsQueue = {
add: jest.fn(),
process: jest.fn(),
empty: jest.fn(),
drain: jest.fn(),
};

mockNoticeOfIntentStatusEmailsQueue = {
add: jest.fn(),
process: jest.fn(),
empty: jest.fn(),
drain: jest.fn(),
};

const module: TestingModule = await Test.createTestingModule({
Expand Down Expand Up @@ -80,41 +80,45 @@ describe('SchedulerService', () => {
//Job Disabled for now
// it('should call add for scheduleApplicationExpiry', async () => {
// await schedulerService.setup();
// expect(mockAppExpiryQueue.empty).toBeCalledTimes(1);
// expect(mockAppExpiryQueue.drain).toBeCalledTimes(1);
// expect(mockAppExpiryQueue.add).toBeCalledTimes(1);
// expect(mockAppExpiryQueue.add).toBeCalledWith(
// 'applicationExpiry'
// {},
// { repeat: { cron: MONDAY_TO_FRIDAY_AT_2AM } },
// { repeat: { pattern: MONDAY_TO_FRIDAY_AT_2AM } },
// );
// });

it('should call add for notification cleanup', async () => {
await schedulerService.setup();
expect(mockNotificationCleanUpQueue.empty).toBeCalledTimes(1);
expect(mockNotificationCleanUpQueue.drain).toBeCalledTimes(1);
expect(mockNotificationCleanUpQueue.add).toBeCalledTimes(1);
expect(mockNotificationCleanUpQueue.add).toBeCalledWith(
'cleanupNotifications',
{},
{ repeat: { cron: EVERYDAY_MIDNIGHT } },
{ repeat: { pattern: EVERYDAY_MIDNIGHT } },
);
});

it('should call add for application status email', async () => {
await schedulerService.setup();
expect(mockApplicationStatusEmailsQueue.empty).toBeCalledTimes(1);
expect(mockApplicationStatusEmailsQueue.drain).toBeCalledTimes(1);
expect(mockApplicationStatusEmailsQueue.add).toBeCalledTimes(1);
expect(mockApplicationStatusEmailsQueue.add).toBeCalledWith(
'applicationSubmissionStatusEmails',
{},
{ repeat: { cron: EVERY_15_MINUTES_STARTING_FROM_8AM } },
{ repeat: { pattern: EVERY_15_MINUTES_STARTING_FROM_8AM } },
);
});

it('should call add for notice of intent status email', async () => {
await schedulerService.setup();
expect(mockNoticeOfIntentStatusEmailsQueue.empty).toBeCalledTimes(1);
expect(mockNoticeOfIntentStatusEmailsQueue.drain).toBeCalledTimes(1);
expect(mockNoticeOfIntentStatusEmailsQueue.add).toBeCalledTimes(1);
expect(mockNoticeOfIntentStatusEmailsQueue.add).toBeCalledWith(
'noticeOfIntentSubmissionStatusEmails',
{},
{ repeat: { cron: EVERY_15_MINUTES_STARTING_FROM_8AM } },
{ repeat: { pattern: EVERY_15_MINUTES_STARTING_FROM_8AM } },
);
});
});
Loading

0 comments on commit e8a8f80

Please sign in to comment.