From 1020ea5250adc6ead51e9526ded665f0888f6461 Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 20 Aug 2024 15:04:59 +0800 Subject: [PATCH 1/8] [YUNIKORN-2818] State of appMetrics of Queue Metrics is incomplete and should be fixed --- pkg/metrics/queue.go | 112 +++++++++++++++++- pkg/metrics/scheduler.go | 75 +++++++++++- pkg/scheduler/objects/application.go | 6 + pkg/scheduler/objects/application_state.go | 59 +++++++-- .../objects/application_state_test.go | 76 ++++++++++-- 5 files changed, 296 insertions(+), 32 deletions(-) diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go index 596ca1425..544fdca0a 100644 --- a/pkg/metrics/queue.go +++ b/pkg/metrics/queue.go @@ -27,11 +27,16 @@ import ( ) const ( - AppAccepted = "accepted" - AppRunning = "running" - AppFailed = "failed" - AppRejected = "rejected" - AppCompleted = "completed" + AppNew = "new" + AppAccepted = "accepted" + AppRunning = "running" + AppFailing = "failing" + AppFailed = "failed" + AppRejected = "rejected" + AppResuming = "resuming" + AppCompleting = "completing" + AppCompleted = "completed" + AppExpired = "expired" ContainerReleased = "released" ContainerAllocated = "allocated" @@ -160,10 +165,31 @@ func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) { return -1, err } +func (m *QueueMetrics) IncQueueApplicationsNew() { + m.incQueueApplications(AppNew) +} + +func (m *QueueMetrics) DecQueueApplicationsNew() { + m.decQueueApplications(AppNew) +} + +func (m *QueueMetrics) GetQueueApplicationsNew() (int, error) { + metricDto := &dto.Metric{} + err := m.appMetricsLabel.WithLabelValues(AppNew).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + func (m *QueueMetrics) IncQueueApplicationsAccepted() { m.incQueueApplications(AppAccepted) } +func (m *QueueMetrics) DecQueueApplicationsAccepted() { + m.decQueueApplications(AppAccepted) +} + func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppAccepted).Write(metricDto) @@ -177,6 +203,10 @@ func (m *QueueMetrics) IncQueueApplicationsRejected() { m.incQueueApplications(AppRejected) } +func (m *QueueMetrics) DecQueueApplicationsRejected() { + m.decQueueApplications(AppRejected) +} + func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppRejected).Write(metricDto) @@ -186,10 +216,48 @@ func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) { return -1, err } +func (m *QueueMetrics) IncQueueApplicationsResuming() { + m.incQueueApplications(AppResuming) +} + +func (m *QueueMetrics) DecQueueApplicationsResuming() { + m.decQueueApplications(AppResuming) +} + +func (m *QueueMetrics) GetQueueApplicationsResuming() (int, error) { + metricDto := &dto.Metric{} + err := m.appMetricsLabel.WithLabelValues(AppResuming).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + +func (m *QueueMetrics) IncQueueApplicationsFailing() { + m.incQueueApplications(AppFailing) +} + +func (m *QueueMetrics) DecQueueApplicationsFailing() { + m.decQueueApplications(AppFailing) +} + +func (m *QueueMetrics) GetQueueApplicationsFailing() (int, error) { + metricDto := &dto.Metric{} + err := m.appMetricsLabel.WithLabelValues(AppFailing).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + func (m *QueueMetrics) IncQueueApplicationsFailed() { m.incQueueApplications(AppFailed) } +func (m *QueueMetrics) DecQueueApplicationsFailed() { + m.decQueueApplications(AppFailed) +} + func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppFailed).Write(metricDto) @@ -199,10 +267,31 @@ func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) { return -1, err } +func (m *QueueMetrics) IncQueueApplicationsCompleting() { + m.incQueueApplications(AppCompleting) +} + +func (m *QueueMetrics) DecQueueApplicationsCompleting() { + m.decQueueApplications(AppCompleting) +} + +func (m *QueueMetrics) GetQueueApplicationsCompleting() (int, error) { + metricDto := &dto.Metric{} + err := m.appMetricsLabel.WithLabelValues(AppCompleting).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + func (m *QueueMetrics) IncQueueApplicationsCompleted() { m.incQueueApplications(AppCompleted) } +func (m *QueueMetrics) DecQueueApplicationsCompleted() { + m.decQueueApplications(AppCompleted) +} + func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppCompleted).Write(metricDto) @@ -212,6 +301,19 @@ func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) { return -1, err } +func (m *QueueMetrics) IncQueueApplicationsExpired() { + m.incQueueApplications(AppExpired) +} + +func (m *QueueMetrics) GetQueueApplicationsExpired() (int, error) { + metricDto := &dto.Metric{} + err := m.appMetricsLabel.WithLabelValues(AppExpired).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + func (m *QueueMetrics) IncAllocatedContainer() { m.containerMetrics.WithLabelValues(ContainerAllocated).Inc() } diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go index 23fcc850b..2f69eb1f4 100644 --- a/pkg/metrics/scheduler.go +++ b/pkg/metrics/scheduler.go @@ -57,7 +57,7 @@ var resourceUsageRangeBuckets = []string{ // SchedulerMetrics to declare scheduler metrics type SchedulerMetrics struct { containerAllocation *prometheus.CounterVec - applicationSubmission *prometheus.CounterVec + applicationSubmission *prometheus.GaugeVec application *prometheus.GaugeVec node *prometheus.GaugeVec nodeResourceUsage map[string]*prometheus.GaugeVec @@ -84,8 +84,8 @@ func InitSchedulerMetrics() *SchedulerMetrics { Help: "Total number of attempts to allocate containers. State of the attempt includes `allocated`, `rejected`, `error`, `released`", }, []string{"state"}) - s.applicationSubmission = prometheus.NewCounterVec( - prometheus.CounterOpts{ + s.applicationSubmission = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: SchedulerSubsystem, Name: "application_submission_total", @@ -241,14 +241,44 @@ func (m *SchedulerMetrics) GetSchedulingErrors() (int, error) { return -1, err } +func (m *SchedulerMetrics) IncTotalApplicationsNew() { + m.applicationSubmission.WithLabelValues(AppNew).Inc() +} + +func (m *SchedulerMetrics) DecTotalApplicationsNew() { + m.applicationSubmission.WithLabelValues(AppNew).Dec() +} + +func (m *SchedulerMetrics) GetTotalApplicationsNew() (int, error) { + metricDto := &dto.Metric{} + err := m.applicationSubmission.WithLabelValues(AppNew).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + func (m *SchedulerMetrics) IncTotalApplicationsAccepted() { m.applicationSubmission.WithLabelValues(AppAccepted).Inc() } +func (m *SchedulerMetrics) DecTotalApplicationsAccepted() { + m.applicationSubmission.WithLabelValues(AppAccepted).Dec() +} + func (m *SchedulerMetrics) AddTotalApplicationsAccepted(value int) { m.applicationSubmission.WithLabelValues(AppAccepted).Add(float64(value)) } +func (m *SchedulerMetrics) GetTotalApplicationsAccepted() (int, error) { + metricDto := &dto.Metric{} + err := m.applicationSubmission.WithLabelValues(AppAccepted).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + func (m *SchedulerMetrics) IncTotalApplicationsRejected() { m.applicationSubmission.WithLabelValues(ContainerRejected).Inc() } @@ -261,7 +291,7 @@ func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) { metricDto := &dto.Metric{} err := m.applicationSubmission.WithLabelValues(ContainerRejected).Write(metricDto) if err == nil { - return int(*metricDto.Counter.Value), nil + return int(*metricDto.Gauge.Value), nil } return -1, err } @@ -287,10 +317,47 @@ func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) { return -1, err } +func (m *SchedulerMetrics) IncTotalApplicationsFailing() { + m.application.WithLabelValues(AppFailing).Inc() +} + +func (m *SchedulerMetrics) DecTotalApplicationsFailing() { + m.application.WithLabelValues(AppFailing).Dec() +} + func (m *SchedulerMetrics) IncTotalApplicationsFailed() { m.application.WithLabelValues(AppFailed).Inc() } +func (m *SchedulerMetrics) IncTotalApplicationsCompleting() { + m.application.WithLabelValues(AppCompleting).Inc() +} + +func (m *SchedulerMetrics) DecTotalApplicationsCompleting() { + m.application.WithLabelValues(AppCompleting).Dec() +} + +func (m *SchedulerMetrics) IncTotalApplicationsResuming() { + m.application.WithLabelValues(AppResuming).Inc() +} + +func (m *SchedulerMetrics) DecTotalApplicationsResuming() { + m.application.WithLabelValues(AppResuming).Dec() +} + +func (m *SchedulerMetrics) GetTotalApplicationsResuming() (int, error) { + metricDto := &dto.Metric{} + err := m.application.WithLabelValues(AppResuming).Write(metricDto) + if err == nil { + return int(*metricDto.Gauge.Value), nil + } + return -1, err +} + +func (m *SchedulerMetrics) DecTotalApplicationsFailed() { + m.application.WithLabelValues(AppFailed).Dec() +} + func (m *SchedulerMetrics) IncTotalApplicationsCompleted() { m.application.WithLabelValues(AppCompleted).Inc() } diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 1ccde74ec..42ccd6f83 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -191,9 +191,15 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve app.rmID = rmID app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem()) app.appEvents.SendNewApplicationEvent(app.ApplicationID) + app.setNewMetrics() return app } +func (sa *Application) setNewMetrics() { + metrics.GetSchedulerMetrics().IncTotalApplicationsNew() + metrics.GetQueueMetrics(sa.GetQueuePath()).IncQueueApplicationsNew() +} + func (sa *Application) String() string { if sa == nil { return "application is nil" diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go index dc600850a..c4bab95e8 100644 --- a/pkg/scheduler/objects/application_state.go +++ b/pkg/scheduler/objects/application_state.go @@ -75,6 +75,7 @@ const ( ) var stateEvents = map[string]si.EventRecord_ChangeDetail{ + New.String(): si.EventRecord_APP_NEW, Accepted.String(): si.EventRecord_APP_ACCEPTED, Running.String(): si.EventRecord_APP_RUNNING, Rejected.String(): si.EventRecord_APP_REJECT, @@ -167,16 +168,20 @@ func callbacks() fsm.Callbacks { "leave_state": func(_ context.Context, event *fsm.Event) { event.Args[0].(*Application).clearStateTimer() //nolint:errcheck }, - fmt.Sprintf("enter_%s", Completing.String()): func(_ context.Context, event *fsm.Event) { + fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck - app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication) + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsNew() + metrics.GetSchedulerMetrics().DecTotalApplicationsNew() }, - fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) { - if event.Dst != Rejected.String() { - app := event.Args[0].(*Application) //nolint:errcheck - metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted() - metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted() - } + fmt.Sprintf("enter_%s", Accepted.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted() + metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted() + }, + fmt.Sprintf("leave_%s", Accepted.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsAccepted() + metrics.GetSchedulerMetrics().DecTotalApplicationsAccepted() }, fmt.Sprintf("enter_%s", Rejected.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck @@ -207,6 +212,37 @@ func callbacks() fsm.Callbacks { metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() } }, + fmt.Sprintf("enter_%s", Resuming.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsResuming() + metrics.GetSchedulerMetrics().IncTotalApplicationsResuming() + }, + fmt.Sprintf("leave_%s", Resuming.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsResuming() + metrics.GetSchedulerMetrics().DecTotalApplicationsResuming() + }, + fmt.Sprintf("enter_%s", Failing.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailing() + metrics.GetSchedulerMetrics().IncTotalApplicationsFailing() + }, + fmt.Sprintf("leave_%s", Failing.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsFailing() + metrics.GetSchedulerMetrics().DecTotalApplicationsFailing() + }, + fmt.Sprintf("enter_%s", Completing.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication) + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleting() + metrics.GetSchedulerMetrics().IncTotalApplicationsCompleting() + }, + fmt.Sprintf("leave_%s", Completing.String()): func(_ context.Context, event *fsm.Event) { + app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsCompleting() + metrics.GetSchedulerMetrics().DecTotalApplicationsCompleting() + }, fmt.Sprintf("enter_%s", Completed.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted() @@ -216,13 +252,10 @@ func callbacks() fsm.Callbacks { app.clearPlaceholderTimer() app.cleanupAsks() }, - fmt.Sprintf("enter_%s", Failing.String()): func(_ context.Context, event *fsm.Event) { - app := event.Args[0].(*Application) //nolint:errcheck - metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed() - metrics.GetSchedulerMetrics().IncTotalApplicationsFailed() - }, fmt.Sprintf("enter_%s", Failed.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck + metrics.GetSchedulerMetrics().IncTotalApplicationsFailed() + metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed() app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication) app.executeTerminatedCallback() app.cleanupAsks() diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go index ba54aa37e..4df11fd36 100644 --- a/pkg/scheduler/objects/application_state_test.go +++ b/pkg/scheduler/objects/application_state_test.go @@ -280,65 +280,84 @@ func TestAppStateTransitionEvents(t *testing.T) { // app-00002: New -> Accepted -> Running -> Completing -> Running -> Failing-> Failed // app-00003: New -> Accepted -> Running -> Failing -> Failed // app-00004: New -> Rejected -// Final metrics will be: 0 running, 3 accepted, 1 completed, 2 failed and 1 rejected applications +// Final metrics will be: 0 running, 0 accepted, 1 completed, 2 failed and 1 rejected applications func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen - queue := createQueue(t, "root.metrics") + queue := createQueue(t, "metrics") metrics.GetSchedulerMetrics().Reset() // app-00001: New -> Resuming -> Accepted --> Running -> Completing-> Completed app := newApplication("app-00001", "default", "root.metrics") app.SetQueue(queue) assertState(t, app, nil, New.String()) + assertTotalAppsNewMetrics(t, 1) assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) + assertQueueRunningApps(t, app, 0) + assertQueueApplicationsRunningMetrics(t, app, 0) + assertQueueApplicationsAcceptedMetrics(t, app, 0) + assertQueueApplicationsRejectedMetrics(t, app, 0) + assertQueueApplicationsFailedMetrics(t, app, 0) + assertQueueApplicationsCompletedMetrics(t, app, 0) + assertQueueApplicationsNewMetrics(t, app, 1) + // New -> Resuming err := app.HandleApplicationEvent(ResumeApplication) assertState(t, app, err, Resuming.String()) + assertTotalAppsNewMetrics(t, 0) assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) - assertQueueApplicationsAcceptedMetrics(t, app, 1) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 0) assertQueueApplicationsCompletedMetrics(t, app, 0) + assertQueueApplicationsResumingMetrics(t, app, 1) + assertQueueApplicationsNewMetrics(t, app, 0) // Resuming -> Accepted err = app.HandleApplicationEvent(RunApplication) assertState(t, app, err, Accepted.String()) assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsNewMetrics(t, 0) + assertTotalAppsAcceptedMetrics(t, 1) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) assertQueueApplicationsAcceptedMetrics(t, app, 1) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 0) assertQueueApplicationsCompletedMetrics(t, app, 0) + assertQueueApplicationsNewMetrics(t, app, 0) // Accepted -> Running err = app.HandleApplicationEvent(RunApplication) assertState(t, app, err, Running.String()) assertTotalAppsRunningMetrics(t, 1) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsAcceptedMetrics(t, 0) assertQueueRunningApps(t, app, 1) assertQueueApplicationsRunningMetrics(t, app, 1) - assertQueueApplicationsAcceptedMetrics(t, app, 1) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 0) assertQueueApplicationsCompletedMetrics(t, app, 0) + assertQueueApplicationsNewMetrics(t, app, 0) // Running -> Running err = app.HandleApplicationEvent(RunApplication) assertState(t, app, err, Running.String()) assertTotalAppsRunningMetrics(t, 1) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsAcceptedMetrics(t, 0) assertQueueRunningApps(t, app, 1) assertQueueApplicationsRunningMetrics(t, app, 1) - assertQueueApplicationsAcceptedMetrics(t, app, 1) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 0) assertQueueApplicationsCompletedMetrics(t, app, 0) + assertQueueApplicationsNewMetrics(t, app, 0) // Running -> Completing err = app.HandleApplicationEvent(CompleteApplication) assertState(t, app, err, Completing.String()) @@ -347,10 +366,11 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRejectedMetrics(t, 0) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) - assertQueueApplicationsAcceptedMetrics(t, app, 1) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 0) assertQueueApplicationsCompletedMetrics(t, app, 0) + assertQueueApplicationsCompletingMetrics(t, app, 1) // Completing -> Completed err = app.HandleApplicationEvent(CompleteApplication) assertState(t, app, err, Completed.String()) @@ -359,10 +379,11 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRejectedMetrics(t, 0) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) - assertQueueApplicationsAcceptedMetrics(t, app, 1) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 0) assertQueueApplicationsCompletedMetrics(t, app, 1) + assertQueueApplicationsCompletingMetrics(t, app, 0) // app-00002: New -> Accepted -> Completing -> Running -> Failing-> Failed app = newApplication("app-00002", "default", "root.metrics") @@ -391,7 +412,7 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRejectedMetrics(t, 0) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) - assertQueueApplicationsAcceptedMetrics(t, app, 2) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 1) assertQueueApplicationsCompletedMetrics(t, app, 1) @@ -417,7 +438,7 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRejectedMetrics(t, 0) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) - assertQueueApplicationsAcceptedMetrics(t, app, 3) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 0) assertQueueApplicationsFailedMetrics(t, app, 2) assertQueueApplicationsCompletedMetrics(t, app, 1) @@ -434,7 +455,7 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRejectedMetrics(t, 1) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) - assertQueueApplicationsAcceptedMetrics(t, app, 3) + assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 1) assertQueueApplicationsFailedMetrics(t, app, 2) assertQueueApplicationsCompletedMetrics(t, app, 1) @@ -446,6 +467,13 @@ func assertState(t testing.TB, app *Application, err error, expected string) { assert.Equal(t, app.CurrentState(), expected, "application not in expected state.") } +func assertTotalAppsNewMetrics(t testing.TB, expected int) { + t.Helper() + totalAppsNew, err := metrics.GetSchedulerMetrics().GetTotalApplicationsNew() + assert.NilError(t, err, "no error expected when getting total new application count.") + assert.Equal(t, totalAppsNew, expected, "total new application metrics is not as expected.") +} + func assertTotalAppsRunningMetrics(t testing.TB, expected int) { t.Helper() totalAppsRunning, err := metrics.GetSchedulerMetrics().GetTotalApplicationsRunning() @@ -453,6 +481,13 @@ func assertTotalAppsRunningMetrics(t testing.TB, expected int) { assert.Equal(t, totalAppsRunning, expected, "total running application metrics is not as expected.") } +func assertTotalAppsAcceptedMetrics(t testing.TB, expected int) { + t.Helper() + totalAppsAccepted, err := metrics.GetSchedulerMetrics().GetTotalApplicationsAccepted() + assert.NilError(t, err, "no error expected when getting total accepted application count.") + assert.Equal(t, totalAppsAccepted, expected, "total accepted application metrics is not as expected.") +} + func assertTotalAppsCompletedMetrics(t testing.TB, expected int) { t.Helper() totalAppsCompleted, err := metrics.GetSchedulerMetrics().GetTotalApplicationsCompleted() @@ -501,6 +536,13 @@ func assertQueueApplicationsFailedMetrics(t testing.TB, app *Application, expect assert.Equal(t, queueApplicationsFailed, expected, "total failed application metrics in queue is not as expected.") } +func assertQueueApplicationsResumingMetrics(t testing.TB, app *Application, expected int) { + t.Helper() + queueApplicationsResuming, err := metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsResuming() + assert.NilError(t, err, "no error expected when getting total resuming application count in queue.") + assert.Equal(t, queueApplicationsResuming, expected, "total resuming application metrics in queue is not as expected.") +} + func assertQueueApplicationsCompletedMetrics(t testing.TB, app *Application, expected int) { t.Helper() queueApplicationsCompleted, err := metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsCompleted() @@ -508,6 +550,20 @@ func assertQueueApplicationsCompletedMetrics(t testing.TB, app *Application, exp assert.Equal(t, queueApplicationsCompleted, expected, "total completed application metrics in queue is not as expected.") } +func assertQueueApplicationsCompletingMetrics(t testing.TB, app *Application, expected int) { + t.Helper() + queueApplicationsCompleting, err := metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsCompleting() + assert.NilError(t, err, "no error expected when getting total completing application count in queue.") + assert.Equal(t, queueApplicationsCompleting, expected, "total completing application metrics in queue is not as expected.") +} + +func assertQueueApplicationsNewMetrics(t testing.TB, app *Application, expected int) { + t.Helper() + queueApplicationsNew, err := metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsNew() + assert.NilError(t, err, "no error expected when getting total new applications in queue.") + assert.Equal(t, queueApplicationsNew, expected, "total new applications in queue is not as expected.") +} + func createQueue(t *testing.T, queueName string) *Queue { root, err := createRootQueue(nil) assert.NilError(t, err, "failed to create queue: %v", err) From 22ed44c68afe3e80eb50c077f276cdb9dd2f7ca7 Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 20 Aug 2024 15:16:06 +0800 Subject: [PATCH 2/8] Remove duplicated metrics --- pkg/metrics/scheduler.go | 20 ++------------------ pkg/rmproxy/rmproxy.go | 8 -------- pkg/scheduler/context.go | 2 -- 3 files changed, 2 insertions(+), 28 deletions(-) diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go index 2f69eb1f4..011c92cfa 100644 --- a/pkg/metrics/scheduler.go +++ b/pkg/metrics/scheduler.go @@ -266,10 +266,6 @@ func (m *SchedulerMetrics) DecTotalApplicationsAccepted() { m.applicationSubmission.WithLabelValues(AppAccepted).Dec() } -func (m *SchedulerMetrics) AddTotalApplicationsAccepted(value int) { - m.applicationSubmission.WithLabelValues(AppAccepted).Add(float64(value)) -} - func (m *SchedulerMetrics) GetTotalApplicationsAccepted() (int, error) { metricDto := &dto.Metric{} err := m.applicationSubmission.WithLabelValues(AppAccepted).Write(metricDto) @@ -280,16 +276,12 @@ func (m *SchedulerMetrics) GetTotalApplicationsAccepted() (int, error) { } func (m *SchedulerMetrics) IncTotalApplicationsRejected() { - m.applicationSubmission.WithLabelValues(ContainerRejected).Inc() -} - -func (m *SchedulerMetrics) AddTotalApplicationsRejected(value int) { - m.applicationSubmission.WithLabelValues(ContainerRejected).Add(float64(value)) + m.applicationSubmission.WithLabelValues(AppRejected).Inc() } func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) { metricDto := &dto.Metric{} - err := m.applicationSubmission.WithLabelValues(ContainerRejected).Write(metricDto) + err := m.applicationSubmission.WithLabelValues(AppRejected).Write(metricDto) if err == nil { return int(*metricDto.Gauge.Value), nil } @@ -304,10 +296,6 @@ func (m *SchedulerMetrics) DecTotalApplicationsRunning() { m.application.WithLabelValues(AppRunning).Dec() } -func (m *SchedulerMetrics) SubTotalApplicationsRunning(value int) { - m.application.WithLabelValues(AppRunning).Sub(float64(value)) -} - func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) { metricDto := &dto.Metric{} err := m.application.WithLabelValues(AppRunning).Write(metricDto) @@ -362,10 +350,6 @@ func (m *SchedulerMetrics) IncTotalApplicationsCompleted() { m.application.WithLabelValues(AppCompleted).Inc() } -func (m *SchedulerMetrics) AddTotalApplicationsCompleted(value int) { - m.application.WithLabelValues(AppCompleted).Add(float64(value)) -} - func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) { metricDto := &dto.Metric{} err := m.application.WithLabelValues(AppCompleted).Write(metricDto) diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go index 6816c10c6..fa29a097e 100644 --- a/pkg/rmproxy/rmproxy.go +++ b/pkg/rmproxy/rmproxy.go @@ -124,14 +124,6 @@ func (rmp *RMProxy) processApplicationUpdateEvent(event *rmevent.RMApplicationUp log.Log(log.RMProxy).DPanic("RM is not registered", zap.String("rmID", event.RmID)) } - - // update app metrics - if len(event.RejectedApplications) > 0 { - metrics.GetSchedulerMetrics().AddTotalApplicationsRejected(len(event.RejectedApplications)) - } - if len(event.AcceptedApplications) > 0 { - metrics.GetSchedulerMetrics().AddTotalApplicationsAccepted(len(event.AcceptedApplications)) - } } func (rmp *RMProxy) processRMReleaseAllocationEvent(event *rmevent.RMReleaseAllocationEvent) { diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index ee7257b73..b0df469a8 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -555,8 +555,6 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate } // Update metrics with removed applications if len(request.Remove) > 0 { - metrics.GetSchedulerMetrics().SubTotalApplicationsRunning(len(request.Remove)) - metrics.GetSchedulerMetrics().AddTotalApplicationsCompleted(len(request.Remove)) for _, app := range request.Remove { partition := cc.GetPartition(app.PartitionName) if partition == nil { From e413ba6a7ecc1055cb69b77eeaf663075855218f Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 21 Aug 2024 10:42:10 +0800 Subject: [PATCH 3/8] Fix lint --- pkg/scheduler/objects/application_state.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go index c4bab95e8..1c5027c46 100644 --- a/pkg/scheduler/objects/application_state.go +++ b/pkg/scheduler/objects/application_state.go @@ -138,6 +138,8 @@ func eventDesc() fsm.Events { // The first argument must always be an Application and if there is a second, // that must be a string. If this precondition is not met, a runtime panic // will occur. +// +//nolint:funlen func callbacks() fsm.Callbacks { return fsm.Callbacks{ "enter_state": func(_ context.Context, event *fsm.Event) { From 0d21b97470d3ca748e54ad075231d85d038f5dcc Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 21 Aug 2024 11:37:41 +0800 Subject: [PATCH 4/8] Increase test coverage --- pkg/metrics/queue.go | 13 ---- pkg/metrics/queue_test.go | 113 +++++++++++++++++++++++++++++++++ pkg/metrics/scheduler_test.go | 115 ++++++++++++++++++++++++++++++++++ 3 files changed, 228 insertions(+), 13 deletions(-) diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go index 544fdca0a..146edd30c 100644 --- a/pkg/metrics/queue.go +++ b/pkg/metrics/queue.go @@ -301,19 +301,6 @@ func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) { return -1, err } -func (m *QueueMetrics) IncQueueApplicationsExpired() { - m.incQueueApplications(AppExpired) -} - -func (m *QueueMetrics) GetQueueApplicationsExpired() (int, error) { - metricDto := &dto.Metric{} - err := m.appMetricsLabel.WithLabelValues(AppExpired).Write(metricDto) - if err == nil { - return int(*metricDto.Gauge.Value), nil - } - return -1, err -} - func (m *QueueMetrics) IncAllocatedContainer() { m.containerMetrics.WithLabelValues(ContainerAllocated).Inc() } diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go index e0a0f94b7..bfd4edddb 100644 --- a/pkg/metrics/queue_test.go +++ b/pkg/metrics/queue_test.go @@ -30,12 +30,38 @@ import ( var qm *QueueMetrics +func TestApplicationsNew(t *testing.T) { + qm = getQueueMetrics() + defer unregisterQueueMetrics() + + qm.IncQueueApplicationsNew() + verifyAppMetrics(t, "new") + + curr, err := qm.GetQueueApplicationsNew() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsNew() + curr, err = qm.GetQueueApplicationsNew() + assert.NilError(t, err) + assert.Equal(t, 0, curr) +} + func TestApplicationsRunning(t *testing.T) { qm = getQueueMetrics() defer unregisterQueueMetrics() qm.IncQueueApplicationsRunning() verifyAppMetrics(t, "running") + + curr, err := qm.GetQueueApplicationsRunning() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsRunning() + curr, err = qm.GetQueueApplicationsRunning() + assert.NilError(t, err) + assert.Equal(t, 0, curr) } func TestApplicationsAccepted(t *testing.T) { @@ -44,6 +70,49 @@ func TestApplicationsAccepted(t *testing.T) { qm.IncQueueApplicationsAccepted() verifyAppMetrics(t, "accepted") + + curr, err := qm.GetQueueApplicationsAccepted() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsAccepted() + curr, err = qm.GetQueueApplicationsAccepted() + assert.NilError(t, err) + assert.Equal(t, 0, curr) +} + +func TestApplicationsResuming(t *testing.T) { + qm = getQueueMetrics() + defer unregisterQueueMetrics() + + qm.IncQueueApplicationsResuming() + verifyAppMetrics(t, "resuming") + + curr, err := qm.GetQueueApplicationsResuming() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsResuming() + curr, err = qm.GetQueueApplicationsResuming() + assert.NilError(t, err) + assert.Equal(t, 0, curr) +} + +func TestApplicationsFailing(t *testing.T) { + qm = getQueueMetrics() + defer unregisterQueueMetrics() + + qm.IncQueueApplicationsFailing() + verifyAppMetrics(t, "failing") + + curr, err := qm.GetQueueApplicationsFailing() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsFailing() + curr, err = qm.GetQueueApplicationsFailing() + assert.NilError(t, err) + assert.Equal(t, 0, curr) } func TestApplicationsRejected(t *testing.T) { @@ -52,6 +121,15 @@ func TestApplicationsRejected(t *testing.T) { qm.IncQueueApplicationsRejected() verifyAppMetrics(t, "rejected") + + curr, err := qm.GetQueueApplicationsRejected() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsRejected() + curr, err = qm.GetQueueApplicationsRejected() + assert.NilError(t, err) + assert.Equal(t, 0, curr) } func TestApplicationsFailed(t *testing.T) { @@ -60,6 +138,32 @@ func TestApplicationsFailed(t *testing.T) { qm.IncQueueApplicationsFailed() verifyAppMetrics(t, "failed") + + curr, err := qm.GetQueueApplicationsFailed() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsFailed() + curr, err = qm.GetQueueApplicationsFailed() + assert.NilError(t, err) + assert.Equal(t, 0, curr) +} + +func TestApplicationsCompleting(t *testing.T) { + qm = getQueueMetrics() + defer unregisterQueueMetrics() + + qm.IncQueueApplicationsCompleting() + verifyAppMetrics(t, "completing") + + curr, err := qm.GetQueueApplicationsCompleting() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsCompleting() + curr, err = qm.GetQueueApplicationsCompleting() + assert.NilError(t, err) + assert.Equal(t, 0, curr) } func TestApplicationsCompleted(t *testing.T) { @@ -68,6 +172,15 @@ func TestApplicationsCompleted(t *testing.T) { qm.IncQueueApplicationsCompleted() verifyAppMetrics(t, "completed") + + curr, err := qm.GetQueueApplicationsCompleted() + assert.NilError(t, err) + assert.Equal(t, 1, curr) + + qm.DecQueueApplicationsCompleted() + curr, err = qm.GetQueueApplicationsCompleted() + assert.NilError(t, err) + assert.Equal(t, 0, curr) } func TestAllocatedContainers(t *testing.T) { diff --git a/pkg/metrics/scheduler_test.go b/pkg/metrics/scheduler_test.go index b63e48313..db891c4a6 100644 --- a/pkg/metrics/scheduler_test.go +++ b/pkg/metrics/scheduler_test.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "gotest.tools/v3/assert" ) @@ -59,6 +60,120 @@ func TestTryPreemptionLatency(t *testing.T) { verifyHistogram(t, "trypreemption_latency_milliseconds", 60, 1) } +func TestSchedulerApplicationsNew(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsNew() + verifyMetric(t, 1, "new") + + curr, err := sm.GetTotalApplicationsNew() + assert.NilError(t, err) + assert.Equal(t, curr, 1) + + sm.DecTotalApplicationsNew() + verifyMetric(t, 0, "new") +} + +func TestSchedulerApplicationsAccepted(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsAccepted() + verifyMetric(t, 1, "accepted") + + curr, err := sm.GetTotalApplicationsAccepted() + assert.NilError(t, err) + assert.Equal(t, curr, 1) + + sm.DecTotalApplicationsAccepted() + verifyMetric(t, 0, "accepted") +} + +func TestSchedulerApplicationsRejected(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsRejected() + verifyMetric(t, 1, "rejected") + + curr, err := sm.GetTotalApplicationsRejected() + assert.NilError(t, err) + assert.Equal(t, curr, 1) +} + +func TestSchedulerApplicationsRunning(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsRunning() + verifyMetric(t, 1, "running") + + curr, err := sm.GetTotalApplicationsRunning() + assert.NilError(t, err) + assert.Equal(t, curr, 1) + + sm.DecTotalApplicationsRunning() + verifyMetric(t, 0, "running") +} + +func TestSchedulerApplicationsCompleting(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsCompleting() + verifyMetric(t, 1, "completing") + + sm.DecTotalApplicationsCompleting() + verifyMetric(t, 0, "completing") +} + +func TestSchedulerApplicationsResuming(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsResuming() + verifyMetric(t, 1, "resuming") + + curr, err := sm.GetTotalApplicationsResuming() + assert.NilError(t, err) + assert.Equal(t, curr, 1) + + sm.DecTotalApplicationsResuming() + verifyMetric(t, 0, "resuming") +} + +func TestSchedulerApplicationsFailing(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsFailing() + verifyMetric(t, 1, "failing") + + sm.DecTotalApplicationsFailing() + verifyMetric(t, 0, "failing") +} + +func TestSchedulerApplicationsCompleted(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsCompleted() + verifyMetric(t, 1, "completed") + + curr, err := sm.GetTotalApplicationsCompleted() + assert.NilError(t, err) + assert.Equal(t, curr, 1) +} + +func TestSchedulerApplicationsFailed(t *testing.T) { + sm = getSchedulerMetrics(t) + defer unregisterMetrics() + + sm.IncTotalApplicationsFailed() + verifyMetric(t, 1, "failed") +} + func getSchedulerMetrics(t *testing.T) *SchedulerMetrics { unregisterMetrics() return InitSchedulerMetrics() From bbc6136b8a7a7128742099db29c6f0d95c19d426 Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 21 Aug 2024 15:30:46 +0800 Subject: [PATCH 5/8] increase test coverage --- pkg/metrics/scheduler.go | 2 +- pkg/metrics/scheduler_test.go | 40 +++++++++++++++++------------------ 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go index 011c92cfa..5b9a71686 100644 --- a/pkg/metrics/scheduler.go +++ b/pkg/metrics/scheduler.go @@ -90,7 +90,7 @@ func InitSchedulerMetrics() *SchedulerMetrics { Subsystem: SchedulerSubsystem, Name: "application_submission_total", Help: "Total number of application submissions. State of the attempt includes `accepted` and `rejected`.", - }, []string{"result"}) + }, []string{"state"}) s.application = prometheus.NewGaugeVec( prometheus.GaugeOpts{ diff --git a/pkg/metrics/scheduler_test.go b/pkg/metrics/scheduler_test.go index db891c4a6..985848e77 100644 --- a/pkg/metrics/scheduler_test.go +++ b/pkg/metrics/scheduler_test.go @@ -38,10 +38,10 @@ func TestDrainingNodes(t *testing.T) { defer unregisterMetrics() sm.IncDrainingNodes() - verifyMetric(t, 1, "draining") + verifyMetric(t, 1, "draining", "yunikorn_scheduler_node") sm.DecDrainingNodes() - verifyMetric(t, 0, "draining") + verifyMetric(t, 0, "draining", "yunikorn_scheduler_node") } func TestTotalDecommissionedNodes(t *testing.T) { @@ -49,7 +49,7 @@ func TestTotalDecommissionedNodes(t *testing.T) { defer unregisterMetrics() sm.IncTotalDecommissionedNodes() - verifyMetric(t, 1, "decommissioned") + verifyMetric(t, 1, "decommissioned", "yunikorn_scheduler_node") } func TestTryPreemptionLatency(t *testing.T) { @@ -65,14 +65,14 @@ func TestSchedulerApplicationsNew(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsNew() - verifyMetric(t, 1, "new") + verifyMetric(t, 1, "new", "yunikorn_scheduler_application_submission_total") curr, err := sm.GetTotalApplicationsNew() assert.NilError(t, err) assert.Equal(t, curr, 1) sm.DecTotalApplicationsNew() - verifyMetric(t, 0, "new") + verifyMetric(t, 0, "new", "yunikorn_scheduler_application_submission_total") } func TestSchedulerApplicationsAccepted(t *testing.T) { @@ -80,14 +80,14 @@ func TestSchedulerApplicationsAccepted(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsAccepted() - verifyMetric(t, 1, "accepted") + verifyMetric(t, 1, "accepted", "yunikorn_scheduler_application_submission_total") curr, err := sm.GetTotalApplicationsAccepted() assert.NilError(t, err) assert.Equal(t, curr, 1) sm.DecTotalApplicationsAccepted() - verifyMetric(t, 0, "accepted") + verifyMetric(t, 0, "accepted", "yunikorn_scheduler_application_submission_total") } func TestSchedulerApplicationsRejected(t *testing.T) { @@ -95,7 +95,7 @@ func TestSchedulerApplicationsRejected(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsRejected() - verifyMetric(t, 1, "rejected") + verifyMetric(t, 1, "rejected", "yunikorn_scheduler_application_submission_total") curr, err := sm.GetTotalApplicationsRejected() assert.NilError(t, err) @@ -107,14 +107,14 @@ func TestSchedulerApplicationsRunning(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsRunning() - verifyMetric(t, 1, "running") + verifyMetric(t, 1, "running", "yunikorn_scheduler_application_total") curr, err := sm.GetTotalApplicationsRunning() assert.NilError(t, err) assert.Equal(t, curr, 1) sm.DecTotalApplicationsRunning() - verifyMetric(t, 0, "running") + verifyMetric(t, 0, "running", "yunikorn_scheduler_application_total") } func TestSchedulerApplicationsCompleting(t *testing.T) { @@ -122,10 +122,10 @@ func TestSchedulerApplicationsCompleting(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsCompleting() - verifyMetric(t, 1, "completing") + verifyMetric(t, 1, "completing", "yunikorn_scheduler_application_total") sm.DecTotalApplicationsCompleting() - verifyMetric(t, 0, "completing") + verifyMetric(t, 0, "completing", "yunikorn_scheduler_application_total") } func TestSchedulerApplicationsResuming(t *testing.T) { @@ -133,14 +133,14 @@ func TestSchedulerApplicationsResuming(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsResuming() - verifyMetric(t, 1, "resuming") + verifyMetric(t, 1, "resuming", "yunikorn_scheduler_application_total") curr, err := sm.GetTotalApplicationsResuming() assert.NilError(t, err) assert.Equal(t, curr, 1) sm.DecTotalApplicationsResuming() - verifyMetric(t, 0, "resuming") + verifyMetric(t, 0, "resuming", "yunikorn_scheduler_application_total") } func TestSchedulerApplicationsFailing(t *testing.T) { @@ -148,10 +148,10 @@ func TestSchedulerApplicationsFailing(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsFailing() - verifyMetric(t, 1, "failing") + verifyMetric(t, 1, "failing", "yunikorn_scheduler_application_total") sm.DecTotalApplicationsFailing() - verifyMetric(t, 0, "failing") + verifyMetric(t, 0, "failing", "yunikorn_scheduler_application_total") } func TestSchedulerApplicationsCompleted(t *testing.T) { @@ -159,7 +159,7 @@ func TestSchedulerApplicationsCompleted(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsCompleted() - verifyMetric(t, 1, "completed") + verifyMetric(t, 1, "completed", "yunikorn_scheduler_application_total") curr, err := sm.GetTotalApplicationsCompleted() assert.NilError(t, err) @@ -171,7 +171,7 @@ func TestSchedulerApplicationsFailed(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsFailed() - verifyMetric(t, 1, "failed") + verifyMetric(t, 1, "failed", "yunikorn_scheduler_application_total") } func getSchedulerMetrics(t *testing.T) *SchedulerMetrics { @@ -193,13 +193,13 @@ func verifyHistogram(t *testing.T, name string, value float64, delta float64) { } } -func verifyMetric(t *testing.T, expectedCounter float64, expectedState string) { +func verifyMetric(t *testing.T, expectedCounter float64, expectedState string, name string) { mfs, err := prometheus.DefaultGatherer.Gather() assert.NilError(t, err) var checked bool for _, metric := range mfs { - if strings.Contains(metric.GetName(), "yunikorn_scheduler_node") { + if strings.Contains(metric.GetName(), name) { assert.Equal(t, 1, len(metric.Metric)) assert.Equal(t, dto.MetricType_GAUGE, metric.GetType()) m := metric.Metric[0] From 2d55f778deb864a07133a00055f7bf4893e13fec Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 21 Aug 2024 16:16:43 +0800 Subject: [PATCH 6/8] Address new comments --- pkg/metrics/scheduler.go | 30 ++++------- pkg/metrics/scheduler_test.go | 54 +++++++++---------- pkg/scheduler/objects/application_state.go | 4 +- .../objects/application_state_test.go | 26 +++++++-- 4 files changed, 59 insertions(+), 55 deletions(-) diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go index 5b9a71686..a5c7c054b 100644 --- a/pkg/metrics/scheduler.go +++ b/pkg/metrics/scheduler.go @@ -57,7 +57,7 @@ var resourceUsageRangeBuckets = []string{ // SchedulerMetrics to declare scheduler metrics type SchedulerMetrics struct { containerAllocation *prometheus.CounterVec - applicationSubmission *prometheus.GaugeVec + applicationSubmission *prometheus.CounterVec application *prometheus.GaugeVec node *prometheus.GaugeVec nodeResourceUsage map[string]*prometheus.GaugeVec @@ -84,20 +84,20 @@ func InitSchedulerMetrics() *SchedulerMetrics { Help: "Total number of attempts to allocate containers. State of the attempt includes `allocated`, `rejected`, `error`, `released`", }, []string{"state"}) - s.applicationSubmission = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + s.applicationSubmission = prometheus.NewCounterVec( + prometheus.CounterOpts{ Namespace: Namespace, Subsystem: SchedulerSubsystem, Name: "application_submission_total", - Help: "Total number of application submissions. State of the attempt includes `accepted` and `rejected`.", - }, []string{"state"}) + Help: "Total number of application submissions. State of the attempt includes `new`, `accepted` and `rejected`.", + }, []string{"result"}) s.application = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: SchedulerSubsystem, Name: "application_total", - Help: "Total number of applications. State of the application includes `running`, `completed` and `failed`.", + Help: "Total number of applications. State of the application includes `running`, `resuming`, `failing`, `completing`, `completed` and `failed`.", }, []string{"state"}) s.node = prometheus.NewGaugeVec( @@ -245,15 +245,11 @@ func (m *SchedulerMetrics) IncTotalApplicationsNew() { m.applicationSubmission.WithLabelValues(AppNew).Inc() } -func (m *SchedulerMetrics) DecTotalApplicationsNew() { - m.applicationSubmission.WithLabelValues(AppNew).Dec() -} - func (m *SchedulerMetrics) GetTotalApplicationsNew() (int, error) { metricDto := &dto.Metric{} err := m.applicationSubmission.WithLabelValues(AppNew).Write(metricDto) if err == nil { - return int(*metricDto.Gauge.Value), nil + return int(*metricDto.Counter.Value), nil } return -1, err } @@ -262,15 +258,11 @@ func (m *SchedulerMetrics) IncTotalApplicationsAccepted() { m.applicationSubmission.WithLabelValues(AppAccepted).Inc() } -func (m *SchedulerMetrics) DecTotalApplicationsAccepted() { - m.applicationSubmission.WithLabelValues(AppAccepted).Dec() -} - func (m *SchedulerMetrics) GetTotalApplicationsAccepted() (int, error) { metricDto := &dto.Metric{} err := m.applicationSubmission.WithLabelValues(AppAccepted).Write(metricDto) if err == nil { - return int(*metricDto.Gauge.Value), nil + return int(*metricDto.Counter.Value), nil } return -1, err } @@ -283,7 +275,7 @@ func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) { metricDto := &dto.Metric{} err := m.applicationSubmission.WithLabelValues(AppRejected).Write(metricDto) if err == nil { - return int(*metricDto.Gauge.Value), nil + return int(*metricDto.Counter.Value), nil } return -1, err } @@ -342,10 +334,6 @@ func (m *SchedulerMetrics) GetTotalApplicationsResuming() (int, error) { return -1, err } -func (m *SchedulerMetrics) DecTotalApplicationsFailed() { - m.application.WithLabelValues(AppFailed).Dec() -} - func (m *SchedulerMetrics) IncTotalApplicationsCompleted() { m.application.WithLabelValues(AppCompleted).Inc() } diff --git a/pkg/metrics/scheduler_test.go b/pkg/metrics/scheduler_test.go index 985848e77..42879b67a 100644 --- a/pkg/metrics/scheduler_test.go +++ b/pkg/metrics/scheduler_test.go @@ -38,10 +38,10 @@ func TestDrainingNodes(t *testing.T) { defer unregisterMetrics() sm.IncDrainingNodes() - verifyMetric(t, 1, "draining", "yunikorn_scheduler_node") + verifyMetric(t, 1, "draining", "yunikorn_scheduler_node", dto.MetricType_GAUGE, "state") sm.DecDrainingNodes() - verifyMetric(t, 0, "draining", "yunikorn_scheduler_node") + verifyMetric(t, 0, "draining", "yunikorn_scheduler_node", dto.MetricType_GAUGE, "state") } func TestTotalDecommissionedNodes(t *testing.T) { @@ -49,7 +49,7 @@ func TestTotalDecommissionedNodes(t *testing.T) { defer unregisterMetrics() sm.IncTotalDecommissionedNodes() - verifyMetric(t, 1, "decommissioned", "yunikorn_scheduler_node") + verifyMetric(t, 1, "decommissioned", "yunikorn_scheduler_node", dto.MetricType_GAUGE, "state") } func TestTryPreemptionLatency(t *testing.T) { @@ -65,14 +65,11 @@ func TestSchedulerApplicationsNew(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsNew() - verifyMetric(t, 1, "new", "yunikorn_scheduler_application_submission_total") + verifyMetric(t, 1, "new", "yunikorn_scheduler_application_submission_total", dto.MetricType_COUNTER, "result") curr, err := sm.GetTotalApplicationsNew() assert.NilError(t, err) assert.Equal(t, curr, 1) - - sm.DecTotalApplicationsNew() - verifyMetric(t, 0, "new", "yunikorn_scheduler_application_submission_total") } func TestSchedulerApplicationsAccepted(t *testing.T) { @@ -80,14 +77,11 @@ func TestSchedulerApplicationsAccepted(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsAccepted() - verifyMetric(t, 1, "accepted", "yunikorn_scheduler_application_submission_total") + verifyMetric(t, 1, "accepted", "yunikorn_scheduler_application_submission_total", dto.MetricType_COUNTER, "result") curr, err := sm.GetTotalApplicationsAccepted() assert.NilError(t, err) assert.Equal(t, curr, 1) - - sm.DecTotalApplicationsAccepted() - verifyMetric(t, 0, "accepted", "yunikorn_scheduler_application_submission_total") } func TestSchedulerApplicationsRejected(t *testing.T) { @@ -95,7 +89,7 @@ func TestSchedulerApplicationsRejected(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsRejected() - verifyMetric(t, 1, "rejected", "yunikorn_scheduler_application_submission_total") + verifyMetric(t, 1, "rejected", "yunikorn_scheduler_application_submission_total", dto.MetricType_COUNTER, "result") curr, err := sm.GetTotalApplicationsRejected() assert.NilError(t, err) @@ -107,14 +101,14 @@ func TestSchedulerApplicationsRunning(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsRunning() - verifyMetric(t, 1, "running", "yunikorn_scheduler_application_total") + verifyMetric(t, 1, "running", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") curr, err := sm.GetTotalApplicationsRunning() assert.NilError(t, err) assert.Equal(t, curr, 1) sm.DecTotalApplicationsRunning() - verifyMetric(t, 0, "running", "yunikorn_scheduler_application_total") + verifyMetric(t, 0, "running", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") } func TestSchedulerApplicationsCompleting(t *testing.T) { @@ -122,10 +116,10 @@ func TestSchedulerApplicationsCompleting(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsCompleting() - verifyMetric(t, 1, "completing", "yunikorn_scheduler_application_total") + verifyMetric(t, 1, "completing", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") sm.DecTotalApplicationsCompleting() - verifyMetric(t, 0, "completing", "yunikorn_scheduler_application_total") + verifyMetric(t, 0, "completing", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") } func TestSchedulerApplicationsResuming(t *testing.T) { @@ -133,14 +127,14 @@ func TestSchedulerApplicationsResuming(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsResuming() - verifyMetric(t, 1, "resuming", "yunikorn_scheduler_application_total") + verifyMetric(t, 1, "resuming", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") curr, err := sm.GetTotalApplicationsResuming() assert.NilError(t, err) assert.Equal(t, curr, 1) sm.DecTotalApplicationsResuming() - verifyMetric(t, 0, "resuming", "yunikorn_scheduler_application_total") + verifyMetric(t, 0, "resuming", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") } func TestSchedulerApplicationsFailing(t *testing.T) { @@ -148,10 +142,10 @@ func TestSchedulerApplicationsFailing(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsFailing() - verifyMetric(t, 1, "failing", "yunikorn_scheduler_application_total") + verifyMetric(t, 1, "failing", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") sm.DecTotalApplicationsFailing() - verifyMetric(t, 0, "failing", "yunikorn_scheduler_application_total") + verifyMetric(t, 0, "failing", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") } func TestSchedulerApplicationsCompleted(t *testing.T) { @@ -159,7 +153,7 @@ func TestSchedulerApplicationsCompleted(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsCompleted() - verifyMetric(t, 1, "completed", "yunikorn_scheduler_application_total") + verifyMetric(t, 1, "completed", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") curr, err := sm.GetTotalApplicationsCompleted() assert.NilError(t, err) @@ -171,7 +165,7 @@ func TestSchedulerApplicationsFailed(t *testing.T) { defer unregisterMetrics() sm.IncTotalApplicationsFailed() - verifyMetric(t, 1, "failed", "yunikorn_scheduler_application_total") + verifyMetric(t, 1, "failed", "yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state") } func getSchedulerMetrics(t *testing.T) *SchedulerMetrics { @@ -193,7 +187,7 @@ func verifyHistogram(t *testing.T, name string, value float64, delta float64) { } } -func verifyMetric(t *testing.T, expectedCounter float64, expectedState string, name string) { +func verifyMetric(t *testing.T, expectedCounter float64, expectedState string, name string, metricType dto.MetricType, labelName string) { mfs, err := prometheus.DefaultGatherer.Gather() assert.NilError(t, err) @@ -201,13 +195,19 @@ func verifyMetric(t *testing.T, expectedCounter float64, expectedState string, n for _, metric := range mfs { if strings.Contains(metric.GetName(), name) { assert.Equal(t, 1, len(metric.Metric)) - assert.Equal(t, dto.MetricType_GAUGE, metric.GetType()) + assert.Equal(t, metricType, metric.GetType()) m := metric.Metric[0] assert.Equal(t, 1, len(m.Label)) - assert.Equal(t, "state", *m.Label[0].Name) + assert.Equal(t, labelName, *m.Label[0].Name) assert.Equal(t, expectedState, *m.Label[0].Value) - assert.Assert(t, m.Gauge != nil) - assert.Equal(t, expectedCounter, *m.Gauge.Value) + switch metricType { + case dto.MetricType_GAUGE: + assert.Equal(t, expectedCounter, *m.Gauge.Value) + case dto.MetricType_COUNTER: + assert.Equal(t, expectedCounter, *m.Counter.Value) + default: + assert.Assert(t, false, "unsupported") + } checked = true break } diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go index 1c5027c46..3360f1f5a 100644 --- a/pkg/scheduler/objects/application_state.go +++ b/pkg/scheduler/objects/application_state.go @@ -172,8 +172,8 @@ func callbacks() fsm.Callbacks { }, fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck + // only updated queue metrics because scheduler metrics are increased only for submission count metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsNew() - metrics.GetSchedulerMetrics().DecTotalApplicationsNew() }, fmt.Sprintf("enter_%s", Accepted.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck @@ -182,8 +182,8 @@ func callbacks() fsm.Callbacks { }, fmt.Sprintf("leave_%s", Accepted.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck + // only updated queue metrics because scheduler metrics are increased only for submission count metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsAccepted() - metrics.GetSchedulerMetrics().DecTotalApplicationsAccepted() }, fmt.Sprintf("enter_%s", Rejected.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go index 4df11fd36..33c218ffd 100644 --- a/pkg/scheduler/objects/application_state_test.go +++ b/pkg/scheduler/objects/application_state_test.go @@ -280,7 +280,9 @@ func TestAppStateTransitionEvents(t *testing.T) { // app-00002: New -> Accepted -> Running -> Completing -> Running -> Failing-> Failed // app-00003: New -> Accepted -> Running -> Failing -> Failed // app-00004: New -> Rejected -// Final metrics will be: 0 running, 0 accepted, 1 completed, 2 failed and 1 rejected applications +// Final queue metrics will be: 0 new, 0 running, 0 accepted, 1 completed, 2 failed and 1 rejected applications +// Final scheduler metrics will be: 4 new, 0 running, 3 accepted, 1 completed, 2 failed and 1 rejected applications +// Because the scheduler app submission state (new, accepted, rejected)is counter based, and it will not be decremented when the state changes func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen queue := createQueue(t, "metrics") metrics.GetSchedulerMetrics().Reset() @@ -303,7 +305,7 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen // New -> Resuming err := app.HandleApplicationEvent(ResumeApplication) assertState(t, app, err, Resuming.String()) - assertTotalAppsNewMetrics(t, 0) + assertTotalAppsNewMetrics(t, 1) assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) @@ -321,7 +323,7 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) - assertTotalAppsNewMetrics(t, 0) + assertTotalAppsNewMetrics(t, 1) assertTotalAppsAcceptedMetrics(t, 1) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) @@ -336,7 +338,8 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 1) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) - assertTotalAppsAcceptedMetrics(t, 0) + assertTotalAppsNewMetrics(t, 1) + assertTotalAppsAcceptedMetrics(t, 1) assertQueueRunningApps(t, app, 1) assertQueueApplicationsRunningMetrics(t, app, 1) assertQueueApplicationsAcceptedMetrics(t, app, 0) @@ -350,7 +353,9 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 1) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) - assertTotalAppsAcceptedMetrics(t, 0) + assertTotalAppsAcceptedMetrics(t, 1) + assertTotalAppsNewMetrics(t, 1) + assertTotalAppsAcceptedMetrics(t, 1) assertQueueRunningApps(t, app, 1) assertQueueApplicationsRunningMetrics(t, app, 1) assertQueueApplicationsAcceptedMetrics(t, app, 0) @@ -364,6 +369,8 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 0) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsNewMetrics(t, 1) + assertTotalAppsAcceptedMetrics(t, 1) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) assertQueueApplicationsAcceptedMetrics(t, app, 0) @@ -377,6 +384,8 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 1) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsNewMetrics(t, 1) + assertTotalAppsAcceptedMetrics(t, 1) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) assertQueueApplicationsAcceptedMetrics(t, app, 0) @@ -410,6 +419,8 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 1) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsNewMetrics(t, 2) + assertTotalAppsAcceptedMetrics(t, 2) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) assertQueueApplicationsAcceptedMetrics(t, app, 0) @@ -436,6 +447,8 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 1) assertTotalAppsRejectedMetrics(t, 0) + assertTotalAppsNewMetrics(t, 3) + assertTotalAppsAcceptedMetrics(t, 3) assertQueueRunningApps(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) assertQueueApplicationsAcceptedMetrics(t, app, 0) @@ -453,7 +466,10 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen assertTotalAppsRunningMetrics(t, 0) assertTotalAppsCompletedMetrics(t, 1) assertTotalAppsRejectedMetrics(t, 1) + assertTotalAppsNewMetrics(t, 4) + assertTotalAppsAcceptedMetrics(t, 3) assertQueueRunningApps(t, app, 0) + assertQueueApplicationsNewMetrics(t, app, 0) assertQueueApplicationsRunningMetrics(t, app, 0) assertQueueApplicationsAcceptedMetrics(t, app, 0) assertQueueApplicationsRejectedMetrics(t, app, 1) From dc4c239e046618780c174cdaf98fb078e802a8db Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 21 Aug 2024 16:21:49 +0800 Subject: [PATCH 7/8] Address new comments --- pkg/metrics/queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go index 146edd30c..ab40a8f1a 100644 --- a/pkg/metrics/queue.go +++ b/pkg/metrics/queue.go @@ -70,7 +70,7 @@ func InitQueueMetrics(name string) *QueueMetrics { Namespace: Namespace, Name: "queue_app", ConstLabels: prometheus.Labels{"queue": name}, - Help: "Queue application metrics. State of the application includes `accepted`, `rejected`, `running`, `failed`, `completed`.", + Help: "Queue application metrics. State of the application includes `new`, `accepted`, `rejected`, `running`, `failing`, `failed`, `resuming`, `completing`, `completed`.", }, []string{"state"}) q.appMetricsSubsystem = prometheus.NewGaugeVec( @@ -78,7 +78,7 @@ func InitQueueMetrics(name string) *QueueMetrics { Namespace: Namespace, Subsystem: replaceStr, Name: "queue_app", - Help: "Queue application metrics. State of the application includes `accepted`, `rejected`, `running`, `failed`, `completed`.", + Help: "Queue application metrics. State of the application includes `new`, `accepted`, `rejected`, `running`, `failing`, `failed`, `resuming`, `completing`, `completed`.", }, []string{"state"}) q.containerMetrics = prometheus.NewCounterVec( From a477027194da681bb608c0d61174f8a4f30ae242 Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 21 Aug 2024 17:53:49 +0800 Subject: [PATCH 8/8] Remove unused code --- pkg/metrics/queue.go | 12 ------------ pkg/metrics/queue_test.go | 15 --------------- 2 files changed, 27 deletions(-) diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go index ab40a8f1a..1a2c36706 100644 --- a/pkg/metrics/queue.go +++ b/pkg/metrics/queue.go @@ -203,10 +203,6 @@ func (m *QueueMetrics) IncQueueApplicationsRejected() { m.incQueueApplications(AppRejected) } -func (m *QueueMetrics) DecQueueApplicationsRejected() { - m.decQueueApplications(AppRejected) -} - func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppRejected).Write(metricDto) @@ -254,10 +250,6 @@ func (m *QueueMetrics) IncQueueApplicationsFailed() { m.incQueueApplications(AppFailed) } -func (m *QueueMetrics) DecQueueApplicationsFailed() { - m.decQueueApplications(AppFailed) -} - func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppFailed).Write(metricDto) @@ -288,10 +280,6 @@ func (m *QueueMetrics) IncQueueApplicationsCompleted() { m.incQueueApplications(AppCompleted) } -func (m *QueueMetrics) DecQueueApplicationsCompleted() { - m.decQueueApplications(AppCompleted) -} - func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) { metricDto := &dto.Metric{} err := m.appMetricsLabel.WithLabelValues(AppCompleted).Write(metricDto) diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go index bfd4edddb..ea2376bd8 100644 --- a/pkg/metrics/queue_test.go +++ b/pkg/metrics/queue_test.go @@ -125,11 +125,6 @@ func TestApplicationsRejected(t *testing.T) { curr, err := qm.GetQueueApplicationsRejected() assert.NilError(t, err) assert.Equal(t, 1, curr) - - qm.DecQueueApplicationsRejected() - curr, err = qm.GetQueueApplicationsRejected() - assert.NilError(t, err) - assert.Equal(t, 0, curr) } func TestApplicationsFailed(t *testing.T) { @@ -142,11 +137,6 @@ func TestApplicationsFailed(t *testing.T) { curr, err := qm.GetQueueApplicationsFailed() assert.NilError(t, err) assert.Equal(t, 1, curr) - - qm.DecQueueApplicationsFailed() - curr, err = qm.GetQueueApplicationsFailed() - assert.NilError(t, err) - assert.Equal(t, 0, curr) } func TestApplicationsCompleting(t *testing.T) { @@ -176,11 +166,6 @@ func TestApplicationsCompleted(t *testing.T) { curr, err := qm.GetQueueApplicationsCompleted() assert.NilError(t, err) assert.Equal(t, 1, curr) - - qm.DecQueueApplicationsCompleted() - curr, err = qm.GetQueueApplicationsCompleted() - assert.NilError(t, err) - assert.Equal(t, 0, curr) } func TestAllocatedContainers(t *testing.T) {