Skip to content

Commit

Permalink
[YUNIKORN-2926] placeholder counters incorrect
Browse files Browse the repository at this point in the history
Placeholder tracking data is maintained inside the application for
scheduling. When a placeholder is released, we update the counters in the
tracking data. There are cases in which the counters are not updated correctly:
* placeholders are smaller than the real allocation
* placeholder does not have an allocation to replace
* all allocations are removed from an application

The tests have been updated to check all the counters inside the placeholder
data for consistency.
  • Loading branch information
wilfred-s committed Oct 18, 2024
1 parent 44705ae commit 566d53c
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 116 deletions.
49 changes: 32 additions & 17 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,45 +1805,49 @@ func (sa *Application) updatePreemptedResource(info *Allocation) {
info.GetAllocatedResource(), info.GetBindTime())
}

// ReplaceAllocation removes the placeholder from the allocation list and replaces it with the real allocation.
// If no replacing allocation is linked to the placeholder it will still be removed from the application.
// Queue and Node objects are updated by the caller.
func (sa *Application) ReplaceAllocation(allocationKey string) *Allocation {
sa.Lock()
defer sa.Unlock()
// remove the placeholder that was just confirmed by the shim
ph := sa.removeAllocationInternal(allocationKey, si.TerminationType_PLACEHOLDER_REPLACED)
// this has already been replaced or it is a duplicate message from the shim
if ph == nil || !ph.HasRelease() {
log.Log(log.SchedApplication).Debug("Unexpected placeholder released",
// this has already been replaced, or it is a duplicate message from the shim just ignore
if ph == nil {
return nil
}
// ph is the placeholder, the releases entry points to the real allocation we need to swap in
alloc := ph.GetRelease()
if alloc == nil {
log.Log(log.SchedApplication).Warn("Placeholder replaced without replacement allocation",
zap.String("applicationID", sa.ApplicationID),
zap.Stringer("placeholder", ph))
return nil
return ph
}
// update the replacing allocation
// we double linked the real and placeholder allocation
// ph is the placeholder, the releases entry points to the real one
alloc := ph.GetRelease()
alloc.SetPlaceholderUsed(true)
alloc.SetPlaceholderCreateTime(ph.GetCreateTime())
alloc.SetBindTime(time.Now())
sa.addAllocationInternal(Replaced, alloc)
// order is important: clean up the allocation after adding it to the app
// we need the original Replaced allocation resultType.
alloc.ClearRelease()
if sa.placeholderData != nil {
sa.placeholderData[ph.GetTaskGroup()].Replaced++
}
return ph
}

// Remove the Allocation from the application.
// RemoveAllocation deletes the allocation identified by allocationKey from the application.
// The application lock is taken here and held until the removal has finished.
// It returns the allocation that was removed, or nil when it was not found.
func (sa *Application) RemoveAllocation(allocationKey string, releaseType si.TerminationType) *Allocation {
	sa.Lock()
	defer sa.Unlock()
	removed := sa.removeAllocationInternal(allocationKey, releaseType)
	return removed
}

// Remove the Allocation from the application
// No locking must be called while holding the lock
// removeAllocationInternal removes the Allocation from the application.
// Returns the allocation that was removed or nil if not found.
// Does not lock: the caller must already hold the application lock.
func (sa *Application) removeAllocationInternal(allocationKey string, releaseType si.TerminationType) *Allocation {
alloc := sa.allocations[allocationKey]

Expand All @@ -1858,9 +1862,14 @@ func (sa *Application) removeAllocationInternal(allocationKey string, releaseTyp
// update correct allocation tracker
if alloc.IsPlaceholder() {
// make sure we account for the placeholders being removed in the tracking data
if releaseType == si.TerminationType_STOPPED_BY_RM || releaseType == si.TerminationType_PREEMPTED_BY_SCHEDULER || releaseType == si.TerminationType_UNKNOWN_TERMINATION_TYPE {
if _, ok := sa.placeholderData[alloc.taskGroupName]; ok {
sa.placeholderData[alloc.taskGroupName].TimedOut++
// update based on termination type: everything is counted as a timeout except for a real replace
if sa.placeholderData != nil {
if phData, ok := sa.placeholderData[alloc.taskGroupName]; ok {
if releaseType == si.TerminationType_PLACEHOLDER_REPLACED {
phData.Replaced++
} else {
phData.TimedOut++
}
}
}
// as and when every ph gets removed (for replacement), resource usage would be reduced.
Expand Down Expand Up @@ -1933,14 +1942,20 @@ func (sa *Application) hasZeroAllocations() bool {
return resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource)
}

// Remove all allocations from the application.
// RemoveAllAllocations removes all allocations from the application.
// All allocations that have been removed are returned.
func (sa *Application) RemoveAllAllocations() []*Allocation {
sa.Lock()
defer sa.Unlock()

allocationsToRelease := make([]*Allocation, 0)
for _, alloc := range sa.allocations {
// update placeholder tracking data
if alloc.IsPlaceholder() && sa.placeholderData != nil {
if phData, ok := sa.placeholderData[alloc.taskGroupName]; ok {
phData.TimedOut++
}

Check warning on line 1957 in pkg/scheduler/objects/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/application.go#L1955-L1957

Added lines #L1955 - L1957 were not covered by tests
}
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the application's user resource tracker
sa.trackCompletedResource(alloc)
Expand All @@ -1959,7 +1974,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
// When the resource trackers are zero we should not expect anything to come in later.
if resources.IsZero(sa.pending) {
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
log.Log(log.SchedApplication).Warn("Application state not changed to Waiting while removing all allocations",
log.Log(log.SchedApplication).Warn("Application state not changed to Completing while removing all allocations",
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
Expand Down
120 changes: 49 additions & 71 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,32 +535,21 @@ func TestAddAllocAsk(t *testing.T) {
assert.Equal(t, len(clonePlaceholderData), 1)
assert.Equal(t, len(app.placeholderData), 1)
assert.Equal(t, clonePlaceholderData[0], app.placeholderData[tg1])
assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1)
assert.Equal(t, app.placeholderData[tg1].Count, int64(1))
assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0))
assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1))
assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res)
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)

ask = newAllocationAskTG(aKey, appID1, tg1, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")
assert.Equal(t, len(app.placeholderData), 1)
assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1)
assert.Equal(t, app.placeholderData[tg1].Count, int64(2))
assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0))
assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1))
assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res)
assertPlaceholderData(t, app, tg1, 2, 1, 0, res)

