From 35c3744a53729a2659188ba1b0a849d2f877a1d3 Mon Sep 17 00:00:00 2001 From: f-w Date: Tue, 3 Dec 2024 11:18:24 -0800 Subject: [PATCH] handle failed child notification jobs --- .../notifications/notifications.controller.ts | 43 +--------- src/common/common.service.ts | 2 +- src/queue/notification-events-listener.ts | 81 +++++++++++++++++++ src/queue/queue-consumers.module.ts | 8 ++ 4 files changed, 92 insertions(+), 42 deletions(-) create mode 100644 src/queue/notification-events-listener.ts diff --git a/src/api/notifications/notifications.controller.ts b/src/api/notifications/notifications.controller.ts index b5efb5449..213ef1a7b 100644 --- a/src/api/notifications/notifications.controller.ts +++ b/src/api/notifications/notifications.controller.ts @@ -41,15 +41,12 @@ import { import { FlowJob, FlowProducer, QueueEvents } from 'bullmq'; import { Request } from 'express'; import jmespath from 'jmespath'; -import { pick, pullAll } from 'lodash'; +import { pick } from 'lodash'; import { AnyObject, FilterQuery } from 'mongoose'; import { Role } from 'src/auth/constants'; import { UserProfile } from 'src/auth/dto/user-profile.dto'; import { Roles } from 'src/auth/roles.decorator'; -import { - CommonService, - NotificationDispatchStatusField, -} from 'src/common/common.service'; +import { CommonService } from 'src/common/common.service'; import { AppConfigService } from 'src/config/app-config.service'; import { CountDto } from '../common/dto/count.dto'; import { FilterDto } from '../common/dto/filter.dto'; @@ -321,41 +318,6 @@ export class NotificationsController { queueEvents.close(); }); - // todo: moved failed listener to parent job processor - // failed listener marks all candidates in the chunk failed if - // guaranteedBroadcastPushDispatchProcessing is false - queueEvents.on('failed', async ({ jobId }) => { - const failedJob = j.children.find((e) => e.job.id === jobId); - if (this.guaranteedBroadcastPushDispatchProcessing || !failedJob) { - return; - } - const notification = await this.notificationsService.findOne( - { - where: { id: failedJob.job.data.id }, - }, - this.req, - ); - const startIdx = failedJob.job.data.s; - const subChunk = (notification.dispatch.candidates as string[]).slice( - startIdx, - startIdx + this.broadcastSubscriberChunkSize, - ); - pullAll( - subChunk, - (notification.dispatch?.failed ?? []).map( - (e: any) => e.subscriptionId, - ), - ); - await this.commonService.updateBroadcastPushNotificationStatus( - notification, - NotificationDispatchStatusField.failed, - subChunk.map((e) => ({ - subscriptionId: e, - })), - this.req, - ); - }); - await queueEvents.waitUntilReady(); const j = await this.flowProducer.add(flowJob, { @@ -363,7 +325,6 @@ export class NotificationsController { [flowJob.queueName]: { defaultJobOptions: { removeOnComplete: true, - removeOnFail: true, }, }, }, diff --git a/src/common/common.service.ts b/src/common/common.service.ts index 2dc2588ad..11033cc31 100644 --- a/src/common/common.service.ts +++ b/src/common/common.service.ts @@ -492,7 +492,7 @@ export class CommonService { await this.notificationsService.updateById( data.id, { - $push: { + $addToSet: { ['dispatch.' + NotificationDispatchStatusField[field]]: val, }, }, diff --git a/src/queue/notification-events-listener.ts b/src/queue/notification-events-listener.ts new file mode 100644 index 000000000..2314e9efd --- /dev/null +++ b/src/queue/notification-events-listener.ts @@ -0,0 +1,81 @@ +import { + InjectQueue, + OnQueueEvent, + QueueEventsHost, + QueueEventsListener, +} from '@nestjs/bullmq'; +import { BeforeApplicationShutdown, Injectable, Logger } from '@nestjs/common'; +import { Job, Queue } from 'bullmq'; +import { pullAll } from 'lodash'; +import { NotificationsService } from 'src/api/notifications/notifications.service'; +import { + CommonService, + NotificationDispatchStatusField, +} from 'src/common/common.service'; +import { AppConfigService } from 'src/config/app-config.service'; + +@QueueEventsListener('n') +@Injectable() +export class NotificationEventsListener + extends QueueEventsHost + implements BeforeApplicationShutdown +{ + readonly logger = new Logger(NotificationEventsListener.name); + private readonly appConfig; + private get guaranteedBroadcastPushDispatchProcessing() { + return this.appConfig.notification + ?.guaranteedBroadcastPushDispatchProcessing; + } + private get broadcastSubscriberChunkSize() { + return this.appConfig.notification?.broadcastSubscriberChunkSize; + } + + constructor( + appConfigService: AppConfigService, + private readonly notificationsService: NotificationsService, + private readonly commonService: CommonService, + @InjectQueue('n') private notificationQueue: Queue, + ) { + super(); + this.appConfig = appConfigService.get(); + } + + // failed listener marks all candidates in the chunk failed if + // guaranteedBroadcastPushDispatchProcessing is false + @OnQueueEvent('failed') + async onFailed({ jobId }) { + const job = await Job.fromId(this.notificationQueue, jobId); + if (this.guaranteedBroadcastPushDispatchProcessing || job?.name !== 'c') { + job.remove(); + return; + } + const notification = await this.notificationsService.findOne( + { + where: { id: job.data.id }, + }, + null, + ); + const startIdx = job.data.s; + const subChunk = (notification.dispatch.candidates as string[]).slice( + startIdx, + startIdx + this.broadcastSubscriberChunkSize, + ); + pullAll( + subChunk, + (notification.dispatch?.failed ?? []).map((e: any) => e.subscriptionId), + ); + await this.commonService.updateBroadcastPushNotificationStatus( + notification, + NotificationDispatchStatusField.failed, + subChunk.map((e) => ({ + subscriptionId: e, + })), + null, + ); + job.remove(); + } + + async beforeApplicationShutdown() { + await this.queueEvents.close(); + } +} diff --git a/src/queue/queue-consumers.module.ts b/src/queue/queue-consumers.module.ts index cb549598d..e14e77a44 100644 --- a/src/queue/queue-consumers.module.ts +++ b/src/queue/queue-consumers.module.ts @@ -1,8 +1,10 @@ +import { BullModule } from '@nestjs/bullmq'; import { DynamicModule, Module } from '@nestjs/common'; import { BouncesModule } from 'src/api/bounces/bounces.module'; import { NotificationsModule } from 'src/api/notifications/notifications.module'; import { SubscriptionsModule } from 'src/api/subscriptions/subscriptions.module'; import { EmailQueueConsumer } from './email-queue-consumer'; +import { NotificationEventsListener } from './notification-events-listener'; import { NotificationQueueConsumer } from './notification-queue-consumer'; import { SmsQueueConsumer } from './sms-queue-consumer'; @@ -15,9 +17,15 @@ export class QueueConsumersModule { imports.push(NotificationsModule); imports.push(SubscriptionsModule); imports.push(BouncesModule); + imports.push( + BullModule.registerQueue({ + name: 'n', + }), + ); providers.push(EmailQueueConsumer); providers.push(SmsQueueConsumer); providers.push(NotificationQueueConsumer); + providers.push(NotificationEventsListener); } return { module: QueueConsumersModule,