Skip to content

Commit

Permalink
handle guaranteedBroadcastPushDispatchProcessing in bullMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
f-w committed Nov 25, 2024
1 parent 3de89c1 commit 41cb4f1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
44 changes: 41 additions & 3 deletions src/api/notifications/notifications.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ import {
import { FlowJob, FlowProducer, QueueEvents } from 'bullmq';
import { Request } from 'express';
import jmespath from 'jmespath';
import { pick } from 'lodash';
import { pick, pullAll } from 'lodash';
import { AnyObject, FilterQuery } from 'mongoose';
import { Role } from 'src/auth/constants';
import { UserProfile } from 'src/auth/dto/user-profile.dto';
import { Roles } from 'src/auth/roles.decorator';
import { CommonService } from 'src/common/common.service';
import {
CommonService,
NotificationDispatchStatusField,
} from 'src/common/common.service';
import { AppConfigService } from 'src/config/app-config.service';
import { CountDto } from '../common/dto/count.dto';
import { FilterDto } from '../common/dto/filter.dto';
Expand Down Expand Up @@ -318,14 +321,48 @@ export class NotificationsController {
queueEvents.close();
});

// todo: add failed listener
// failed listener marks all candidates in the chunk failed if
// guaranteedBroadcastPushDispatchProcessing is false
queueEvents.on('failed', async ({ jobId }) => {
const failedJob = j.children.find((e) => e.job.id === jobId);
if (this.guaranteedBroadcastPushDispatchProcessing || !failedJob) {
return;
}
const notification = await this.notificationsService.findOne(
{
where: { id: failedJob.job.data.id },
},
this.req,
);
const startIdx = failedJob.job.data.s;
const subChunk = (notification.dispatch.candidates as string[]).slice(
startIdx,
startIdx + this.broadcastSubscriberChunkSize,
);
pullAll(
subChunk,
(notification.dispatch?.failed ?? []).map(
(e: any) => e.subscriptionId,
),
);
await this.commonService.updateBroadcastPushNotificationStatus(
notification,
NotificationDispatchStatusField.failed,
subChunk.map((e) => ({
subscriptionId: e,
})),
this.req,
);
});

await queueEvents.waitUntilReady();

const j = await this.flowProducer.add(flowJob, {
queuesOptions: {
[flowJob.queueName]: {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
},
},
Expand Down Expand Up @@ -481,6 +518,7 @@ export class NotificationsController {
id: data.id,
s: i++ * this.broadcastSubscriberChunkSize,
},
opts: { ignoreDependencyOnFailure: true },
});
}
await this.waitForFlowJobCompletion({
Expand Down
16 changes: 14 additions & 2 deletions src/queue-consumers/notification-queue-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { BeforeApplicationShutdown, Injectable, Logger } from '@nestjs/common';
import {
BeforeApplicationShutdown,
Injectable,
Logger,
OnApplicationBootstrap,
} from '@nestjs/common';
import { Job } from 'bullmq';
import jmespath from 'jmespath';
import { pullAll } from 'lodash';
Expand All @@ -17,7 +22,7 @@ import { AppConfigService } from 'src/config/app-config.service';
@Processor('n')
export class NotificationQueueConsumer
extends WorkerHost
implements BeforeApplicationShutdown
implements OnApplicationBootstrap, BeforeApplicationShutdown
{
readonly logger = new Logger(NotificationQueueConsumer.name);
private readonly appConfig;
Expand Down Expand Up @@ -316,4 +321,11 @@ export class NotificationQueueConsumer
async beforeApplicationShutdown() {
await this.worker.close();
}

onApplicationBootstrap() {
this.worker.opts.maxStalledCount = this
.guaranteedBroadcastPushDispatchProcessing
? 2
: 0;
}
}
11 changes: 8 additions & 3 deletions test/notification.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { NotificationsService } from 'src/api/notifications/notifications.servic
import { SubscriptionsService } from 'src/api/subscriptions/subscriptions.service';
import { CommonService } from 'src/common/common.service';
import { AppConfigService } from 'src/config/app-config.service';
import { NotificationQueueConsumer } from 'src/queue-consumers/notification-queue-consumer';
import supertest from 'supertest';
import { getAppAndClient, runAsSuperAdmin, wait } from './test-helper';

Expand Down Expand Up @@ -1348,8 +1349,7 @@ describe('POST /notifications', () => {
});
});

// skip b/c bullMQ
it.skip(
it(
'should handle batch broadcast request error ' +
'when guaranteedBroadcastPushDispatchProcessing is false',
async () => {
Expand All @@ -1362,7 +1362,12 @@ describe('POST /notifications', () => {
guaranteedBroadcastPushDispatchProcessing: false,
});
appConfig.notification = newNotificationConfig;
jest.spyOn(axios, 'get').mockRejectedValue({});
jest
.spyOn(
NotificationQueueConsumer.prototype,
'broadcastToSubscriberChunk',
)
.mockRejectedValue({});
const res = await client
.post('/api/notifications')
.send({
Expand Down

0 comments on commit 41cb4f1

Please sign in to comment.