Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2924] [core] Remove occupiedResource handling logic #984

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21

require (
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 h1:CZ4U7y19YSxNJVBNox3DahhuoxDL++naBl/kj+kqVFc=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
3 changes: 0 additions & 3 deletions pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,6 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
if sr := nodeInfo.SchedulableResource; sr != nil {
partition.updatePartitionResource(node.SetCapacity(resources.NewResourceFromProto(sr)))
}
if or := nodeInfo.OccupiedResource; or != nil {
node.SetOccupiedResource(resources.NewResourceFromProto(or))
}
case si.NodeInfo_DRAIN_NODE:
if node.IsSchedulable() {
// set the state to not schedulable
Expand Down
7 changes: 0 additions & 7 deletions pkg/scheduler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func TestContext_UpdateNode(t *testing.T) {
SchedulableResource: &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 10}}},
}
full := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
half := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
partition := context.GetPartition(pName)
if partition == nil {
t.Fatalf("partition should have been found")
Expand All @@ -121,15 +120,9 @@ func TestContext_UpdateNode(t *testing.T) {
assert.Assert(t, resources.Equals(full, partition.GetTotalPartitionResource()), "partition resource should be updated")
// try to update: fail due to unknown action
n.SchedulableResource = &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 5}}}
n.OccupiedResource = &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 5}}}
context.updateNode(n)
node := partition.GetNode("test-1")
assert.Assert(t, resources.Equals(full, node.GetAvailableResource()), "node available resource should not be updated")
n.Action = si.NodeInfo_UPDATE
context.updateNode(n)
assert.Assert(t, resources.Equals(half, partition.GetTotalPartitionResource()), "partition resource should be updated")
assert.Assert(t, resources.IsZero(node.GetAvailableResource()), "node available should have been updated to zero")
assert.Assert(t, resources.Equals(half, node.GetOccupiedResource()), "node occupied should have been updated")

// other actions
n = &si.NodeInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewNode(proto *si.NodeInfo) *Node {
reservations: make(map[string]*reservation),
totalResource: resources.NewResourceFromProto(proto.SchedulableResource),
allocatedResource: resources.NewResource(),
occupiedResource: resources.NewResourceFromProto(proto.OccupiedResource),
occupiedResource: resources.NewResource(),
allocations: make(map[string]*Allocation),
schedulable: true,
listeners: make([]NodeListener, 0),
Expand Down
25 changes: 9 additions & 16 deletions pkg/scheduler/objects/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func TestNewNode(t *testing.T) {
if node != nil {
t.Error("node not returned correctly: node is nul or incorrect name")
}
proto := newProto(testNode, nil, nil, nil)
proto := newProto(testNode, nil, nil)
node = NewNode(proto)
if node == nil || node.NodeID != testNode {
t.Error("node not returned correctly: node is nul or incorrect name")
}

totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "second": 100})
proto = newProto(testNode, totalRes, nil, map[string]string{})
proto = newProto(testNode, totalRes, map[string]string{})
node = NewNode(proto)
if node == nil || node.NodeID != testNode {
t.Fatal("node not returned correctly: node is nul or incorrect name")
Expand All @@ -74,25 +74,20 @@ func TestNewNode(t *testing.T) {
assert.Equal(t, "rack1", node.Rackname)
assert.Equal(t, "partition1", node.Partition)

// test capacity/available/occupied resources
// test capacity/available resources
totalResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "second": 100})
occupiedResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, "second": 20})
availableResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 70, "second": 80})
proto = newProto(testNode, totalResources, occupiedResources, map[string]string{})
proto = newProto(testNode, totalResources, map[string]string{})
node = NewNode(proto)
assert.Equal(t, node.NodeID, testNode, "node not returned correctly: node is nul or incorrect name")
if !resources.Equals(node.GetCapacity(), totalResources) {
t.Errorf("node total resources not set correctly: %v expected got %v",
totalResources, node.GetCapacity())
}
if !resources.Equals(node.GetAvailableResource(), availableResources) {
if !resources.Equals(node.GetAvailableResource(), totalResources) {
t.Errorf("node available resources not set correctly: %v expected got %v",
availableResources, node.GetAvailableResource())
}
if !resources.Equals(node.GetOccupiedResource(), occupiedResources) {
t.Errorf("node occupied resources not set correctly: %v expected got %v",
occupiedResources, node.GetOccupiedResource())
}
}