tg2 := "tg-2"
ask = newAllocationAskTG(aKey, appID1, tg2, res)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been updated on app")

assert.Equal(t, len(app.placeholderData), 2)
assert.Equal(t, app.placeholderData[tg2].TaskGroupName, tg2)
assert.Equal(t, app.placeholderData[tg2].Count, int64(1))
assert.Equal(t, app.placeholderData[tg2].Replaced, int64(0))
assert.Equal(t, app.placeholderData[tg2].TimedOut, int64(0))
assert.DeepEqual(t, app.placeholderData[tg2].MinResource, res)
assertPlaceholderData(t, app, tg2, 1, 0, 0, res)
}

// test state change on add and remove ask
Expand Down Expand Up @@ -632,15 +621,7 @@ func TestRecoverAllocAsk(t *testing.T) {
assert.Equal(t, 0, len(app.placeholderData))
ask = newAllocationAskTG("ask-3", appID1, "testGroup", res)
app.RecoverAllocationAsk(ask)
phData := app.placeholderData
assert.Equal(t, 1, len(phData))
taskGroupData := phData["testGroup"]
assert.Assert(t, taskGroupData != nil)
assert.Equal(t, "testGroup", taskGroupData.TaskGroupName)
assert.Equal(t, int64(1), taskGroupData.Count)
assert.Equal(t, int64(0), taskGroupData.Replaced)
assert.Equal(t, int64(0), taskGroupData.TimedOut)
assert.Assert(t, resources.Equals(taskGroupData.MinResource, res))
assertPlaceholderData(t, app, "testGroup", 1, 0, 0, res)
}

// test reservations removal by allocation
Expand Down Expand Up @@ -1232,13 +1213,24 @@ func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySec
}
}

func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64,
vcoresSecconds int64) {
// assertPlaceHolderResource verifies the placeholder resource usage tracked in the
// application summary for the instance type instType1. The "memory" and "vcores"
// entries of the tracked resources must equal memorySeconds and vcoresSecconds.
func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, vcoresSecconds int64) {
	// mark as a helper so a failure is reported at the line of the calling test
	t.Helper()
	detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1]
	assert.Equal(t, memorySeconds, int64(detailedResource.Resources["memory"]))
	assert.Equal(t, vcoresSecconds, int64(detailedResource.Resources["vcores"]))
}

