From c712af6c575907535940c77f6658840b8eec763d Mon Sep 17 00:00:00 2001 From: f-w Date: Mon, 25 Nov 2024 11:45:04 -0800 Subject: [PATCH] handle guaranteedBroadcastPushDispatchProcessing in bullMQ --- .../notifications/notifications.controller.ts | 43 +++++++++++++++++-- .../notification-queue-consumer.ts | 16 ++++++- test/notification.e2e-spec.ts | 11 +++-- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/src/api/notifications/notifications.controller.ts b/src/api/notifications/notifications.controller.ts index c6d562d5e..79d2b2a67 100644 --- a/src/api/notifications/notifications.controller.ts +++ b/src/api/notifications/notifications.controller.ts @@ -41,12 +41,15 @@ import { import { FlowJob, FlowProducer, QueueEvents } from 'bullmq'; import { Request } from 'express'; import jmespath from 'jmespath'; -import { pick } from 'lodash'; +import { pick, pullAll } from 'lodash'; import { AnyObject, FilterQuery } from 'mongoose'; import { Role } from 'src/auth/constants'; import { UserProfile } from 'src/auth/dto/user-profile.dto'; import { Roles } from 'src/auth/roles.decorator'; -import { CommonService } from 'src/common/common.service'; +import { + CommonService, + NotificationDispatchStatusField, +} from 'src/common/common.service'; import { AppConfigService } from 'src/config/app-config.service'; import { CountDto } from '../common/dto/count.dto'; import { FilterDto } from '../common/dto/filter.dto'; @@ -318,7 +321,40 @@ export class NotificationsController { queueEvents.close(); }); - // todo: add failed listener + // failed listener marks all candidates in the chunk failed if + // guaranteedBroadcastPushDispatchProcessing is false + queueEvents.on('failed', async ({ jobId }) => { + const failedJob = j.children.find((e) => e.job.id === jobId); + if (this.guaranteedBroadcastPushDispatchProcessing || !failedJob) { + return; + } + const notification = await this.notificationsService.findOne( + { + where: { id: failedJob.job.data.id }, + }, + this.req, + ); + const startIdx = failedJob.job.data.s; + const subChunk = (notification.dispatch.candidates as string[]).slice( + startIdx, + startIdx + this.broadcastSubscriberChunkSize, + ); + pullAll( + subChunk, + (notification.dispatch?.failed ?? []).map( + (e: any) => e.subscriptionId, + ), + ); + await this.commonService.updateBroadcastPushNotificationStatus( + notification, + NotificationDispatchStatusField.failed, + subChunk.map((e) => ({ + subscriptionId: e, + })), + this.req, + ); + }); + await queueEvents.waitUntilReady(); const j = await this.flowProducer.add(flowJob, { @@ -481,6 +517,7 @@ export class NotificationsController { id: data.id, s: i++ * this.broadcastSubscriberChunkSize, }, + opts: { ignoreDependencyOnFailure: true }, }); } await this.waitForFlowJobCompletion({ diff --git a/src/queue-consumers/notification-queue-consumer.ts b/src/queue-consumers/notification-queue-consumer.ts index f05960c5d..ad020f892 100644 --- a/src/queue-consumers/notification-queue-consumer.ts +++ b/src/queue-consumers/notification-queue-consumer.ts @@ -1,5 +1,10 @@ import { Processor, WorkerHost } from '@nestjs/bullmq'; -import { BeforeApplicationShutdown, Injectable, Logger } from '@nestjs/common'; +import { + BeforeApplicationShutdown, + Injectable, + Logger, + OnApplicationBootstrap, +} from '@nestjs/common'; import { Job } from 'bullmq'; import jmespath from 'jmespath'; import { pullAll } from 'lodash'; @@ -17,7 +22,7 @@ import { AppConfigService } from 'src/config/app-config.service'; @Processor('n') export class NotificationQueueConsumer extends WorkerHost - implements BeforeApplicationShutdown + implements OnApplicationBootstrap, BeforeApplicationShutdown { readonly logger = new Logger(NotificationQueueConsumer.name); private readonly appConfig; @@ -316,4 +321,11 @@ export class NotificationQueueConsumer async beforeApplicationShutdown() { await this.worker.close(); } + + onApplicationBootstrap() { + this.worker.opts.maxStalledCount = this + .guaranteedBroadcastPushDispatchProcessing + ? 2 + : 0; + } } diff --git a/test/notification.e2e-spec.ts b/test/notification.e2e-spec.ts index 0d342c4bb..62577b594 100644 --- a/test/notification.e2e-spec.ts +++ b/test/notification.e2e-spec.ts @@ -24,6 +24,7 @@ import { NotificationsService } from 'src/api/notifications/notifications.servic import { SubscriptionsService } from 'src/api/subscriptions/subscriptions.service'; import { CommonService } from 'src/common/common.service'; import { AppConfigService } from 'src/config/app-config.service'; +import { NotificationQueueConsumer } from 'src/queue-consumers/notification-queue-consumer'; import supertest from 'supertest'; import { getAppAndClient, runAsSuperAdmin, wait } from './test-helper'; @@ -1348,8 +1349,7 @@ describe('POST /notifications', () => { }); }); - // skip b/c bullMQ - it.skip( + it( 'should handle batch broadcast request error ' + 'when guaranteedBroadcastPushDispatchProcessing is false', async () => { @@ -1362,7 +1362,12 @@ describe('POST /notifications', () => { guaranteedBroadcastPushDispatchProcessing: false, }); appConfig.notification = newNotificationConfig; - jest.spyOn(axios, 'get').mockRejectedValue({}); + jest + .spyOn( + NotificationQueueConsumer.prototype, + 'broadcastToSubscriberChunk', + ) + .mockRejectedValue({}); const res = await client .post('/api/notifications') .send({