Skip to content

Commit

Permalink
#4076 - Queue Monitoring - Schedulers Refactor - Warnings Metrics Cou…
Browse files Browse the repository at this point in the history
…nter (#4186)

- Collected a metric for jobs finalizing with some warnings to allow the
creation of a Sysdig alert based on the counter.
- Injected the service as a property to avoid passing the service to
every single class inheriting from the `BaseQueue`.

### New metric sample from metrics payload
```
queue_event_total_count {
  queueName="student-application-notifications",
  queueEvent="job-finalized-with-warnings",
  queueType="scheduler",
  app="queue-consumers"
} 2
```


![image](https://github.com/user-attachments/assets/2d7e7c13-1b40-47a9-b2a5-2f1cf7f349af)

_Note:_ this change was a quick way to resolve the concerned raised and
explained to the business in this
[comment](#4076 (comment)).
  • Loading branch information
andrewsignori-aot authored Dec 27, 2024
1 parent e8eece7 commit 275fa1c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@ import {
} from "../../utilities";
import { Job } from "bull";
import { Process } from "@nestjs/bull";
import { MetricsService, QueuesMetricsEvents } from "../../services";
import { Inject } from "@nestjs/common";

/**
* Provides basic functionality for queue processing.
*/
export abstract class BaseQueue<T> {
/**
* Metrics service to allow incrementing job events.
*/
@Inject(MetricsService)
private readonly metricsService: MetricsService;

/**
* Wrap the queue job execution in a try/catch for the global error handling.
* It provides also the default {@link ProcessSummary} for logging.
Expand All @@ -37,6 +45,13 @@ export abstract class BaseQueue<T> {
processSummary.info(endLogMessage);
this.logger.log(endLogMessage);
const logsSum = processSummary.getLogLevelSum();
if (logsSum.warn) {
this.metricsService.incrementJobEventCounter(
job,
QueuesMetricsEvents.JobFinalizedWithWarnings,
{ incrementValue: logsSum.warn },
);
}
if (logsSum.error) {
throw new CustomNamedError(
"One or more errors were reported during the process, please see logs for details.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from "./cas-supplier/cas-supplier.service";
export * from "./student-file/student-file.service";
export * from "./cas-supplier/cas-evaluation-result-processor";
export * from "./metrics/metrics.service";
export * from "./metrics/metrics.models";
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { QueueModel } from "@sims/services/queue";
import { Queue } from "bull";

/**
* Bull queues event names to have metrics associated.
* @see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events
* Bull queues event names to have metrics associated (@see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events)
* and others that are useful for monitoring.
*/
export enum QueuesMetricsEvents {
/**
Expand Down Expand Up @@ -62,6 +62,10 @@ export enum QueuesMetricsEvents {
* are not able to extend locks.
*/
LockExtensionFailed = "lock-extension-failed",
/**
* A job was finished with success but contains at least one warning.
*/
JobFinalizedWithWarnings = "job-finalized-with-warnings",
}

/**
Expand All @@ -76,3 +80,11 @@ export interface MonitoredQueue {
* Default label added to all the metrics.
*/
export const DEFAULT_METRICS_APP_LABEL = "queue-consumers";

/**
* Metrics queue types.
*/
export enum MetricsQueueTypes {
Scheduler = "scheduler",
Consumer = "consumer",
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import {
DEFAULT_METRICS_APP_LABEL,
QueuesMetricsEvents,
MonitoredQueue,
MetricsQueueTypes,
} from "./metrics.models";
import { Queue } from "bull";
import { Queue, Job } from "bull";
import { register, collectDefaultMetrics, Gauge, Counter } from "prom-client";

@Injectable()
Expand Down Expand Up @@ -52,6 +53,34 @@ export class MetricsService {
});
}

/**
* Increments the event counter for a specific job event within a queue.
* @param job the job whose event counter is to be incremented.
* @param queueEvent the type of event occurring on the job.
* @param options options for the increment operation.
* - `incrementValue` the value by which to increment
* the counter. Defaults to 1 if not provided.
*/
incrementJobEventCounter(
job: Job<unknown>,
queueEvent: QueuesMetricsEvents,
options?: {
incrementValue?: number;
},
): void {
const queueType = job.opts?.repeat
? MetricsQueueTypes.Scheduler
: MetricsQueueTypes.Consumer;
this.jobsEventsCounter.inc(
{
queueName: job.queue.name,
queueEvent,
queueType,
},
options?.incrementValue ?? 1,
);
}

/**
* Set global metrics configurations.
*/
Expand Down Expand Up @@ -81,7 +110,9 @@ export class MetricsService {
private async refreshJobCountsMetricsForQueue(
queue: MonitoredQueue,
): Promise<void> {
const queueType = queue.queueModel.isScheduler ? "scheduler" : "consumer";
const queueType = queue.queueModel.isScheduler
? MetricsQueueTypes.Scheduler
: MetricsQueueTypes.Consumer;
const queueJobCounts = await queue.provider.getJobCounts();
Object.keys(queueJobCounts).forEach((jobCountEvent: string) => {
this.jobCountsGauge.set(
Expand Down Expand Up @@ -111,7 +142,9 @@ export class MetricsService {
*/
private associateCountersToQueueEvents(queue: MonitoredQueue): void {
const queueName = queue.provider.name;
const queueType = queue.queueModel.isScheduler ? "scheduler" : "consumer";
const queueType = queue.queueModel.isScheduler
? MetricsQueueTypes.Scheduler
: MetricsQueueTypes.Consumer;
Object.values(QueuesMetricsEvents).forEach(
(queueEvent: QueuesMetricsEvents) => {
this.logger.log(
Expand Down

0 comments on commit 275fa1c

Please sign in to comment.