From 9869540b7a4c1c5c5e1f19c61c9a6fa3e4e6b249 Mon Sep 17 00:00:00 2001 From: Wilfred Spiegelenburg Date: Thu, 24 Oct 2024 11:54:05 -0500 Subject: [PATCH] [YUNIKORN-2926] Placeholder counters incorrect (#986) Placeholder tracking data is maintained inside the application for scheduling. If the placeholder is released we update the counters in the tracking data. We have cases in which we do not do that correctly: * placeholders are smaller than the real allocation * placeholder does not have an allocation to replace * all allocations are removed from an application Closes: #986 Signed-off-by: Craig Condit --- pkg/scheduler/objects/application.go | 58 +++--- pkg/scheduler/objects/application_test.go | 208 ++++++++++++---------- pkg/scheduler/partition_test.go | 72 +++++--- 3 files changed, 193 insertions(+), 145 deletions(-) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 7df5c1e4f..4de35b3e6 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -402,7 +402,7 @@ func (sa *Application) clearPlaceholderTimer() { } // timeoutPlaceholderProcessing cleans up all placeholder asks and allocations that are not used after the timeout. -// If the application has started processing, Starting state or further, the application keeps on processing without +// If the application has started processing, Running state or further, the application keeps on processing without // being able to use the placeholders. // If the application is in New or Accepted state we clean up and take followup action based on the gang scheduling // style. 
@@ -422,17 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() { } alloc.SetReleased(true) toRelease = append(toRelease, alloc) - // mark as timeout out in the tracking data - if _, ok := sa.placeholderData[alloc.GetTaskGroup()]; ok { - sa.placeholderData[alloc.GetTaskGroup()].TimedOut++ - } } log.Log(log.SchedApplication).Info("Placeholder timeout, releasing placeholders", zap.String("AppID", sa.ApplicationID), zap.Int("placeholders being replaced", replacing), zap.Int("releasing placeholders", len(toRelease))) + // trigger the release of the placeholders: accounting updates when the release is done sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") - // Case 2: in every other case fail the application, and notify the context about the expired placeholder asks + // Case 2: in every other case progress the application, and notify the context about the expired placeholder asks default: log.Log(log.SchedApplication).Info("Placeholder timeout, releasing asks and placeholders", zap.String("AppID", sa.ApplicationID), @@ -1805,22 +1802,28 @@ func (sa *Application) updatePreemptedResource(info *Allocation) { info.GetAllocatedResource(), info.GetBindTime()) } +// ReplaceAllocation removes the placeholder from the allocation list and replaces it with the real allocation. +// If no replacing allocation is linked to the placeholder it will still be removed from the application. +// Queue and Node objects are updated by the caller. 
func (sa *Application) ReplaceAllocation(allocationKey string) *Allocation { sa.Lock() defer sa.Unlock() // remove the placeholder that was just confirmed by the shim ph := sa.removeAllocationInternal(allocationKey, si.TerminationType_PLACEHOLDER_REPLACED) - // this has already been replaced or it is a duplicate message from the shim - if ph == nil || !ph.HasRelease() { - log.Log(log.SchedApplication).Debug("Unexpected placeholder released", + // this has already been replaced, or it is a duplicate message from the shim: just ignore it + if ph == nil { + return nil + } + // ph is the placeholder, the releases entry points to the real allocation we need to swap in + alloc := ph.GetRelease() + if alloc == nil { + log.Log(log.SchedApplication).Warn("Placeholder replaced without replacement allocation", zap.String("applicationID", sa.ApplicationID), zap.Stringer("placeholder", ph)) - return nil + return ph } // update the replacing allocation // we double linked the real and placeholder allocation - // ph is the placeholder, the releases entry points to the real one - alloc := ph.GetRelease() alloc.SetPlaceholderUsed(true) alloc.SetPlaceholderCreateTime(ph.GetCreateTime()) alloc.SetBindTime(time.Now()) @@ -1828,13 +1831,10 @@ func (sa *Application) ReplaceAllocation(allocationKey string) *Allocation { // order is important: clean up the allocation after adding it to the app // we need the original Replaced allocation resultType. alloc.ClearRelease() - if sa.placeholderData != nil { - sa.placeholderData[ph.GetTaskGroup()].Replaced++ - } return ph } -// Remove the Allocation from the application. +// RemoveAllocation removes the Allocation from the application. 
func (sa *Application) RemoveAllocation(allocationKey string, releaseType si.TerminationType) *Allocation { sa.Lock() @@ -1842,8 +1842,9 @@ func (sa *Application) RemoveAllocation(allocationKey string, releaseType si.Ter return sa.removeAllocationInternal(allocationKey, releaseType) } -// Remove the Allocation from the application -// No locking must be called while holding the lock +// removeAllocationInternal removes the Allocation from the application. +// Returns the allocation that was removed or nil if not found. +// No locking must be called while holding the application lock. func (sa *Application) removeAllocationInternal(allocationKey string, releaseType si.TerminationType) *Allocation { alloc := sa.allocations[allocationKey] @@ -1858,9 +1859,14 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp // update correct allocation tracker if alloc.IsPlaceholder() { // make sure we account for the placeholders being removed in the tracking data - if releaseType == si.TerminationType_STOPPED_BY_RM || releaseType == si.TerminationType_PREEMPTED_BY_SCHEDULER || releaseType == si.TerminationType_UNKNOWN_TERMINATION_TYPE { - if _, ok := sa.placeholderData[alloc.taskGroupName]; ok { - sa.placeholderData[alloc.taskGroupName].TimedOut++ + // update based on termination type: everything is counted as a timeout except for a real replace + if sa.placeholderData != nil { + if phData, ok := sa.placeholderData[alloc.taskGroupName]; ok { + if releaseType == si.TerminationType_PLACEHOLDER_REPLACED { + phData.Replaced++ + } else { + phData.TimedOut++ + } } } // as and when every ph gets removed (for replacement), resource usage would be reduced. @@ -1933,7 +1939,7 @@ func (sa *Application) hasZeroAllocations() bool { return resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) } -// Remove all allocations from the application. +// RemoveAllAllocations removes all allocations from the application. 
// All allocations that have been removed are returned. func (sa *Application) RemoveAllAllocations() []*Allocation { sa.Lock() @@ -1941,6 +1947,12 @@ func (sa *Application) RemoveAllAllocations() []*Allocation { allocationsToRelease := make([]*Allocation, 0) for _, alloc := range sa.allocations { + // update placeholder tracking data + if alloc.IsPlaceholder() && sa.placeholderData != nil { + if phData, ok := sa.placeholderData[alloc.taskGroupName]; ok { + phData.TimedOut++ + } + } allocationsToRelease = append(allocationsToRelease, alloc) // Aggregate the resources used by this alloc to the application's user resource tracker sa.trackCompletedResource(alloc) @@ -1959,7 +1971,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation { // When the resource trackers are zero we should not expect anything to come in later. if resources.IsZero(sa.pending) { if err := sa.HandleApplicationEvent(CompleteApplication); err != nil { - log.Log(log.SchedApplication).Warn("Application state not changed to Waiting while removing all allocations", + log.Log(log.SchedApplication).Warn("Application state not changed to Completing while removing all allocations", zap.String("currentState", sa.CurrentState()), zap.Error(err)) } diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 097125852..91db298fe 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -525,42 +525,33 @@ func TestAddAllocAsk(t *testing.T) { assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state") // test PlaceholderData - tg1 := "tg-1" + const ( + tg1 = "tg-1" + tg2 = "tg-2" + ) ask = newAllocationAskTG(aKey, appID1, tg1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") app.SetTimedOutPlaceholder(tg1, 1) - app.SetTimedOutPlaceholder("tg-2", 2) + app.SetTimedOutPlaceholder(tg2, 2) clonePlaceholderData := 
app.GetAllPlaceholderData() assert.Equal(t, len(clonePlaceholderData), 1) assert.Equal(t, len(app.placeholderData), 1) assert.Equal(t, clonePlaceholderData[0], app.placeholderData[tg1]) - assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1) - assert.Equal(t, app.placeholderData[tg1].Count, int64(1)) - assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) - assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1)) - assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) + assertPlaceholderData(t, app, tg1, 1, 1, 0, res) ask = newAllocationAskTG(aKey, appID1, tg1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") assert.Equal(t, len(app.placeholderData), 1) - assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1) - assert.Equal(t, app.placeholderData[tg1].Count, int64(2)) - assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) - assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1)) - assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) + assertPlaceholderData(t, app, tg1, 2, 1, 0, res) - tg2 := "tg-2" ask = newAllocationAskTG(aKey, appID1, tg2, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") + assert.Equal(t, len(app.placeholderData), 2) - assert.Equal(t, app.placeholderData[tg2].TaskGroupName, tg2) - assert.Equal(t, app.placeholderData[tg2].Count, int64(1)) - assert.Equal(t, app.placeholderData[tg2].Replaced, int64(0)) - assert.Equal(t, app.placeholderData[tg2].TimedOut, int64(0)) - assert.DeepEqual(t, app.placeholderData[tg2].MinResource, res) + assertPlaceholderData(t, app, tg2, 1, 0, 0, res) } // test state change on add and remove ask @@ -632,15 +623,7 @@ func TestRecoverAllocAsk(t *testing.T) { assert.Equal(t, 0, len(app.placeholderData)) ask = newAllocationAskTG("ask-3", appID1, "testGroup", res) app.RecoverAllocationAsk(ask) - phData := app.placeholderData - assert.Equal(t, 1, len(phData)) - 
taskGroupData := phData["testGroup"] - assert.Assert(t, taskGroupData != nil) - assert.Equal(t, "testGroup", taskGroupData.TaskGroupName) - assert.Equal(t, int64(1), taskGroupData.Count) - assert.Equal(t, int64(0), taskGroupData.Replaced) - assert.Equal(t, int64(0), taskGroupData.TimedOut) - assert.Assert(t, resources.Equals(taskGroupData.MinResource, res)) + assertPlaceholderData(t, app, "testGroup", 1, 0, 0, res) } // test reservations removal by allocation @@ -1232,13 +1215,24 @@ func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySec } } -func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, - vcoresSecconds int64) { +func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, vcoresSecconds int64) { detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1] assert.Equal(t, memorySeconds, int64(detailedResource.Resources["memory"])) assert.Equal(t, vcoresSecconds, int64(detailedResource.Resources["vcores"])) } +func assertPlaceholderData(t *testing.T, app *Application, taskGroup string, count, timedout, replaced int, res *resources.Resource) { + assert.Assert(t, len(app.placeholderData) >= 1, "expected placeholder data to be set") + phData, ok := app.placeholderData[taskGroup] + assert.Assert(t, ok, "placeholder data not for the taskgroup: %s", taskGroup) + assert.Equal(t, phData.Count, int64(count), "placeholder count does not match") + assert.Equal(t, phData.Replaced, int64(replaced), "replaced count does not match") + assert.Equal(t, phData.TimedOut, int64(timedout), "timedout count does not match") + if res != nil { + assert.Assert(t, resources.Equals(phData.MinResource, res), "resource for taskgroup is not correct") + } +} + func TestResourceUsageAggregation(t *testing.T) { setupUGM() @@ -1421,13 +1415,8 @@ func TestReplaceAllocation(t *testing.T) { // add the placeholder to the app app.AddAllocation(ph) // add PlaceholderData - 
app.addPlaceholderDataWithLocking(ph) - assert.Equal(t, len(app.placeholderData), 1) - assert.Equal(t, app.placeholderData["tg"].TaskGroupName, "tg") - assert.Equal(t, app.placeholderData["tg"].Count, int64(1)) - assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0)) - assert.Equal(t, app.placeholderData["tg"].TimedOut, int64(0)) - assert.DeepEqual(t, app.placeholderData["tg"].MinResource, res) + app.addPlaceholderData(ph) + assertPlaceholderData(t, app, "tg", 1, 0, 0, res) assert.Equal(t, len(app.allocations), 1, "allocation not added as expected") assert.Assert(t, resources.IsZero(app.allocatedResource), "placeholder counted as real allocation") @@ -1435,15 +1424,20 @@ func TestReplaceAllocation(t *testing.T) { t.Fatalf("placeholder allocation not updated as expected: got %s, expected %s", app.allocatedPlaceholder, res) } assertUserGroupResource(t, getTestUserGroup(), res) + // if the placeholder exists it should be removed even without real allocation linked alloc = app.ReplaceAllocation(ph.GetAllocationKey()) - assert.Equal(t, alloc, nilAlloc, "placeholder without releases expected nil to be returned got a real alloc: %s", alloc) - assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0)) + assert.Equal(t, alloc, ph, "returned allocation is not the placeholder") + // placeholder data must show replaced increase + assertPlaceholderData(t, app, "tg", 1, 0, 1, res) assertUserGroupResource(t, getTestUserGroup(), nil) + assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "placeholder should have been released") + assert.Assert(t, resources.IsZero(app.allocatedResource), "no replacement made allocated should be zero") + // add the placeholder back to the app, the failure test above changed state and removed the ph app.SetState(Running.String()) app.AddAllocation(ph) - app.addPlaceholderDataWithLocking(ph) - assert.Equal(t, app.placeholderData["tg"].Count, int64(2)) + app.addPlaceholderData(ph) + assertPlaceholderData(t, app, "tg", 2, 0, 1, res) 
assertUserGroupResource(t, getTestUserGroup(), res) // set the real one to replace the placeholder @@ -1452,10 +1446,8 @@ func TestReplaceAllocation(t *testing.T) { alloc = app.ReplaceAllocation(ph.GetAllocationKey()) assert.Equal(t, alloc, ph, "returned allocation is not the placeholder") assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "real allocation counted as placeholder") - if !resources.Equals(app.allocatedResource, res) { - t.Fatalf("real allocation not updated as expected: got %s, expected %s", app.allocatedResource, res) - } - assert.Equal(t, app.placeholderData["tg"].Replaced, int64(1)) + assert.Assert(t, resources.Equals(app.allocatedResource, res), "real allocation not updated as expected") + assertPlaceholderData(t, app, "tg", 2, 0, 2, res) assert.Equal(t, realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime(), "real allocation's placeholder create time not updated as expected: got %s, expected %s", realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime()) assertUserGroupResource(t, getTestUserGroup(), res) @@ -1463,8 +1455,8 @@ func TestReplaceAllocation(t *testing.T) { app.SetState(Running.String()) ph.ClearRelease() app.AddAllocation(ph) - app.addPlaceholderDataWithLocking(ph) - assert.Equal(t, app.placeholderData["tg"].Count, int64(3)) + app.addPlaceholderData(ph) + assertPlaceholderData(t, app, "tg", 3, 0, 2, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) } @@ -1485,14 +1477,14 @@ func TestReplaceAllocationTracking(t *testing.T) { ph3.SetInstanceType(instType1) app.AddAllocation(ph1) assert.NilError(t, err, "could not add ask") - app.addPlaceholderDataWithLocking(ph1) + app.addPlaceholderData(ph1) assert.Equal(t, true, app.HasPlaceholderAllocation()) app.AddAllocation(ph2) assert.NilError(t, err, "could not add ask") - app.addPlaceholderDataWithLocking(ph2) + app.addPlaceholderData(ph2) app.AddAllocation(ph3) assert.NilError(t, err, "could not add ask") - app.addPlaceholderDataWithLocking(ph3) + 
app.addPlaceholderData(ph3) ph1.SetBindTime(time.Now().Add(-10 * time.Second)) ph2.SetBindTime(time.Now().Add(-10 * time.Second)) @@ -1523,15 +1515,20 @@ func TestReplaceAllocationTracking(t *testing.T) { assertPlaceHolderResource(t, appSummary, 3000, 300) } -func TestTimeoutPlaceholderSoftStyle(t *testing.T) { +func TestTimeoutPlaceholderSoft(t *testing.T) { runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2}) } -func TestTimeoutPlaceholderAllocAsk(t *testing.T) { +func TestTimeoutPlaceholderHard(t *testing.T) { runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2}) } func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulingStyle string, expectedReleases []int) { + const ( + tg1 = "tg-1" + tg2 = "tg-2" + ) + setupUGM() // create a fake queue queue, err := createRootQueue(nil) @@ -1551,36 +1548,35 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin res, err := resources.NewResourceFromConf(resMap) assert.NilError(t, err, "Unexpected error when creating resource from map") // add the placeholder ask to the app - tg1 := "tg-1" phAsk := newAllocationAskTG("ask-1", appID1, tg1, res) err = app.AddAllocationAsk(phAsk) assert.NilError(t, err, "Application ask should have been added") + assertPlaceholderData(t, app, tg1, 1, 0, 0, res) assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") - // check PlaceHolderData - assert.Equal(t, len(app.placeholderData), 1) - assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1) - assert.Equal(t, app.placeholderData[tg1].Count, int64(1)) - assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) - assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(0)) - assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) - // add the placeholder to the app - ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg") + ph := newPlaceholderAlloc(appID1, nodeID1, res, tg2) app.AddAllocation(ph) + 
app.addPlaceholderDataWithLocking(ph) + assertPlaceholderData(t, app, tg2, 1, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), res) assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") // add a second one to check the filter - ph = newPlaceholderAlloc(appID1, nodeID1, res, "tg") + ph = newPlaceholderAlloc(appID1, nodeID1, res, tg2) app.AddAllocation(ph) + app.addPlaceholderDataWithLocking(ph) + assertPlaceholderData(t, app, tg2, 2, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) + assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool { app.RLock() defer app.RUnlock() return app.placeholderTimer == nil }) - assert.Equal(t, app.placeholderData[tg1].TimedOut, app.placeholderData[tg1].Count, "When the app is in an accepted state, timeout should equal to count") assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly") + // When the app was in the accepted state, timeout should equal to count + assertPlaceholderData(t, app, tg1, 1, 1, 0, res) + assertPlaceholderData(t, app, tg2, 2, 2, 0, res) assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state") assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) events := testHandler.GetEvents() @@ -1593,12 +1589,8 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin idx++ } } - // check if the Replaced of PlaceHolderData is 0 - assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) - // Because the Count of PlaceHolderData is only added in AddAllocationAsk, so it is 1 - assert.Equal(t, app.placeholderData[tg1].Count, int64(1)) - assert.Equal(t, found, 2, "release allocation or ask event not found in list") + // asks are completely cleaned up assert.Assert(t, 
resources.IsZero(app.GetPendingResource()), "pending placeholder resources should be zero") // a released placeholder still holds the resource until release confirmed by the RM @@ -1612,6 +1604,8 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin } func TestTimeoutPlaceholderAllocReleased(t *testing.T) { + const tg1 = "tg-1" + setupUGM() originalPhTimeout := defaultPlaceholderTimeout @@ -1626,24 +1620,18 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) { res, err := resources.NewResourceFromConf(resMap) assert.NilError(t, err, "Unexpected error when creating resource from map") // add the placeholders to the app: one released, one still available. - ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg") - ph.SetReleased(true) - app.AddAllocation(ph) - // add PlaceholderData - app.addPlaceholderDataWithLocking(ph) - assert.Equal(t, len(app.placeholderData), 1) - assert.Equal(t, app.placeholderData["tg"].TaskGroupName, "tg") - assert.Equal(t, app.placeholderData["tg"].Count, int64(1)) - assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0)) - assert.Equal(t, app.placeholderData["tg"].TimedOut, int64(0)) - assert.DeepEqual(t, app.placeholderData["tg"].MinResource, res) + phReleased := newPlaceholderAlloc(appID1, nodeID1, res, tg1) + phReleased.SetReleased(true) + app.AddAllocation(phReleased) + app.addPlaceholderDataWithLocking(phReleased) + assertPlaceholderData(t, app, tg1, 1, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") - ph = newPlaceholderAlloc(appID1, nodeID1, res, "tg") + ph := newPlaceholderAlloc(appID1, nodeID1, res, tg1) app.AddAllocation(ph) app.addPlaceholderDataWithLocking(ph) - assert.Equal(t, app.placeholderData["tg"].Count, int64(2)) + assertPlaceholderData(t, app, tg1, 2, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), 
resources.Multiply(res, 2)) alloc := newAllocation(appID1, nodeID1, res) @@ -1669,9 +1657,24 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) { assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "Unexpected allocated resources for the app") // a released placeholder still holds the resource until release confirmed by the RM assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app") - assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0)) - assert.Equal(t, app.placeholderData["tg"].TimedOut, int64(1)) + // tracking data not updated until confirmed by the RM + assertPlaceholderData(t, app, tg1, 2, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3)) + // do what the RM does and respond to the release + removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT) + assert.Assert(t, removed != nil, "expected allocation got nil") + assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned") + assertPlaceholderData(t, app, tg1, 2, 1, 0, res) + assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder resources still accounted for on the app") + assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) + + // process the replacement no real alloc linked account for that + removed = app.ReplaceAllocation(phReleased.allocationKey) + assert.Assert(t, removed != nil, "expected allocation got nil") + assert.Equal(t, phReleased.allocationKey, removed.allocationKey, "expected placeholder to be returned") + assertPlaceholderData(t, app, tg1, 2, 1, 1, res) + assert.Assert(t, resources.IsZero(app.GetPlaceholderResource()), "placeholder resources still accounted for on the app") + assertUserGroupResource(t, getTestUserGroup(), res) } func TestTimeoutPlaceholderCompleting(t *testing.T) { @@ -1686,9 +1689,12 @@ func 
TestTimeoutPlaceholderCompleting(t *testing.T) { res, err := resources.NewResourceFromConf(resMap) assert.NilError(t, err, "Unexpected error when creating resource from map") // add the placeholder to the app - ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg") + tg := "tg-1" + ph := newPlaceholderAlloc(appID1, nodeID1, res, tg) app.AddAllocation(ph) assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") + app.addPlaceholderDataWithLocking(ph) + assertPlaceholderData(t, app, tg, 1, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) // add a real allocation as well alloc := newAllocation(appID1, nodeID1, res) @@ -1714,8 +1720,15 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) { } } assert.Assert(t, found, "release allocation event not found in list") - assert.Assert(t, app.IsCompleting(), "App should still be in completing state") + assert.Assert(t, app.IsCompleting(), "App should be in completing state") assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) + // tracking data not updated until confirmed by the RM + assertPlaceholderData(t, app, tg, 1, 0, 0, res) + // do what the RM does and respond to the release + removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT) + assert.Assert(t, removed != nil, "expected allocation got nil") + assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned") + assertPlaceholderData(t, app, tg, 1, 1, 0, res) } func TestAppTimersAfterAppRemoval(t *testing.T) { @@ -1730,9 +1743,12 @@ func TestAppTimersAfterAppRemoval(t *testing.T) { res, err := resources.NewResourceFromConf(resMap) assert.NilError(t, err, "Unexpected error when creating resource from map") // add the placeholder to the app - ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg") + tg := "tg-1" + ph := newPlaceholderAlloc(appID1, nodeID1, res, tg) app.AddAllocation(ph) 
assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") + app.addPlaceholderDataWithLocking(ph) + assertPlaceholderData(t, app, tg, 1, 0, 0, res) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) // add a real allocation as well alloc := newAllocation(appID1, nodeID1, res) @@ -1749,6 +1765,7 @@ func TestAppTimersAfterAppRemoval(t *testing.T) { if app.stateTimer != nil { t.Fatalf("State timer has not be cleared after app removal as expected, %v", app.stateTimer) } + assertPlaceholderData(t, app, tg, 1, 1, 0, res) } func TestIncAndDecUserResourceUsage(t *testing.T) { @@ -1867,12 +1884,17 @@ func TestFinishedTime(t *testing.T) { } func TestCanReplace(t *testing.T) { + const ( + tg1 = "available" + tg2 = "unavailable" + tg3 = "timedout" + ) + app := newApplication(appID1, "default", "root.unknown") resMap := map[string]string{"memory": "100", "vcores": "10"} res, err := resources.NewResourceFromConf(resMap) assert.NilError(t, err, "Unexpected error when creating resource from map") - tg1 := "available" tests := []struct { name string ask *Allocation @@ -1890,14 +1912,12 @@ func TestCanReplace(t *testing.T) { } // add the placeholder data // available tg has one replacement open - app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1, res)) + app.addPlaceholderData(newAllocationAskTG(aKey, appID1, tg1, res)) // unavailable tg has NO replacement open (replaced) - tg2 := "unavailable" - app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2, res)) + app.addPlaceholderData(newAllocationAskTG(aKey, appID1, tg2, res)) app.placeholderData[tg2].Replaced++ // unavailable tg has NO replacement open (timedout) - tg3 := "timedout" - app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3, res)) + app.addPlaceholderData(newAllocationAskTG(aKey, appID1, tg3, res)) app.placeholderData[tg3].TimedOut++ tests = []struct { name string @@ 
-2737,8 +2757,8 @@ func TestGetOutstandingRequests_AskReplaceable(t *testing.T) { sr.insert(allocationAsk2) sr.insert(allocationAsk3) app.sortedRequests = sr - app.addPlaceholderDataWithLocking(allocationAsk1) - app.addPlaceholderDataWithLocking(allocationAsk2) + app.addPlaceholderData(allocationAsk1) + app.addPlaceholderData(allocationAsk2) var total []*Allocation headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 8a16d34dc..e1c6f03ac 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -461,7 +461,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) { assert.Equal(t, 7, partition.GetTotalAllocationCount(), "placeholder allocation should be counted as normal allocations on the partition") assert.Equal(t, 6, partition.getPhAllocationCount(), "placeholder allocations should be counted as placeholders on the partition") - assertPlaceholderData(t, gangApp, 6, 0) + assertPlaceholderData(t, gangApp, 6, 0, 0) partition.removeApplication(appID1) assert.Equal(t, 6, partition.GetTotalAllocationCount(), "remove app did not remove allocation from count") assert.Equal(t, 6, partition.getPhAllocationCount(), "placeholder allocations changed unexpectedly") @@ -515,7 +515,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) { assert.Assert(t, confirmed == nil, "not expecting any confirmed allocations") assert.Equal(t, 5, partition.GetTotalAllocationCount(), "preempted placeholder should be removed from allocations") assert.Equal(t, 5, partition.getPhAllocationCount(), "preempted placeholder should be removed") - assertPlaceholderData(t, gangApp, 6, 1) + assertPlaceholderData(t, gangApp, 6, 1, 0) } // test node removal effect on placeholder data @@ -597,11 +597,11 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) { // try to allocate a last placeholder via normal allocate 
partition.tryAllocate() - assertPlaceholderData(t, gangApp, 7, 0) + assertPlaceholderData(t, gangApp, 7, 0, 0) // Remove node partition.removeNode(nodeID2) - assertPlaceholderData(t, gangApp, 7, 4) + assertPlaceholderData(t, gangApp, 7, 4, 0) } // Test removal of placeholder has been accounted as timed out in app placeholder data @@ -685,7 +685,7 @@ func TestPlaceholderDataWithRemoval(t *testing.T) { // try to allocate a last placeholder via normal allocate partition.tryAllocate() - assertPlaceholderData(t, gangApp, 7, 0) + assertPlaceholderData(t, gangApp, 7, 0, 0) // release allocation: do what the context would do after the shim processing release := &si.AllocationRelease{ @@ -696,16 +696,17 @@ func TestPlaceholderDataWithRemoval(t *testing.T) { } releases, _ := partition.removeAllocation(release) assert.Equal(t, 1, len(releases), "unexpected number of allocations released") - assertPlaceholderData(t, gangApp, 7, 1) + assertPlaceholderData(t, gangApp, 7, 1, 0) } // check PlaceHolderData -func assertPlaceholderData(t *testing.T, gangApp *objects.Application, count int64, timedout int64) { +func assertPlaceholderData(t *testing.T, gangApp *objects.Application, count, timedout, replaced int64) { assert.Equal(t, len(gangApp.GetAllPlaceholderData()), 1) - assert.Equal(t, gangApp.GetAllPlaceholderData()[0].TaskGroupName, taskGroup) - assert.Equal(t, gangApp.GetAllPlaceholderData()[0].Count, count) - assert.Equal(t, gangApp.GetAllPlaceholderData()[0].Replaced, int64(0)) - assert.Equal(t, gangApp.GetAllPlaceholderData()[0].TimedOut, timedout) + phData := gangApp.GetAllPlaceholderData()[0] + assert.Equal(t, phData.TaskGroupName, taskGroup) + assert.Equal(t, phData.Count, count, "placeholder count does not match") + assert.Equal(t, phData.Replaced, replaced, "replaced count does not match") + assert.Equal(t, phData.TimedOut, timedout, "timedout count does not match") } // test with a replacement of a placeholder: placeholder on the removed node, real on the 2nd node @@ 
-2983,6 +2984,8 @@ func TestPlaceholderSmallerThanReal(t *testing.T) { ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") + assertPlaceholderData(t, app, 1, 0, 0) + // try to allocate a placeholder via normal allocate result := partition.tryAllocate() if result == nil || result.Request == nil { @@ -3016,6 +3019,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) { released, _ := partition.removeAllocation(release) assert.Equal(t, 0, partition.getPhAllocationCount(), "ph should not be registered") + assertPlaceholderData(t, app, 1, 1, 0) assert.Equal(t, 0, len(released), "expected no releases") assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "nothing should be allocated on node") @@ -3042,7 +3046,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) { app := newApplicationTG(appID1, "default", "root.default", tgRes) err = partition.AddApplication(app) assert.NilError(t, err, "app-1 should have been added to the partition") - phs := make(map[string]*objects.Allocation, 5) + phs := make(map[string]*objects.Allocation, phCount) for i := 0; i < phCount; i++ { // add an ask for a placeholder and allocate id := "ph-" + strconv.Itoa(i) @@ -3057,6 +3061,8 @@ func TestPlaceholderSmallerMulti(t *testing.T) { assert.Equal(t, id, result.Request.GetAllocationKey(), "expected allocation of %s to be returned", id) phs[id] = result.Request } + assertPlaceholderData(t, app, int64(phCount), 0, 0) + assert.Assert(t, resources.Equals(tgRes, app.GetQueue().GetAllocatedResource()), "all placeholders should be allocated on queue") assert.Assert(t, resources.Equals(tgRes, node.GetAllocatedResource()), "all placeholders should be allocated on node") assert.Equal(t, phCount, partition.GetTotalAllocationCount(), "placeholder allocation should be counted as normal allocations on the partition") @@ -3088,6 +3094,9 @@ func TestPlaceholderSmallerMulti(t *testing.T) { 
released, _ := partition.removeAllocation(release) assert.Equal(t, 0, len(released), "expected no releases") } + // check the tracking details for the placeholders + assertPlaceholderData(t, app, int64(phCount), int64(phCount), 0) + assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "nothing should be allocated on node") assert.Assert(t, resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be allocated on queue") assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered on the partition") @@ -3123,6 +3132,8 @@ func TestPlaceholderBiggerThanReal(t *testing.T) { if result == nil || result.Request == nil { t.Fatal("expected placeholder ph-1 to be allocated") } + assertPlaceholderData(t, app, 1, 0, 0) + assert.Equal(t, phID, result.Request.GetAllocationKey(), "expected allocation of ph-1 to be returned") assert.Assert(t, resources.Equals(phRes, app.GetQueue().GetAllocatedResource()), "placeholder size should be allocated on queue") assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()), "placeholder size should be allocated on node") @@ -3158,6 +3169,8 @@ func TestPlaceholderBiggerThanReal(t *testing.T) { if confirmed == nil { t.Fatal("one allocation should be confirmed") } + assertPlaceholderData(t, app, 1, 0, 1) + assert.Equal(t, 1, partition.GetTotalAllocationCount(), "real allocation should be registered on the partition") assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") assert.Assert(t, resources.Equals(smallRes, app.GetQueue().GetAllocatedResource()), "real size should be allocated on queue") @@ -3184,6 +3197,7 @@ func TestPlaceholderMatch(t *testing.T) { ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") + assertPlaceholderData(t, app, 1, 0, 0) // try to allocate a placeholder via normal allocate result := 
partition.tryAllocate() if result == nil || result.Request == nil { @@ -3191,7 +3205,6 @@ func TestPlaceholderMatch(t *testing.T) { } phAllocationKey := result.Request.GetAllocationKey() assert.Equal(t, phID, phAllocationKey, "expected allocation of ph-1 to be returned") - assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should be created on allocate") assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be registered as allocation") assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") assertLimits(t, getTestUserGroup(), phRes) @@ -3207,9 +3220,7 @@ func TestPlaceholderMatch(t *testing.T) { assert.Equal(t, 2, partition.GetTotalAllocationCount(), "allocations should be registered: ph + normal") assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") assert.Equal(t, allocKey, result.Request.GetAllocationKey(), "expected allocation of alloc-1 to be returned") - assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should not be updated") - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") - assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements") + assertPlaceholderData(t, app, 1, 0, 0) assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2)) // add a new ask the same task group as the placeholder @@ -3220,9 +3231,7 @@ func TestPlaceholderMatch(t *testing.T) { if result != nil { t.Fatal("expected ask not to be allocated (matched task group)") } - assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should not be updated") - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") - assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no 
replacements") + assertPlaceholderData(t, app, 1, 0, 0) assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2)) // replace the placeholder should work @@ -3233,8 +3242,7 @@ func TestPlaceholderMatch(t *testing.T) { assert.Equal(t, 2, partition.GetTotalAllocationCount(), "allocations should be registered: ph + normal") assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") assert.Equal(t, allocKey2, result.Request.GetAllocationKey(), "expected allocation of alloc-2 to be returned") - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") - assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements yet") + assertPlaceholderData(t, app, 1, 0, 0) // release placeholder: do what the context would do after the shim processing release := &si.AllocationRelease{ @@ -3248,9 +3256,9 @@ func TestPlaceholderMatch(t *testing.T) { if confirmed == nil { t.Fatal("confirmed allocation should not be nil") } + assertPlaceholderData(t, app, 1, 0, 1) assert.Equal(t, 2, partition.GetTotalAllocationCount(), "two allocations should be registered") assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show the replacement") assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2)) // add a new ask the same task group as the placeholder @@ -3265,6 +3273,7 @@ func TestPlaceholderMatch(t *testing.T) { assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 3)) assert.Equal(t, 3, partition.GetTotalAllocationCount(), "three allocations should be registered") assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") + assertPlaceholderData(t, app, 1, 0, 1) } func TestPreemptedPlaceholderSkip(t *testing.T) { @@ -3285,6 +3294,7 
@@ func TestPreemptedPlaceholderSkip(t *testing.T) { ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") + assertPlaceholderData(t, app, 1, 0, 0) // try to allocate a placeholder via normal allocate result := partition.tryAllocate() if result == nil || result.Request == nil { @@ -3293,7 +3303,6 @@ func TestPreemptedPlaceholderSkip(t *testing.T) { ph := result.Request phAllocationKey := result.Request.GetAllocationKey() assert.Equal(t, phID, phAllocationKey, "expected allocation of ph-1 to be returned") - assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should be created on allocate") assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be registered as allocation") assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") @@ -3327,7 +3336,8 @@ func TestPreemptedPlaceholderSkip(t *testing.T) { if confirmed != nil { t.Fatal("confirmed allocation should be nil") } - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].TimedOut, "placeholder data should show the preemption") + // preemption shows as timedout + assertPlaceholderData(t, app, 1, 1, 0) assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered") assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") @@ -3337,10 +3347,8 @@ func TestPreemptedPlaceholderSkip(t *testing.T) { t.Fatal("expected ask to be allocated (no placeholder left)") } assert.Equal(t, allocKey, result.Request.GetAllocationKey(), "expected allocation of alloc-1 to be returned") - assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should not be updated") - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") - assert.Equal(t, int64(0), 
app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements") - assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].TimedOut, "placeholder data should show the preemption") + // no change in the placeholder tracking + assertPlaceholderData(t, app, 1, 1, 0) assert.Equal(t, 1, partition.GetTotalAllocationCount(), "allocation should be registered as allocation") assert.Equal(t, 0, partition.getPhAllocationCount(), "placeholder allocation should be registered") } @@ -3373,6 +3381,8 @@ func TestTryPlaceholderAllocate(t *testing.T) { ask := newAllocationAskTG(phID, appID1, taskGroup, res, true) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") + assertPlaceholderData(t, app, 1, 0, 0) + // try to allocate placeholder should just return result := partition.tryPlaceholderAllocate() if result != nil { @@ -3397,6 +3407,8 @@ func TestTryPlaceholderAllocate(t *testing.T) { ask = newAllocationAskTG("ph-2", appID1, taskGroup, res, true) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add placeholder ask ph-2 to app") + assertPlaceholderData(t, app, 2, 0, 0) + // try to allocate placeholder should just return result = partition.tryPlaceholderAllocate() if result != nil { @@ -3465,6 +3477,8 @@ func TestTryPlaceholderAllocate(t *testing.T) { if !resources.Equals(app.GetAllocatedResource(), res) { t.Fatalf("allocations not updated as expected: got %s, expected %s", app.GetAllocatedResource(), res) } + assertPlaceholderData(t, app, 2, 0, 1) + assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2)) } @@ -3500,6 +3514,7 @@ func TestFailReplacePlaceholder(t *testing.T) { ask := newAllocationAskTG(phID, appID1, taskGroup, res, true) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") + assertPlaceholderData(t, app, 1, 0, 0) // try to allocate a placeholder via normal allocate result := partition.tryAllocate() @@ -3561,6 +3576,7 @@
func TestFailReplacePlaceholder(t *testing.T) { assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "node-1 allocated resources should be zero") assert.Assert(t, resources.Equals(node2.GetAllocatedResource(), res), "node-2 allocations not set as expected: got %s, expected %s", node2.GetAllocatedResource(), res) assert.Assert(t, !app.IsCompleting(), "application with allocation should not be in COMPLETING state") + assertPlaceholderData(t, app, 1, 0, 1) assertLimits(t, getTestUserGroup(), res) }