// assertPlaceholderData verifies the placeholder tracking data the app holds for taskGroup.
// The Count, TimedOut and Replaced counters must match count, timedout and replaced.
// Note the argument order: count, timedout, replaced.
// The MinResource of the task group is only compared when res is not nil.
func assertPlaceholderData(t *testing.T, app *Application, taskGroup string, count, timedout, replaced int, res *resources.Resource) {
	// mark as a helper so a failure is reported at the line of the calling test
	t.Helper()
	assert.Assert(t, len(app.placeholderData) >= 1, "expected placeholder data to be set")
	phData, ok := app.placeholderData[taskGroup]
	// stop here if the task group is unknown: the counter checks below would dereference phData
	assert.Assert(t, ok, "placeholder data not found for the taskgroup: %s", taskGroup)
	assert.Equal(t, phData.Count, int64(count), "placeholder count does not match")
	assert.Equal(t, phData.Replaced, int64(replaced), "replaced count does not match")
	assert.Equal(t, phData.TimedOut, int64(timedout), "timedout count does not match")
	if res != nil {
		assert.Assert(t, resources.Equals(phData.MinResource, res), "resource for taskgroup is not correct")
	}
}

func TestResourceUsageAggregation(t *testing.T) {
setupUGM()

Expand Down Expand Up @@ -1421,29 +1413,29 @@ func TestReplaceAllocation(t *testing.T) {
// add the placeholder to the app
app.AddAllocation(ph)
// add PlaceholderData
app.addPlaceholderDataWithLocking(ph)
assert.Equal(t, len(app.placeholderData), 1)
assert.Equal(t, app.placeholderData["tg"].TaskGroupName, "tg")
assert.Equal(t, app.placeholderData["tg"].Count, int64(1))
assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0))
assert.Equal(t, app.placeholderData["tg"].TimedOut, int64(0))
assert.DeepEqual(t, app.placeholderData["tg"].MinResource, res)
app.addPlaceholderData(ph)
assertPlaceholderData(t, app, "tg", 1, 0, 0, res)

assert.Equal(t, len(app.allocations), 1, "allocation not added as expected")
assert.Assert(t, resources.IsZero(app.allocatedResource), "placeholder counted as real allocation")
if !resources.Equals(app.allocatedPlaceholder, res) {
t.Fatalf("placeholder allocation not updated as expected: got %s, expected %s", app.allocatedPlaceholder, res)
}
assertUserGroupResource(t, getTestUserGroup(), res)
// if the placeholder exists it should be removed even without real allocation linked
alloc = app.ReplaceAllocation(ph.GetAllocationKey())
assert.Equal(t, alloc, nilAlloc, "placeholder without releases expected nil to be returned got a real alloc: %s", alloc)
assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0))
assert.Equal(t, alloc, ph, "returned allocation is not the placeholder")
// placeholder data must show replaced increase
assertPlaceholderData(t, app, "tg", 1, 0, 1, res)
assertUserGroupResource(t, getTestUserGroup(), nil)
assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "placeholder should have been released")
assert.Assert(t, resources.IsZero(app.allocatedResource), "no replacement made allocated should be zero")

// add the placeholder back to the app, the failure test above changed state and removed the ph
app.SetState(Running.String())
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
assert.Equal(t, app.placeholderData["tg"].Count, int64(2))
app.addPlaceholderData(ph)
assertPlaceholderData(t, app, "tg", 2, 0, 1, res)
assertUserGroupResource(t, getTestUserGroup(), res)

// set the real one to replace the placeholder
Expand All @@ -1452,19 +1444,17 @@ func TestReplaceAllocation(t *testing.T) {
alloc = app.ReplaceAllocation(ph.GetAllocationKey())
assert.Equal(t, alloc, ph, "returned allocation is not the placeholder")
assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "real allocation counted as placeholder")
if !resources.Equals(app.allocatedResource, res) {
t.Fatalf("real allocation not updated as expected: got %s, expected %s", app.allocatedResource, res)
}
assert.Equal(t, app.placeholderData["tg"].Replaced, int64(1))
assert.Assert(t, resources.Equals(app.allocatedResource, res), "real allocation not updated as expected")
assertPlaceholderData(t, app, "tg", 2, 0, 2, res)
assert.Equal(t, realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime(), "real allocation's placeholder create time not updated as expected: got %s, expected %s", realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime())
assertUserGroupResource(t, getTestUserGroup(), res)

// add the placeholder back to the app, the failure test above changed state and removed the ph
app.SetState(Running.String())
ph.ClearRelease()
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
assert.Equal(t, app.placeholderData["tg"].Count, int64(3))
app.addPlaceholderData(ph)
assertPlaceholderData(t, app, "tg", 3, 0, 2, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
}

