diff --git a/go.mod b/go.mod index 26e243e83..d432592ec 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core go 1.21 require ( - github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 + github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 github.com/google/btree v1.1.2 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 026aa4f79..1a191ecfa 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 h1:CZ4U7y19YSxNJVBNox3DahhuoxDL++naBl/kj+kqVFc= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index c2bec588d..ca2aebe4b 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -661,9 +661,6 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) { if sr := nodeInfo.SchedulableResource; sr != nil { partition.updatePartitionResource(node.SetCapacity(resources.NewResourceFromProto(sr))) } - if or := nodeInfo.OccupiedResource; or != nil { - node.SetOccupiedResource(resources.NewResourceFromProto(or)) - } case si.NodeInfo_DRAIN_NODE: if node.IsSchedulable() { // set the state to not schedulable diff --git a/pkg/scheduler/context_test.go b/pkg/scheduler/context_test.go index ea9d34145..854623e48 100644 --- a/pkg/scheduler/context_test.go +++ b/pkg/scheduler/context_test.go @@ -100,7 +100,6 @@ func TestContext_UpdateNode(t *testing.T) { SchedulableResource: &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 10}}}, } full := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) - half := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) partition := context.GetPartition(pName) if partition == nil { t.Fatalf("partition should have been found") @@ -121,15 +120,9 @@ func TestContext_UpdateNode(t *testing.T) { assert.Assert(t, resources.Equals(full, partition.GetTotalPartitionResource()), "partition resource should be updated") // try to update: fail due to unknown action n.SchedulableResource = &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 5}}} - n.OccupiedResource = &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 5}}} context.updateNode(n) node := partition.GetNode("test-1") assert.Assert(t, resources.Equals(full, node.GetAvailableResource()), "node available resource should not be updated") - n.Action = si.NodeInfo_UPDATE - context.updateNode(n) - assert.Assert(t, resources.Equals(half, partition.GetTotalPartitionResource()), "partition resource should be updated") - assert.Assert(t, resources.IsZero(node.GetAvailableResource()), "node available should have been updated to zero") - assert.Assert(t, resources.Equals(half, node.GetOccupiedResource()), "node occupied should have been updated") // other actions n = &si.NodeInfo{ diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go index db51fa746..29d26f80f 100644 --- a/pkg/scheduler/objects/node.go +++ b/pkg/scheduler/objects/node.go @@ -73,7 +73,7 @@ func NewNode(proto *si.NodeInfo) *Node { reservations: make(map[string]*reservation), totalResource: resources.NewResourceFromProto(proto.SchedulableResource), allocatedResource: resources.NewResource(), - occupiedResource: resources.NewResourceFromProto(proto.OccupiedResource), + occupiedResource: resources.NewResource(), allocations: make(map[string]*Allocation), schedulable: true, listeners: make([]NodeListener, 0), diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go index 4b38e4360..bf0d7b6c5 100644 --- a/pkg/scheduler/objects/node_test.go +++ b/pkg/scheduler/objects/node_test.go @@ -41,14 +41,14 @@ func TestNewNode(t *testing.T) { if node != nil { t.Error("node not returned correctly: node is nul or incorrect name") } - proto := newProto(testNode, nil, nil, nil) + proto := newProto(testNode, nil, nil) node = NewNode(proto) if node == nil || node.NodeID != testNode { t.Error("node not returned correctly: node is nul or incorrect name") } totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "second": 100}) - proto = newProto(testNode, totalRes, nil, map[string]string{}) + proto = newProto(testNode, totalRes, map[string]string{}) node = NewNode(proto) if node == nil || node.NodeID != testNode { t.Fatal("node not returned correctly: node is nul or incorrect name") @@ -74,25 +74,20 @@ func TestNewNode(t *testing.T) { assert.Equal(t, "rack1", node.Rackname) assert.Equal(t, "partition1", node.Partition) - // test capacity/available/occupied resources + // test capacity/available resources totalResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "second": 100}) - occupiedResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, "second": 20}) availableResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 70, "second": 80}) - proto = newProto(testNode, totalResources, occupiedResources, map[string]string{}) + proto = newProto(testNode, totalResources, map[string]string{}) node = NewNode(proto) assert.Equal(t, node.NodeID, testNode, "node not returned correctly: node is nul or incorrect name") if !resources.Equals(node.GetCapacity(), totalResources) { t.Errorf("node total resources not set correctly: %v expected got %v", totalResources, node.GetCapacity()) } - if !resources.Equals(node.GetAvailableResource(), availableResources) { + if !resources.Equals(node.GetAvailableResource(), totalResources) { t.Errorf("node available resources not set correctly: %v expected got %v", availableResources, node.GetAvailableResource()) } - if !resources.Equals(node.GetOccupiedResource(), occupiedResources) { - t.Errorf("node occupied resources not set correctly: %v expected got %v", - occupiedResources, node.GetOccupiedResource()) - } } func TestCheckConditions(t *testing.T) { @@ -328,7 +323,7 @@ func TestAttributes(t *testing.T) { testname := fmt.Sprintf("Attributes in the node %d", index) t.Run(testname, func(t *testing.T) { nodename := fmt.Sprintf("%s-%d", testNode, index) - node := NewNode(newProto(nodename, nil, nil, tt.inputs)) + node := NewNode(newProto(nodename, nil, tt.inputs)) if node == nil || node.NodeID != nodename { t.Error("node not returned correctly: node is nul or incorrect name") } @@ -363,7 +358,7 @@ func TestAttributes(t *testing.T) { } func TestGetInstanceType(t *testing.T) { - proto := newProto(testNode, nil, nil, map[string]string{ + proto := newProto(testNode, nil, map[string]string{ common.NodePartition: "partition1", "label1": "key1", "label2": "key2", @@ -789,8 +784,7 @@ func TestAddRemoveListener(t *testing.T) { func TestNodeEvents(t *testing.T) { mockEvents := evtMock.NewEventSystem() total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100}) - occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10}) - proto := newProto(testNode, total, occupied, map[string]string{ + proto := newProto(testNode, total, map[string]string{ "ready": "true", }) node := NewNode(proto) @@ -905,8 +899,7 @@ func TestPreconditions(t *testing.T) { plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true, map[string]int{})) total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100}) - occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10}) - proto := newProto(testNode, total, occupied, map[string]string{ + proto := newProto(testNode, total, map[string]string{ "ready": "true", }) res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) diff --git a/pkg/scheduler/objects/nodesorting_test.go b/pkg/scheduler/objects/nodesorting_test.go index 896b0e709..8d14ee217 100644 --- a/pkg/scheduler/objects/nodesorting_test.go +++ b/pkg/scheduler/objects/nodesorting_test.go @@ -166,13 +166,13 @@ func TestSortPolicyWeighting(t *testing.T) { nc.SetNodeSortingPolicy(fair) totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000, "memory": 16000}) - proto1 := newProto("test1", totalRes, nil, map[string]string{}) + proto1 := newProto("test1", totalRes, map[string]string{}) node1 := NewNode(proto1) if err := nc.AddNode(node1); err != nil { t.Fatal("Failed to add node1") } - proto2 := newProto("test2", totalRes, nil, map[string]string{}) + proto2 := newProto("test2", totalRes, map[string]string{}) node2 := NewNode(proto2) if err := nc.AddNode(node2); err != nil { t.Fatal("Failed to add node2") @@ -232,13 +232,13 @@ func TestSortPolicy(t *testing.T) { nc.SetNodeSortingPolicy(bp) totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000, "memory": 4000}) - proto1 := newProto("test1", totalRes, nil, map[string]string{}) + proto1 := newProto("test1", totalRes, map[string]string{}) node1 := NewNode(proto1) if err := nc.AddNode(node1); err != nil { t.Fatal("Failed to add node1") } - proto2 := newProto("test2", totalRes, nil, map[string]string{}) + proto2 := newProto("test2", totalRes, map[string]string{}) node2 := NewNode(proto2) if err := nc.AddNode(node2); err != nil { t.Fatal("Failed to add node2") @@ -313,7 +313,7 @@ func TestAbsResourceUsage(t *testing.T) { nc.SetNodeSortingPolicy(fair) totalRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0, "memory": 16000}) - proto1 := newProto("test1", totalRes, nil, map[string]string{}) + proto1 := newProto("test1", totalRes, map[string]string{}) node1 := NewNode(proto1) if err := nc.AddNode(node1); err != nil { t.Fatal("Failed to add node1") diff --git a/pkg/scheduler/objects/utilities_test.go b/pkg/scheduler/objects/utilities_test.go index 1f81bddfd..f1369a7ea 100644 --- a/pkg/scheduler/objects/utilities_test.go +++ b/pkg/scheduler/objects/utilities_test.go @@ -188,7 +188,7 @@ func newNodeInternal(nodeID string, total, occupied *resources.Resource) *Node { return sn } -func newProto(nodeID string, totalResource, occupiedResource *resources.Resource, attributes map[string]string) *si.NodeInfo { +func newProto(nodeID string, totalResource *resources.Resource, attributes map[string]string) *si.NodeInfo { proto := si.NodeInfo{ NodeID: nodeID, Attributes: attributes, @@ -204,15 +204,6 @@ func newProto(nodeID string, totalResource, occupiedResource *resources.Resource } } - if occupiedResource != nil { - proto.OccupiedResource = &si.Resource{ - Resources: map[string]*si.Quantity{}, - } - for name, value := range occupiedResource.Resources { - quantity := si.Quantity{Value: int64(value)} - proto.OccupiedResource.Resources[name] = &quantity - } - } return &proto } diff --git a/pkg/scheduler/tests/operation_test.go b/pkg/scheduler/tests/operation_test.go index 2245697b0..4a0c60158 100644 --- a/pkg/scheduler/tests/operation_test.go +++ b/pkg/scheduler/tests/operation_test.go @@ -564,97 +564,6 @@ partitions: } } -func TestUpdateNodeOccupiedResources(t *testing.T) { - // Register RM - configData := ` -partitions: - - - name: default - queues: - - name: root - submitacl: "*" - queues: - - name: a - resources: - max: - memory: 150 - vcore: 20 -` - // Start all tests - ms := &mockScheduler{} - defer ms.Stop() - - err := ms.Init(configData, false, false) - assert.NilError(t, err, "RegisterResourceManager failed") - - // Check queues of cache and scheduler. - partitionInfo := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default") - assert.Assert(t, partitionInfo.GetTotalPartitionResource() == nil, "partition info max resource nil") - - // Register a node - err = ms.proxy.UpdateNode(&si.NodeRequest{ - Nodes: []*si.NodeInfo{ - { - NodeID: "node-1:1234", - Attributes: map[string]string{}, - SchedulableResource: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 100}, - "vcore": {Value: 10}, - }, - }, - Action: si.NodeInfo_CREATE, - }, - }, - RmID: "rm:123", - }) - - assert.NilError(t, err, "NodeRequest failed") - - // Wait until node is registered - context := ms.scheduler.GetClusterContext() - ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) - waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000) - - // verify node capacity - assert.Equal(t, len(partitionInfo.GetNodes()), 1) - node1 := partitionInfo.GetNode("node-1:1234") - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - schedulingNode1 := ms.scheduler.GetClusterContext(). - GetNode("node-1:1234", "[rm:123]default") - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) - - // update node capacity - err = ms.proxy.UpdateNode(&si.NodeRequest{ - Nodes: []*si.NodeInfo{ - { - NodeID: "node-1:1234", - Attributes: map[string]string{}, - OccupiedResource: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 80}, - "vcore": {Value: 5}, - }, - }, - Action: si.NodeInfo_UPDATE, - }, - }, - RmID: "rm:123", - }) - - assert.NilError(t, err, "NodeRequest failed") - - waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", - []string{"node-1:1234"}, 20, 1000) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5)) - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20)) -} - func TestForeignPodResourceUsage(t *testing.T) { // Register RM configData := ` diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index 56845fd9e..05986c039 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -647,20 +647,15 @@ func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources. }) } -func newNodeWithResources(nodeID string, max, occupied *resources.Resource) *objects.Node { +func newNodeMaxResource(nodeID string, max *resources.Resource) *objects.Node { proto := &si.NodeInfo{ NodeID: nodeID, Attributes: map[string]string{}, SchedulableResource: max.ToProto(), - OccupiedResource: occupied.ToProto(), } return objects.NewNode(proto) } -func newNodeMaxResource(nodeID string, max *resources.Resource) *objects.Node { - return newNodeWithResources(nodeID, max, nil) -} - // partition with an expected basic queue hierarchy // root -> parent -> leaf1 //