func TestCheckConditions(t *testing.T) {
Expand Down Expand Up @@ -328,7 +323,7 @@ func TestAttributes(t *testing.T) {
testname := fmt.Sprintf("Attributes in the node %d", index)
t.Run(testname, func(t *testing.T) {
nodename := fmt.Sprintf("%s-%d", testNode, index)
node := NewNode(newProto(nodename, nil, nil, tt.inputs))
node := NewNode(newProto(nodename, nil, tt.inputs))
if node == nil || node.NodeID != nodename {
t.Error("node not returned correctly: node is nul or incorrect name")
}
Expand Down Expand Up @@ -363,7 +358,7 @@ func TestAttributes(t *testing.T) {
}

func TestGetInstanceType(t *testing.T) {
proto := newProto(testNode, nil, nil, map[string]string{
proto := newProto(testNode, nil, map[string]string{
common.NodePartition: "partition1",
"label1": "key1",
"label2": "key2",
Expand Down Expand Up @@ -789,8 +784,7 @@ func TestAddRemoveListener(t *testing.T) {
func TestNodeEvents(t *testing.T) {
mockEvents := evtMock.NewEventSystem()
total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100})
occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10})
proto := newProto(testNode, total, occupied, map[string]string{
proto := newProto(testNode, total, map[string]string{
"ready": "true",
})
node := NewNode(proto)
Expand Down Expand Up @@ -905,8 +899,7 @@ func TestPreconditions(t *testing.T) {

plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true, map[string]int{}))
total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100})
occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10})
proto := newProto(testNode, total, occupied, map[string]string{
proto := newProto(testNode, total, map[string]string{
"ready": "true",
})
res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/objects/nodesorting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func TestSortPolicyWeighting(t *testing.T) {
nc.SetNodeSortingPolicy(fair)
totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000, "memory": 16000})

proto1 := newProto("test1", totalRes, nil, map[string]string{})
proto1 := newProto("test1", totalRes, map[string]string{})
node1 := NewNode(proto1)
if err := nc.AddNode(node1); err != nil {
t.Fatal("Failed to add node1")
}

proto2 := newProto("test2", totalRes, nil, map[string]string{})
proto2 := newProto("test2", totalRes, map[string]string{})
node2 := NewNode(proto2)
if err := nc.AddNode(node2); err != nil {
t.Fatal("Failed to add node2")
Expand Down Expand Up @@ -232,13 +232,13 @@ func TestSortPolicy(t *testing.T) {
nc.SetNodeSortingPolicy(bp)
totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000, "memory": 4000})

proto1 := newProto("test1", totalRes, nil, map[string]string{})
proto1 := newProto("test1", totalRes, map[string]string{})
node1 := NewNode(proto1)
if err := nc.AddNode(node1); err != nil {
t.Fatal("Failed to add node1")
}

proto2 := newProto("test2", totalRes, nil, map[string]string{})
proto2 := newProto("test2", totalRes, map[string]string{})
node2 := NewNode(proto2)
if err := nc.AddNode(node2); err != nil {
t.Fatal("Failed to add node2")
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestAbsResourceUsage(t *testing.T) {
nc.SetNodeSortingPolicy(fair)
totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0, "memory": 16000})

proto1 := newProto("test1", totalRes, nil, map[string]string{})
proto1 := newProto("test1", totalRes, map[string]string{})
node1 := NewNode(proto1)
if err := nc.AddNode(node1); err != nil {
t.Fatal("Failed to add node1")
Expand Down
11 changes: 1 addition & 10 deletions pkg/scheduler/objects/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func newNodeInternal(nodeID string, total, occupied *resources.Resource) *Node {
return sn
}

func newProto(nodeID string, totalResource, occupiedResource *resources.Resource, attributes map[string]string) *si.NodeInfo {
func newProto(nodeID string, totalResource *resources.Resource, attributes map[string]string) *si.NodeInfo {
proto := si.NodeInfo{
NodeID: nodeID,
Attributes: attributes,
Expand All @@ -204,15 +204,6 @@ func newProto(nodeID string, totalResource, occupiedResource *resources.Resource
}
}

if occupiedResource != nil {
proto.OccupiedResource = &si.Resource{
Resources: map[string]*si.Quantity{},
}
for name, value := range occupiedResource.Resources {
quantity := si.Quantity{Value: int64(value)}
proto.OccupiedResource.Resources[name] = &quantity
}
}
return &proto
}

Expand Down
91 changes: 0 additions & 91 deletions pkg/scheduler/tests/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,97 +564,6 @@ partitions:
}
}

// TestUpdateNodeOccupiedResources registers a single node with a capacity of
// memory=100/vcore=10, then sends an UPDATE request that carries only an
// OccupiedResource of memory=80/vcore=5. It verifies that the node capacity is
// unchanged, that the occupied resource is recorded on the node, that nothing
// is allocated, and that the available memory drops to 20.
func TestUpdateNodeOccupiedResources(t *testing.T) {
// Scheduler configuration passed to ms.Init below: one "default" partition
// with a root queue and a child queue "a" that has a max resource set.
configData := `
partitions:
-
name: default
queues:
- name: root
submitacl: "*"
queues:
- name: a
resources:
max:
memory: 150
vcore: 20
`
// Create the mock scheduler and make sure it is stopped when the test ends.
ms := &mockScheduler{}
defer ms.Stop()

err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")

// Before any node is registered the partition must not report a total resource.
partitionInfo := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
assert.Assert(t, partitionInfo.GetTotalPartitionResource() == nil, "partition info max resource nil")

// Register a node with a schedulable resource of memory=100, vcore=10.
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100},
"vcore": {Value: 10},
},
},
Action: si.NodeInfo_CREATE,
},
},
RmID: "rm:123",
})