Expand All @@ -1485,14 +1475,14 @@ func TestReplaceAllocationTracking(t *testing.T) {
ph3.SetInstanceType(instType1)
app.AddAllocation(ph1)
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph1)
app.addPlaceholderData(ph1)
assert.Equal(t, true, app.HasPlaceholderAllocation())
app.AddAllocation(ph2)
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph2)
app.addPlaceholderData(ph2)
app.AddAllocation(ph3)
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph3)
app.addPlaceholderData(ph3)

ph1.SetBindTime(time.Now().Add(-10 * time.Second))
ph2.SetBindTime(time.Now().Add(-10 * time.Second))
Expand Down Expand Up @@ -1558,12 +1548,7 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
assert.Assert(t, app.IsAccepted(), "Application should be in accepted state")

// check PlaceHolderData
assert.Equal(t, len(app.placeholderData), 1)
assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1)
assert.Equal(t, app.placeholderData[tg1].Count, int64(1))
assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0))
assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(0))
assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res)
assertPlaceholderData(t, app, tg1, 1, 0, 0, res)

// add the placeholder to the app
ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg")
Expand All @@ -1579,7 +1564,8 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
defer app.RUnlock()
return app.placeholderTimer == nil
})
assert.Equal(t, app.placeholderData[tg1].TimedOut, app.placeholderData[tg1].Count, "When the app is in an accepted state, timeout should equal to count")
// When the app is in an accepted state, timeout should equal to count
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly")
assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
Expand All @@ -1593,10 +1579,8 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
idx++
}
}
// check if the Replaced of PlaceHolderData is 0
assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0))
// Because the Count of PlaceHolderData is only added in AddAllocationAsk, so it is 1
assert.Equal(t, app.placeholderData[tg1].Count, int64(1))
// Because the Count of PlaceHolderData is only added in AddAllocationAsk, so it stays 1
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)

assert.Equal(t, found, 2, "release allocation or ask event not found in list")
// asks are completely cleaned up
Expand Down Expand Up @@ -1631,19 +1615,14 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
app.AddAllocation(ph)
// add PlaceholderData
app.addPlaceholderDataWithLocking(ph)
assert.Equal(t, len(app.placeholderData), 1)
assert.Equal(t, app.placeholderData["tg"].TaskGroupName, "tg")
assert.Equal(t, app.placeholderData["tg"].Count, int64(1))
assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0))
assert.Equal(t, app.placeholderData["tg"].TimedOut, int64(0))
assert.DeepEqual(t, app.placeholderData["tg"].MinResource, res)
assertPlaceholderData(t, app, "tg", 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))

assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
ph = newPlaceholderAlloc(appID1, nodeID1, res, "tg")
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
assert.Equal(t, app.placeholderData["tg"].Count, int64(2))
assertPlaceholderData(t, app, "tg", 2, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

alloc := newAllocation(appID1, nodeID1, res)
Expand All @@ -1669,8 +1648,7 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "Unexpected allocated resources for the app")
// a released placeholder still holds the resource until release confirmed by the RM
assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
assert.Equal(t, app.placeholderData["tg"].Replaced, int64(0))
assert.Equal(t, app.placeholderData["tg"].TimedOut, int64(1))
assertPlaceholderData(t, app, "tg", 2, 1, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3))
}

Expand Down Expand Up @@ -1890,14 +1868,14 @@ func TestCanReplace(t *testing.T) {
}
// add the placeholder data
// available tg has one replacement open
app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1, res))
app.addPlaceholderData(newAllocationAskTG(aKey, appID1, tg1, res))
// unavailable tg has NO replacement open (replaced)
tg2 := "unavailable"
app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2, res))
app.addPlaceholderData(newAllocationAskTG(aKey, appID1, tg2, res))
app.placeholderData[tg2].Replaced++
// unavailable tg has NO replacement open (timedout)
tg3 := "timedout"
app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3, res))
app.addPlaceholderData(newAllocationAskTG(aKey, appID1, tg3, res))
app.placeholderData[tg3].TimedOut++
tests = []struct {
name string
Expand Down Expand Up @@ -2737,8 +2715,8 @@ func TestGetOutstandingRequests_AskReplaceable(t *testing.T) {
sr.insert(allocationAsk2)
sr.insert(allocationAsk3)
app.sortedRequests = sr
app.addPlaceholderDataWithLocking(allocationAsk1)
app.addPlaceholderDataWithLocking(allocationAsk2)
app.addPlaceholderData(allocationAsk1)
app.addPlaceholderData(allocationAsk2)

var total []*Allocation
headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
Expand Down
Loading

0 comments on commit 566d53c

Please sign in to comment.