Skip to content

Commit

Permalink
handle failed child notification jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
f-w committed Dec 4, 2024
1 parent d78d671 commit 35c3744
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 42 deletions.
43 changes: 2 additions & 41 deletions src/api/notifications/notifications.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ import {
import { FlowJob, FlowProducer, QueueEvents } from 'bullmq';
import { Request } from 'express';
import jmespath from 'jmespath';
import { pick, pullAll } from 'lodash';
import { pick } from 'lodash';
import { AnyObject, FilterQuery } from 'mongoose';
import { Role } from 'src/auth/constants';
import { UserProfile } from 'src/auth/dto/user-profile.dto';
import { Roles } from 'src/auth/roles.decorator';
import {
CommonService,
NotificationDispatchStatusField,
} from 'src/common/common.service';
import { CommonService } from 'src/common/common.service';
import { AppConfigService } from 'src/config/app-config.service';
import { CountDto } from '../common/dto/count.dto';
import { FilterDto } from '../common/dto/filter.dto';
Expand Down Expand Up @@ -321,49 +318,13 @@ export class NotificationsController {
queueEvents.close();
});

// todo: moved failed listener to parent job processor
// failed listener marks all candidates in the chunk failed if
// guaranteedBroadcastPushDispatchProcessing is false
queueEvents.on('failed', async ({ jobId }) => {
const failedJob = j.children.find((e) => e.job.id === jobId);
if (this.guaranteedBroadcastPushDispatchProcessing || !failedJob) {
return;
}
const notification = await this.notificationsService.findOne(
{
where: { id: failedJob.job.data.id },
},
this.req,
);
const startIdx = failedJob.job.data.s;
const subChunk = (notification.dispatch.candidates as string[]).slice(
startIdx,
startIdx + this.broadcastSubscriberChunkSize,
);
pullAll(
subChunk,
(notification.dispatch?.failed ?? []).map(
(e: any) => e.subscriptionId,
),
);
await this.commonService.updateBroadcastPushNotificationStatus(
notification,
NotificationDispatchStatusField.failed,
subChunk.map((e) => ({
subscriptionId: e,
})),
this.req,
);
});

await queueEvents.waitUntilReady();

const j = await this.flowProducer.add(flowJob, {
queuesOptions: {
[flowJob.queueName]: {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion src/common/common.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ export class CommonService {
await this.notificationsService.updateById(
data.id,
{
$push: {
$addToSet: {
['dispatch.' + NotificationDispatchStatusField[field]]: val,
},
},
Expand Down
81 changes: 81 additions & 0 deletions src/queue/notification-events-listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import {
InjectQueue,
OnQueueEvent,
QueueEventsHost,
QueueEventsListener,
} from '@nestjs/bullmq';
import { BeforeApplicationShutdown, Injectable, Logger } from '@nestjs/common';
import { Job, Queue } from 'bullmq';
import { pullAll } from 'lodash';
import { NotificationsService } from 'src/api/notifications/notifications.service';
import {
CommonService,
NotificationDispatchStatusField,
} from 'src/common/common.service';
import { AppConfigService } from 'src/config/app-config.service';

@QueueEventsListener('n')
@Injectable()
export class NotificationEventsListener
extends QueueEventsHost
implements BeforeApplicationShutdown
{
readonly logger = new Logger(NotificationEventsListener.name);
private readonly appConfig;
private get guaranteedBroadcastPushDispatchProcessing() {
return this.appConfig.notification
?.guaranteedBroadcastPushDispatchProcessing;
}
private get broadcastSubscriberChunkSize() {
return this.appConfig.notification?.broadcastSubscriberChunkSize;
}

constructor(
appConfigService: AppConfigService,
private readonly notificationsService: NotificationsService,
private readonly commonService: CommonService,
@InjectQueue('n') private notificationQueue: Queue,
) {
super();
this.appConfig = appConfigService.get();
}

// failed listener marks all candidates in the chunk failed if
// guaranteedBroadcastPushDispatchProcessing is false
@OnQueueEvent('failed')
async onFailed({ jobId }) {
const job = await Job.fromId(this.notificationQueue, jobId);
if (this.guaranteedBroadcastPushDispatchProcessing || job?.name !== 'c') {
job.remove();
return;
}
const notification = await this.notificationsService.findOne(
{
where: { id: job.data.id },
},
null,
);
const startIdx = job.data.s;
const subChunk = (notification.dispatch.candidates as string[]).slice(
startIdx,
startIdx + this.broadcastSubscriberChunkSize,
);
pullAll(
subChunk,
(notification.dispatch?.failed ?? []).map((e: any) => e.subscriptionId),
);
await this.commonService.updateBroadcastPushNotificationStatus(
notification,
NotificationDispatchStatusField.failed,
subChunk.map((e) => ({
subscriptionId: e,
})),
null,
);
job.remove();
}

async beforeApplicationShutdown() {
await this.queueEvents.close();
}
}
8 changes: 8 additions & 0 deletions src/queue/queue-consumers.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { BullModule } from '@nestjs/bullmq';
import { DynamicModule, Module } from '@nestjs/common';
import { BouncesModule } from 'src/api/bounces/bounces.module';
import { NotificationsModule } from 'src/api/notifications/notifications.module';
import { SubscriptionsModule } from 'src/api/subscriptions/subscriptions.module';
import { EmailQueueConsumer } from './email-queue-consumer';
import { NotificationEventsListener } from './notification-events-listener';
import { NotificationQueueConsumer } from './notification-queue-consumer';
import { SmsQueueConsumer } from './sms-queue-consumer';

Expand All @@ -15,9 +17,15 @@ export class QueueConsumersModule {
imports.push(NotificationsModule);
imports.push(SubscriptionsModule);
imports.push(BouncesModule);
imports.push(
BullModule.registerQueue({
name: 'n',
}),
);
providers.push(EmailQueueConsumer);
providers.push(SmsQueueConsumer);
providers.push(NotificationQueueConsumer);
providers.push(NotificationEventsListener);
}
return {
module: QueueConsumersModule,
Expand Down

0 comments on commit 35c3744

Please sign in to comment.