assert.NilError(t, err, "NodeRequest failed")

// Wait until node is registered
// NOTE(review): the trailing 1000 is presumably a timeout in milliseconds - confirm against the helpers.
context := ms.scheduler.GetClusterContext()
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000)

// verify node capacity: nothing allocated yet, so available memory equals the capacity
assert.Equal(t, len(partitionInfo.GetNodes()), 1)
node1 := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
schedulingNode1 := ms.scheduler.GetClusterContext().
GetNode("node-1:1234", "[rm:123]default")
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))

// update the node's occupied resources; the request carries no SchedulableResource,
// so the capacity is expected to stay as registered
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
{
NodeID: "node-1:1234",
Attributes: map[string]string{},
OccupiedResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 80},
"vcore": {Value: 5},
},
},
Action: si.NodeInfo_UPDATE,
},
},
RmID: "rm:123",
})

assert.NilError(t, err, "NodeRequest failed")

// NOTE(review): presumably waits until the node's available memory reaches 20
// (capacity 100 minus occupied 80) - confirm against waitForAvailableNodeResource.
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 20, 1000)
// capacity is unchanged, occupied reflects the update, nothing is allocated,
// and the available memory has dropped to 20
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20))
}

func TestForeignPodResourceUsage(t *testing.T) {
// Register RM
configData := `
Expand Down
7 changes: 1 addition & 6 deletions pkg/scheduler/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,20 +647,15 @@ func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources.
})
}

func newNodeWithResources(nodeID string, max, occupied *resources.Resource) *objects.Node {
func newNodeMaxResource(nodeID string, max *resources.Resource) *objects.Node {
proto := &si.NodeInfo{
NodeID: nodeID,
Attributes: map[string]string{},
SchedulableResource: max.ToProto(),
OccupiedResource: occupied.ToProto(),
}
return objects.NewNode(proto)
}

func newNodeMaxResource(nodeID string, max *resources.Resource) *objects.Node {
return newNodeWithResources(nodeID, max, nil)
}

// partition with an expected basic queue hierarchy
// root -> parent -> leaf1
//
Expand Down
Loading