Skip to content

Commit

Permalink
[YUNIKORN-2974] Expose preemption and priority settings via REST (#995)
Browse files Browse the repository at this point in the history
Add preemption and priority details to the queue REST information.
Currently the configuration is exposed via the properties. The
properties contain the raw configuration values which need to be
interpreted.

Invalid configuration values are ignored and not set on the queue. A view
based on the raw properties could therefore misrepresent what is, or
should be, active on the queue. Use the active values from the queue
object to show the real state.

Expose the partition-level preemption flag as part of this change to
complement the queue details.

Closes: #995

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
wilfred-s authored and craigcondit committed Nov 22, 2024
1 parent 7c99e6b commit 0356a3a
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 82 deletions.
10 changes: 8 additions & 2 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,14 @@ func (sq *Queue) GetPartitionQueueDAOInfo(include bool) dao.PartitionQueueDAOInf
queueInfo.IsManaged = sq.isManaged
queueInfo.CurrentPriority = sq.getCurrentPriority()
queueInfo.TemplateInfo = sq.template.GetTemplateInfo()
queueInfo.AbsUsedCapacity = resources.CalculateAbsUsedCapacity(
sq.maxResource, sq.allocatedResource).DAOMap()
queueInfo.AbsUsedCapacity = resources.CalculateAbsUsedCapacity(sq.maxResource, sq.allocatedResource).DAOMap()
queueInfo.SortingPolicy = sq.sortType.String()
queueInfo.PrioritySorting = sq.prioritySortEnabled
queueInfo.PreemptionEnabled = sq.preemptionPolicy != policies.DisabledPreemptionPolicy
queueInfo.IsPreemptionFence = sq.preemptionPolicy == policies.FencePreemptionPolicy
queueInfo.PreemptionDelay = sq.preemptionDelay.String()
queueInfo.IsPriorityFence = sq.priorityPolicy == policies.FencePriorityPolicy
queueInfo.PriorityOffset = sq.priorityOffset
queueInfo.Properties = make(map[string]string)
for k, v := range sq.properties {
queueInfo.Properties[k] = v
Expand Down
72 changes: 53 additions & 19 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,37 +1657,71 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
},
})
assert.NilError(t, err)
assert.Equal(t, root.template.GetMaxApplications(), root.GetPartitionQueueDAOInfo(true).TemplateInfo.MaxApplications)
assert.DeepEqual(t, root.template.GetProperties(), root.GetPartitionQueueDAOInfo(true).TemplateInfo.Properties)
assert.DeepEqual(t, root.template.GetMaxResource().DAOMap(), root.GetPartitionQueueDAOInfo(true).TemplateInfo.MaxResource)
assert.DeepEqual(t, root.template.GetGuaranteedResource().DAOMap(), root.GetPartitionQueueDAOInfo(true).TemplateInfo.GuaranteedResource)
rootDAO := root.GetPartitionQueueDAOInfo(true)
assert.Equal(t, root.template.GetMaxApplications(), rootDAO.TemplateInfo.MaxApplications)
assert.DeepEqual(t, root.template.GetProperties(), rootDAO.TemplateInfo.Properties)
assert.DeepEqual(t, root.template.GetMaxResource().DAOMap(), rootDAO.TemplateInfo.MaxResource)
assert.DeepEqual(t, root.template.GetGuaranteedResource().DAOMap(), rootDAO.TemplateInfo.GuaranteedResource)

// test resources
root.maxResource = getResource(t)
root.guaranteedResource = getResource(t)
assert.DeepEqual(t, root.GetMaxResource().DAOMap(), root.GetPartitionQueueDAOInfo(true).MaxResource)
assert.DeepEqual(t, root.GetGuaranteedResource().DAOMap(), root.GetPartitionQueueDAOInfo(true).GuaranteedResource)
assert.DeepEqual(t, root.getHeadRoom().DAOMap(), root.GetPartitionQueueDAOInfo(true).HeadRoom)
rootDAO = root.GetPartitionQueueDAOInfo(true)
assert.DeepEqual(t, root.GetMaxResource().DAOMap(), rootDAO.MaxResource)
assert.DeepEqual(t, root.GetGuaranteedResource().DAOMap(), rootDAO.GuaranteedResource)
assert.DeepEqual(t, root.getHeadRoom().DAOMap(), rootDAO.HeadRoom)

