Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#4076 - Queue Monitoring - Schedulers Refactor - Warnings Metrics Counter #4186

Merged
merged 4 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@ import {
} from "../../utilities";
import { Job } from "bull";
import { Process } from "@nestjs/bull";
import { MetricsService, QueuesMetricsEvents } from "../../services";
import { Inject } from "@nestjs/common";

/**
* Provides basic functionality for queue processing.
*/
export abstract class BaseQueue<T> {
/**
* Metrics service to allow incrementing job events.
*/
@Inject(MetricsService)
private readonly metricsService: MetricsService;

/**
* Wrap the queue job execution in a try/catch for the global error handling.
* It also provides the default {@link ProcessSummary} for logging.
Expand All @@ -37,6 +45,13 @@ export abstract class BaseQueue<T> {
processSummary.info(endLogMessage);
this.logger.log(endLogMessage);
const logsSum = processSummary.getLogLevelSum();
if (logsSum.warn) {
this.metricsService.incrementJobEventCounter(
job,
QueuesMetricsEvents.JobFinalizedWithWarnings,
{ incrementValue: logsSum.warn },
);
}
if (logsSum.error) {
throw new CustomNamedError(
"One or more errors were reported during the process, please see logs for details.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from "./cas-supplier/cas-supplier.service";
export * from "./student-file/student-file.service";
export * from "./cas-supplier/cas-evaluation-result-processor";
export * from "./metrics/metrics.service";
export * from "./metrics/metrics.models";
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { QueueModel } from "@sims/services/queue";
import { Queue } from "bull";

/**
* Bull queues event names to have metrics associated.
* @see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events
* Bull queues event names to have metrics associated (@see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events)
* and others that are useful for monitoring.
*/
export enum QueuesMetricsEvents {
/**
Expand Down Expand Up @@ -62,6 +62,10 @@ export enum QueuesMetricsEvents {
* are not able to extend locks.
*/
LockExtensionFailed = "lock-extension-failed",
/**
* A job finished successfully but reported at least one warning.
*/
JobFinalizedWithWarnings = "job-finalized-with-warnings",
}

/**
Expand All @@ -76,3 +80,11 @@ export interface MonitoredQueue {
* Default label added to all the metrics.
*/
export const DEFAULT_METRICS_APP_LABEL = "queue-consumers";

/**
 * Metrics queue types.
 * The values are used as the `queueType` label when reporting queue metrics,
 * allowing scheduler queues and regular consumer queues to be told apart.
 */
export enum MetricsQueueTypes {
  /**
   * Selected when the queue model is flagged as a scheduler or,
   * for a single job, when the job options contain a repeat configuration.
   */
  Scheduler = "scheduler",
  /**
   * Selected for every queue or job that is not identified as a scheduler.
   */
  Consumer = "consumer",
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import {
DEFAULT_METRICS_APP_LABEL,
QueuesMetricsEvents,
MonitoredQueue,
MetricsQueueTypes,
} from "./metrics.models";
import { Queue } from "bull";
import { Queue, Job } from "bull";
import { register, collectDefaultMetrics, Gauge, Counter } from "prom-client";

@Injectable()
Expand Down Expand Up @@ -52,6 +53,34 @@ export class MetricsService {
});
}

/**
 * Adds to the jobs events counter on behalf of a single job.
 * The metric is labelled with the name of the queue that owns the job,
 * the event being reported, and the queue type, which is considered a
 * scheduler when the job options have a repeat configuration and a
 * consumer otherwise.
 * @param job job that the event refers to.
 * @param queueEvent event to be counted for the job.
 * @param options optional settings for the increment.
 * - `incrementValue` amount to add to the counter, 1 when omitted.
 */
incrementJobEventCounter(
  job: Job<unknown>,
  queueEvent: QueuesMetricsEvents,
  options?: {
    incrementValue?: number;
  },
): void {
  // Jobs are assumed to be consumers unless they carry repeat options.
  let queueType = MetricsQueueTypes.Consumer;
  if (job.opts?.repeat) {
    queueType = MetricsQueueTypes.Scheduler;
  }
  const labels = {
    queueName: job.queue.name,
    queueEvent,
    queueType,
  };
  const amount = options?.incrementValue ?? 1;
  this.jobsEventsCounter.inc(labels, amount);
}

/**
* Set global metrics configurations.
*/
Expand Down Expand Up @@ -81,7 +110,9 @@ export class MetricsService {
private async refreshJobCountsMetricsForQueue(
queue: MonitoredQueue,
): Promise<void> {
const queueType = queue.queueModel.isScheduler ? "scheduler" : "consumer";
const queueType = queue.queueModel.isScheduler
? MetricsQueueTypes.Scheduler
: MetricsQueueTypes.Consumer;
const queueJobCounts = await queue.provider.getJobCounts();
Object.keys(queueJobCounts).forEach((jobCountEvent: string) => {
this.jobCountsGauge.set(
Expand Down Expand Up @@ -111,7 +142,9 @@ export class MetricsService {
*/
private associateCountersToQueueEvents(queue: MonitoredQueue): void {
const queueName = queue.provider.name;
const queueType = queue.queueModel.isScheduler ? "scheduler" : "consumer";
const queueType = queue.queueModel.isScheduler
? MetricsQueueTypes.Scheduler
: MetricsQueueTypes.Consumer;
Object.values(QueuesMetricsEvents).forEach(
(queueEvent: QueuesMetricsEvents) => {
this.logger.log(
Expand Down
Loading