Skip to content

Commit

Permalink
[YUNIKORN-2978] Fix handling of reserved allocations where node diffe…
Browse files Browse the repository at this point in the history
…rs (#996)

YUNIKORN-2700 introduced a bug where allocations of previously-reserved
tasks were not handled correctly in the case where we schedule on a
different node than the reservation. Ensure that we unreserve and
allocate using the proper node in both cases.

Also introduce additional logging of allocations on nodes to make
finding issues like this easier in the future.

Closes: #996
  • Loading branch information
craigcondit committed Nov 19, 2024
1 parent ac32595 commit 135bc78
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 20 deletions.
22 changes: 22 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
}
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.GetAllocatedResource())
log.Log(log.SchedNode).Info("node allocation removed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
zap.Bool("placeholder", alloc.IsPlaceholder()),
zap.String("targetNode", sn.NodeID))
return alloc
}

Expand Down Expand Up @@ -382,6 +388,10 @@ func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
delta := resources.Sub(newResource, existingResource)
delta.Prune()

log.Log(log.SchedNode).Info("node foreign allocation updated",
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.Stringer("deltaResource", delta),
zap.String("targetNode", sn.NodeID))
sn.occupiedResource.AddTo(delta)
sn.occupiedResource.Prune()
sn.refreshAvailableResource()
Expand Down Expand Up @@ -416,6 +426,12 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
log.Log(log.SchedNode).Info("node allocation processed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
zap.Bool("placeholder", alloc.IsPlaceholder()),
zap.String("targetNode", sn.NodeID))
result = true
return result
}
Expand All @@ -440,6 +456,12 @@ func (sn *Node) ReplaceAllocation(allocationKey string, replace *Allocation, del
sn.allocatedResource.AddTo(delta)
sn.availableResource.SubFrom(delta)
sn.availableResource.Prune()
log.Log(log.SchedNode).Info("node allocation replaced",
zap.String("appID", replace.GetApplicationID()),
zap.String("allocationKey", replace.GetAllocationKey()),
zap.Stringer("allocatedResource", replace.GetAllocatedResource()),
zap.String("placeholderKey", allocationKey),
zap.String("targetNode", sn.NodeID))
if !before.FitIn(sn.allocatedResource) {
log.Log(log.SchedNode).Warn("unexpected increase in node usage after placeholder replacement",
zap.String("placeholder allocationKey", allocationKey),
Expand Down
51 changes: 31 additions & 20 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,44 +869,55 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
// find the node make sure it still exists
// if the node was passed in use that ID instead of the one from the allocation
// the node ID is set when a reservation is allocated on a non-reserved node
var nodeID string
alloc := result.Request
if result.ReservedNodeID == "" {
nodeID = result.NodeID
} else {
nodeID = result.ReservedNodeID
log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
zap.String("current node", result.NodeID),
zap.String("reserved node", nodeID),
zap.String("appID", appID))
}
node := pc.GetNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Info("Node was removed while allocating",
zap.String("nodeID", nodeID),
targetNodeID := result.NodeID
targetNode := pc.GetNode(targetNodeID)
if targetNode == nil {
log.Log(log.SchedPartition).Info("Target node was removed while allocating",
zap.String("nodeID", targetNodeID),
zap.String("appID", appID))

// attempt to deallocate
if alloc.IsAllocated() {
allocKey := alloc.GetAllocationKey()
if _, err := app.DeallocateAsk(allocKey); err != nil {
log.Log(log.SchedPartition).Warn("Failed to unwind allocation",
zap.String("nodeID", nodeID),
zap.String("nodeID", targetNodeID),
zap.String("appID", appID),
zap.String("allocationKey", allocKey),
zap.Error(err))
}
}
return nil
}

// reservation
if result.ResultType == objects.Reserved {
pc.reserve(app, node, result.Request)
pc.reserve(app, targetNode, result.Request)
return nil
}

// unreserve
if result.ResultType == objects.Unreserved || result.ResultType == objects.AllocatedReserved {
pc.unReserve(app, node, result.Request)
var reservedNodeID string
if result.ReservedNodeID == "" {
reservedNodeID = result.NodeID
} else {
reservedNodeID = result.ReservedNodeID
log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
zap.String("current node", result.NodeID),
zap.String("reserved node", reservedNodeID),
zap.String("appID", appID))
}

reservedNode := pc.GetNode(reservedNodeID)
if reservedNode != nil {
pc.unReserve(app, reservedNode, result.Request)
} else {
log.Log(log.SchedPartition).Info("Reserved node was removed while allocating",
zap.String("nodeID", reservedNodeID),
zap.String("appID", appID))
}
if result.ResultType == objects.Unreserved {
return nil
}
Expand All @@ -915,8 +926,8 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
}

alloc.SetBindTime(time.Now())
alloc.SetNodeID(nodeID)
alloc.SetInstanceType(node.GetInstanceType())
alloc.SetNodeID(targetNodeID)
alloc.SetInstanceType(targetNode.GetInstanceType())

// track the number of allocations
pc.updateAllocationCount(1)
Expand All @@ -929,7 +940,7 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
zap.String("allocationKey", result.Request.GetAllocationKey()),
zap.Stringer("allocatedResource", result.Request.GetAllocatedResource()),
zap.Bool("placeholder", result.Request.IsPlaceholder()),
zap.String("targetNode", alloc.GetNodeID()))
zap.String("targetNode", targetNodeID))
// pass the allocation result back to the RM via the cluster context
return result
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2369,6 +2369,9 @@ func TestAllocReserveNewNode(t *testing.T) {
assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should have no more reservations")
assert.Equal(t, 0, len(app.GetReservations()), "ask should have been reserved")
assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
alloc2 := node2.GetAllocation("alloc-2")
assert.Assert(t, alloc2 != nil, "alloc was nil")
assert.Equal(t, nodeID2, alloc2.GetNodeID(), "wrong node id")
}

func TestTryAllocateReserve(t *testing.T) {
Expand Down

0 comments on commit 135bc78

Please sign in to comment.