// test allocatingAcceptedApps
root.allocatingAcceptedApps = getAllocatingAcceptedApps()
rootDAO = root.GetPartitionQueueDAOInfo(true)
assert.Equal(t, len(root.allocatingAcceptedApps), 2, "allocatingAcceptedApps size")
assert.Equal(t, len(root.GetPartitionQueueDAOInfo(true).AllocatingAcceptedApps), 1, "AllocatingAcceptedApps size")
assert.Equal(t, root.GetPartitionQueueDAOInfo(true).AllocatingAcceptedApps[0], appID1)
assert.Equal(t, len(rootDAO.AllocatingAcceptedApps), 1, "AllocatingAcceptedApps size")
assert.Equal(t, rootDAO.AllocatingAcceptedApps[0], appID1)

// Test specific queue
_, err = createManagedQueue(root, "leaf-queue", false, nil)
var leaf *Queue
leaf, err = createManagedQueue(root, "leaf-queue", false, nil)
assert.NilError(t, err, "failed to create managed queue")
assert.Equal(t, root.GetPartitionQueueDAOInfo(false).QueueName, "root")
assert.Equal(t, len(root.GetPartitionQueueDAOInfo(false).Children), 0)
assert.Equal(t, len(root.GetPartitionQueueDAOInfo(false).ChildNames), 1)
assert.Equal(t, root.GetPartitionQueueDAOInfo(false).ChildNames[0], "root.leaf-queue")
rootDAO = root.GetPartitionQueueDAOInfo(false)
assert.Equal(t, rootDAO.QueueName, "root")
assert.Equal(t, len(rootDAO.Children), 0)
assert.Equal(t, len(rootDAO.ChildNames), 1)
assert.Equal(t, rootDAO.ChildNames[0], "root.leaf-queue")
// Test hierarchy queue
assert.Equal(t, root.GetPartitionQueueDAOInfo(true).QueueName, "root")
assert.Equal(t, len(root.GetPartitionQueueDAOInfo(true).Children), 1)
assert.Equal(t, len(root.GetPartitionQueueDAOInfo(true).ChildNames), 1)
assert.Equal(t, root.GetPartitionQueueDAOInfo(true).Children[0].QueueName, "root.leaf-queue")
assert.Equal(t, root.GetPartitionQueueDAOInfo(true).ChildNames[0], "root.leaf-queue")
rootDAO = root.GetPartitionQueueDAOInfo(true)
assert.Equal(t, rootDAO.QueueName, "root")
assert.Equal(t, len(rootDAO.Children), 1)
assert.Equal(t, len(rootDAO.ChildNames), 1)
assert.Equal(t, rootDAO.Children[0].QueueName, "root.leaf-queue")
assert.Equal(t, rootDAO.ChildNames[0], "root.leaf-queue")
// special prop checks
leaf.properties = map[string]string{
configs.ApplicationSortPolicy: policies.FairSortPolicy.String(),
configs.PreemptionDelay: "3600s",
configs.PreemptionPolicy: policies.FencePreemptionPolicy.String(),
}
leaf.UpdateQueueProperties()
leafDAO := leaf.GetPartitionQueueDAOInfo(false)
assert.Equal(t, leafDAO.QueueName, "root.leaf-queue")
assert.Equal(t, len(leafDAO.Children), 0, "leaf has no children")
assert.Equal(t, len(leafDAO.ChildNames), 0, "leaf has no children (names)")
assert.Equal(t, leafDAO.PreemptionEnabled, true, "preemption should be enabled")
assert.Equal(t, leafDAO.IsPreemptionFence, true, "fence should have been set")
assert.Equal(t, leafDAO.PreemptionDelay, "1h0m0s", "incorrect delay returned")
assert.Equal(t, leafDAO.SortingPolicy, "fair", "incorrect policy returned")

