diff --git a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-queue.ts b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-queue.ts index e9aab55c0a..cd5d059a1a 100644 --- a/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-queue.ts +++ b/sources/packages/backend/apps/queue-consumers/src/processors/schedulers/base-queue.ts @@ -11,11 +11,19 @@ import { } from "../../utilities"; import { Job } from "bull"; import { Process } from "@nestjs/bull"; +import { MetricsService, QueuesMetricsEvents } from "../../services"; +import { Inject } from "@nestjs/common"; /** * Provides basic functionality for queue processing. */ export abstract class BaseQueue { + /** + * Metrics service to allow incrementing job events. + */ + @Inject(MetricsService) + private readonly metricsService: MetricsService; + /** * Wrap the queue job execution in a try/catch for the global error handling. * It provides also the default {@link ProcessSummary} for logging. @@ -37,6 +45,13 @@ export abstract class BaseQueue { processSummary.info(endLogMessage); this.logger.log(endLogMessage); const logsSum = processSummary.getLogLevelSum(); + if (logsSum.warn) { + this.metricsService.incrementJobEventCounter( + job, + QueuesMetricsEvents.JobFinalizedWithWarnings, + { incrementValue: logsSum.warn }, + ); + } if (logsSum.error) { throw new CustomNamedError( "One or more errors were reported during the process, please see logs for details.", diff --git a/sources/packages/backend/apps/queue-consumers/src/services/index.ts b/sources/packages/backend/apps/queue-consumers/src/services/index.ts index 8bc4fe5706..7e08b6ed21 100644 --- a/sources/packages/backend/apps/queue-consumers/src/services/index.ts +++ b/sources/packages/backend/apps/queue-consumers/src/services/index.ts @@ -5,3 +5,4 @@ export * from "./cas-supplier/cas-supplier.service"; export * from "./student-file/student-file.service"; export * from "./cas-supplier/cas-evaluation-result-processor"; export * from "./metrics/metrics.service"; +export * from "./metrics/metrics.models"; diff --git a/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.models.ts b/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.models.ts index 0ada79a5f7..87808c80f3 100644 --- a/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.models.ts +++ b/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.models.ts @@ -2,8 +2,8 @@ import { QueueModel } from "@sims/services/queue"; import { Queue } from "bull"; /** - * Bull queues event names to have metrics associated. - * @see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events + * Bull queues event names to have metrics associated (@see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events) + * and others that are useful for monitoring. */ export enum QueuesMetricsEvents { /** @@ -62,6 +62,10 @@ export enum QueuesMetricsEvents { * are not able to extend locks. */ LockExtensionFailed = "lock-extension-failed", + /** + * A job was finished with success but contains at least one warning. + */ + JobFinalizedWithWarnings = "job-finalized-with-warnings", } /** @@ -76,3 +80,11 @@ export interface MonitoredQueue { * Default label added to all the metrics. */ export const DEFAULT_METRICS_APP_LABEL = "queue-consumers"; + +/** + * Metrics queue types. + */ +export enum MetricsQueueTypes { + Scheduler = "scheduler", + Consumer = "consumer", +} diff --git a/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.service.ts b/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.service.ts index fcccd2a6f7..a2cc33b75e 100644 --- a/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.service.ts +++ b/sources/packages/backend/apps/queue-consumers/src/services/metrics/metrics.service.ts @@ -8,8 +8,9 @@ import { DEFAULT_METRICS_APP_LABEL, QueuesMetricsEvents, MonitoredQueue, + MetricsQueueTypes, } from "./metrics.models"; -import { Queue } from "bull"; +import { Queue, Job } from "bull"; import { register, collectDefaultMetrics, Gauge, Counter } from "prom-client"; @Injectable() @@ -52,6 +53,34 @@ export class MetricsService { }); } + /** + * Increments the event counter for a specific job event within a queue. + * @param job the job whose event counter is to be incremented. + * @param queueEvent the type of event occurring on the job. + * @param options options for the increment operation. + * - `incrementValue` the value by which to increment + * the counter. Defaults to 1 if not provided. + */ + incrementJobEventCounter( + job: Job, + queueEvent: QueuesMetricsEvents, + options?: { + incrementValue?: number; + }, + ): void { + const queueType = job.opts?.repeat + ? MetricsQueueTypes.Scheduler + : MetricsQueueTypes.Consumer; + this.jobsEventsCounter.inc( + { + queueName: job.queue.name, + queueEvent, + queueType, + }, + options?.incrementValue ?? 1, + ); + } + /** * Set global metrics configurations. */ @@ -81,7 +110,9 @@ export class MetricsService { private async refreshJobCountsMetricsForQueue( queue: MonitoredQueue, ): Promise { - const queueType = queue.queueModel.isScheduler ? "scheduler" : "consumer"; + const queueType = queue.queueModel.isScheduler + ? MetricsQueueTypes.Scheduler + : MetricsQueueTypes.Consumer; const queueJobCounts = await queue.provider.getJobCounts(); Object.keys(queueJobCounts).forEach((jobCountEvent: string) => { this.jobCountsGauge.set( @@ -111,7 +142,9 @@ export class MetricsService { */ private associateCountersToQueueEvents(queue: MonitoredQueue): void { const queueName = queue.provider.name; - const queueType = queue.queueModel.isScheduler ? "scheduler" : "consumer"; + const queueType = queue.queueModel.isScheduler + ? MetricsQueueTypes.Scheduler + : MetricsQueueTypes.Consumer; Object.values(QueuesMetricsEvents).forEach( (queueEvent: QueuesMetricsEvents) => { this.logger.log(