Skip to content

Commit

Permalink
[YUNIKORN-2876] Initialize queue metrics for app after queue is set (#…
Browse files Browse the repository at this point in the history
…968)

Closes: #968

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
zhuqi-lucas authored and craigcondit committed Sep 17, 2024
1 parent 227a240 commit 89348e6
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
10 changes: 3 additions & 7 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ var (
terminatedTimeout = 3 * 24 * time.Hour
defaultPlaceholderTimeout = 15 * time.Minute
)

var initAppLogOnce sync.Once
var rateLimitedAppLog *log.RateLimitedLogger

Expand Down Expand Up @@ -191,15 +190,9 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
app.rmID = rmID
app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
app.appEvents.SendNewApplicationEvent(app.ApplicationID)
app.setNewMetrics()
return app
}

func (sa *Application) setNewMetrics() {
metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
metrics.GetQueueMetrics(sa.GetQueuePath()).IncQueueApplicationsNew()
}

func (sa *Application) String() string {
if sa == nil {
return "application is nil"
Expand Down Expand Up @@ -1641,6 +1634,9 @@ func (sa *Application) SetQueue(queue *Queue) {
defer sa.Unlock()
sa.queuePath = queue.QueuePath
sa.queue = queue
// here we can make sure the queue is not empty
metrics.GetQueueMetrics(queue.QueuePath).IncQueueApplicationsNew()
metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
}

// remove the leaf queue the application runs in, used when completing the app
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/objects/application_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func TestAppStateTransitionEvents(t *testing.T) {
func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
queue := createQueue(t, "metrics")
metrics.GetSchedulerMetrics().Reset()
metrics.GetQueueMetrics("root.metrics").Reset()
// app-00001: New -> Resuming -> Accepted --> Running -> Completing-> Completed
app := newApplication("app-00001", "default", "root.metrics")
app.SetQueue(queue)
Expand Down Expand Up @@ -475,6 +476,12 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
assertQueueApplicationsRejectedMetrics(t, app, 1)
assertQueueApplicationsFailedMetrics(t, app, 2)
assertQueueApplicationsCompletedMetrics(t, app, 1)

// app-00005: the queuePath is empty, it will happen for dynamic queue when it before the queue is created
app = newApplication("app-00005", "default", "")
assertState(t, app, nil, New.String())
assertQueueApplicationsNewMetrics(t, app, 0)
assertTotalAppsNewMetrics(t, 4)
}

func assertState(t testing.TB, app *Application, err error, expected string) {
Expand Down
42 changes: 42 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
Expand Down Expand Up @@ -246,6 +247,9 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()

// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
Expand Down Expand Up @@ -293,6 +297,9 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()

// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
Expand Down Expand Up @@ -395,6 +402,9 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()

// add a new app1
app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
err = partition.AddApplication(app1)
Expand Down Expand Up @@ -522,6 +532,9 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()

// add a new app1
app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
err = partition.AddApplication(app1)
Expand Down Expand Up @@ -605,6 +618,9 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()

// add a new app1
app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
err = partition.AddApplication(app1)
Expand Down Expand Up @@ -698,6 +714,8 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()
// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
Expand Down Expand Up @@ -770,6 +788,9 @@ func TestRemoveNodeWithReal(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()

// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
Expand Down Expand Up @@ -831,13 +852,22 @@ func TestRemoveNodeWithReal(t *testing.T) {
}

func TestAddApp(t *testing.T) {
defer metrics.GetSchedulerMetrics().Reset()
defer metrics.GetQueueMetrics(defQueue).Reset()
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")

// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
assert.NilError(t, err, "add application to partition should not have failed")
queueApplicationsNew, err := metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
assert.NilError(t, err, "get queue metrics failed")
assert.Equal(t, queueApplicationsNew, 1)
scheduleApplicationsNew, err := metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
assert.NilError(t, err, "get scheduler metrics failed")
assert.Equal(t, scheduleApplicationsNew, 1)

// add the same app
err = partition.AddApplication(app)
if err == nil {
Expand All @@ -853,6 +883,12 @@ func TestAddApp(t *testing.T) {
if err == nil || partition.getApplication(appID2) != nil {
t.Errorf("add application on stopped partition should have failed but did not")
}
queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
assert.NilError(t, err, "get queue metrics failed")
assert.Equal(t, queueApplicationsNew, 1)
scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
assert.NilError(t, err, "get scheduler metrics failed")
assert.Equal(t, scheduleApplicationsNew, 1)

// mark partition for deletion, no new application can be added
partition.stateMachine.SetState(objects.Active.String())
Expand All @@ -863,6 +899,12 @@ func TestAddApp(t *testing.T) {
if err == nil || partition.getApplication(appID3) != nil {
t.Errorf("add application on draining partition should have failed but did not")
}
queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
assert.NilError(t, err, "get queue metrics failed")
assert.Equal(t, queueApplicationsNew, 1)
scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
assert.NilError(t, err, "get scheduler metrics failed")
assert.Equal(t, scheduleApplicationsNew, 1)
}

func TestAddAppForced(t *testing.T) {
Expand Down

0 comments on commit 89348e6

Please sign in to comment.