// special prop checks
leaf.properties = map[string]string{
configs.ApplicationSortPolicy: policies.FifoSortPolicy.String(),
configs.PreemptionDelay: "10s",
configs.PreemptionPolicy: policies.DisabledPreemptionPolicy.String(),
}
leaf.UpdateQueueProperties()
leafDAO = leaf.GetPartitionQueueDAOInfo(false)
assert.Equal(t, leafDAO.PreemptionEnabled, false, "preemption should not be enabled")
assert.Equal(t, leafDAO.IsPreemptionFence, false, "queue should not be a fence")
assert.Equal(t, leafDAO.PreemptionDelay, "10s", "incorrect delay returned")
assert.Equal(t, leafDAO.SortingPolicy, "fifo", "incorrect policy returned")
}

func getAllocatingAcceptedApps() map[string]bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (pc *PartitionContext) tryAllocate() *objects.AllocationResult {
return nil
}
// try allocating from the root down
result := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.isPreemptionEnabled())
result := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled())
if result != nil {
return pc.allocate(result)
}
Expand Down Expand Up @@ -1609,7 +1609,7 @@ func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64 {
return policy.ResourceWeights()
}

func (pc *PartitionContext) isPreemptionEnabled() bool {
func (pc *PartitionContext) IsPreemptionEnabled() bool {
pc.RLock()
defer pc.RUnlock()
return pc.preemptionEnabled
Expand Down
16 changes: 7 additions & 9 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3778,29 +3778,27 @@ func TestUpdatePreemption(t *testing.T) {

partition, err := newBasePartition()
assert.NilError(t, err, "Partition creation failed")
assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by default")
assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should be enabled by default")

partition.updatePreemption(configs.PartitionConfig{})
assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by empty config")
assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should be enabled by empty config")

partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{}})
assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by empty preemption section")
assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should be enabled by empty preemption section")

partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{Enabled: nil}})
assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by explicit nil")
assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should be enabled by explicit nil")

partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{Enabled: &True}})
assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by explicit true")
assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should be enabled by explicit true")

partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{Enabled: &False}})
assert.Assert(t, !partition.isPreemptionEnabled(), "preeemption should be disabled by explicit false")
assert.Assert(t, !partition.IsPreemptionEnabled(), "preeemption should be disabled by explicit false")
}

func TestUpdateNodeSortingPolicy(t *testing.T) {
partition, err := newBasePartition()
if err != nil {
t.Errorf("Partition creation failed: %s", err.Error())
}
assert.NilError(t, err, "Partition creation failed unexpectedly")

if partition.nodes.GetNodeSortingPolicy().PolicyType().String() != policies.FairnessPolicy.String() {
t.Error("Node policy is not set with the default policy which is fair policy.")
Expand Down
1 change: 1 addition & 0 deletions pkg/webservice/dao/partition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type PartitionInfo struct {
Name string `json:"name"` // no omitempty, name should not be empty
Capacity PartitionCapacity `json:"capacity"` // no omitempty, omitempty doesn't work on a structure value
NodeSortingPolicy NodeSortingPolicy `json:"nodeSortingPolicy"` // no omitempty, omitempty doesn't work on a structure value
PreemptionEnabled bool `json:"preemptionEnabled"` // no omitempty, false shows preemption status better
TotalNodes int `json:"totalNodes,omitempty"`
Applications map[string]int `json:"applications,omitempty"`
TotalContainers int `json:"totalContainers,omitempty"`
Expand Down
10 changes: 9 additions & 1 deletion pkg/webservice/dao/queue_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dao

type TemplateInfo struct {
Expand All @@ -27,7 +28,7 @@ type TemplateInfo struct {
type PartitionQueueDAOInfo struct {
QueueName string `json:"queuename"` // no omitempty, queue name should not be empty
Status string `json:"status,omitempty"`
Partition string `json:"partition"` // no omitempty, queue name should not be empty
Partition string `json:"partition"` // no omitempty, partition name should not be empty
PendingResource map[string]int64 `json:"pendingResource,omitempty"`
MaxResource map[string]int64 `json:"maxResource,omitempty"`
GuaranteedResource map[string]int64 `json:"guaranteedResource,omitempty"`
Expand All @@ -46,4 +47,11 @@ type PartitionQueueDAOInfo struct {
RunningApps uint64 `json:"runningApps,omitempty"`
CurrentPriority int32 `json:"currentPriority"` // no omitempty, as the current priority value may be 0, which is a valid priority level
AllocatingAcceptedApps []string `json:"allocatingAcceptedApps,omitempty"`
SortingPolicy string `json:"sortingPolicy,omitempty"`
PrioritySorting bool `json:"prioritySorting"` // no omitempty, false shows priority sorting status better
PreemptionEnabled bool `json:"preemptionEnabled"` // no omitempty, false shows preemption status better
IsPreemptionFence bool `json:"isPreemptionFence"` // no omitempty, a false value gives a quick way to understand whether it's fenced.
PreemptionDelay string `json:"preemptionDelay,omitempty"`
IsPriorityFence bool `json:"isPriorityFence"` // no omitempty, a false value gives a quick way to understand whether it's fenced.
PriorityOffset int32 `json:"priorityOffset,omitempty"`
}
1 change: 1 addition & 0 deletions pkg/webservice/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ func getPartitionInfoDAO(lists map[string]*scheduler.PartitionContext) []*dao.Pa
partitionInfo.Name = common.GetPartitionNameWithoutClusterID(partitionContext.Name)
partitionInfo.State = partitionContext.GetCurrentState()
partitionInfo.LastStateTransitionTime = partitionContext.GetStateTime().UnixNano()
partitionInfo.PreemptionEnabled = partitionContext.IsPreemptionEnabled()

capacityInfo := dao.PartitionCapacity{}
capacity := partitionContext.GetTotalPartitionResource()
Expand Down
91 changes: 42 additions & 49 deletions pkg/webservice/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,71 +116,62 @@ partitions:

const configMultiPartitions = `
partitions:
-
name: gpu
- name: gpu
preemption:
enabled: false
queues:
-
name: root
-
name: default
- name: root
- name: default
nodesortpolicy:
type: fair
queues:
-
name: root
queues:
-
name: default
submitacl: "*"
- name: root
queues:
- name: default
submitacl: "*"
`

const configTwoLevelQueues = `
partitions:
-
name: gpu
- name: gpu
queues:
-
name: root
-
name: default
- name: root
- name: default
nodesortpolicy:
type: binpacking
queues:
-
name: root
- name: root
properties:
application.sort.policy: fifo
childtemplate:
maxapplications: 10
properties:
application.sort.policy: fifo
childtemplate:
maxapplications: 10
resources:
guaranteed:
memory: 400000
max:
memory: 600000
queues:
- name: a
queues:
- name: a1
properties:
application.sort.policy: fifo
resources:
guaranteed:
memory: 400000
max:
memory: 600000
queues:
-
name: a
queues:
-
name: a1
properties:
application.sort.policy: fifo
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
resources:
guaranteed:
memory: 500000
vcore: 50000
max:
memory: 800000
vcore: 80000
`

const userGroupLimitsConfig = `
Expand Down Expand Up @@ -1092,6 +1083,7 @@ func TestPartitions(t *testing.T) { //nolint:funlen
assert.DeepEqual(t, cs["default"].Capacity.UsedCapacity, map[string]int64{"memory": 300, "vcore": 700})
assert.DeepEqual(t, cs["default"].Capacity.Utilization, map[string]int64{"memory": 30, "vcore": 70})
assert.Equal(t, cs["default"].State, "Active")
assert.Assert(t, cs["default"].PreemptionEnabled, "preemption should be enabled on default")

assert.Assert(t, cs["gpu"] != nil)
assert.Equal(t, cs["gpu"].ClusterID, "rm-123")
Expand All @@ -1100,6 +1092,7 @@ func TestPartitions(t *testing.T) { //nolint:funlen
assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["vcore"], 1.0)
assert.Equal(t, cs["default"].NodeSortingPolicy.ResourceWeights["memory"], 1.0)
assert.Equal(t, cs["gpu"].Applications["total"], 0)
assert.Assert(t, !cs["gpu"].PreemptionEnabled, "preemption should be disabled on gpu")
}

func TestMetricsNotEmpty(t *testing.T) {
Expand Down

0 comments on commit 0356a3a

Please sign in to comment.