From f7d0e102a8423baa43ef0744d803e45b64cca82c Mon Sep 17 00:00:00 2001 From: haorenhui Date: Tue, 3 Dec 2024 11:02:54 +0100 Subject: [PATCH] [YUNIKORN-2982] Send event when preemption occurs (#1000) Closes: #1000 Signed-off-by: Peter Bacsko --- pkg/scheduler/objects/allocation.go | 5 ++ pkg/scheduler/objects/events/ask_events.go | 10 +++++ .../objects/events/ask_events_test.go | 20 +++++++ pkg/scheduler/objects/preemption.go | 1 + pkg/scheduler/objects/preemption_test.go | 54 +++++++++++++++++++ 5 files changed, 90 insertions(+) diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go index 17fd531c2..a4701cd68 100644 --- a/pkg/scheduler/objects/allocation.go +++ b/pkg/scheduler/objects/allocation.go @@ -480,6 +480,11 @@ func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) { a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource()) } +// SendPreemptedBySchedulerEvent updates the event system with the preemption event. +func (a *Allocation) SendPreemptedBySchedulerEvent(preemptorAllocKey, preemptorAppID, preemptorQueuePath string) { + a.askEvents.SendPreemptedByScheduler(a.allocationKey, a.applicationID, preemptorAllocKey, preemptorAppID, preemptorQueuePath, a.GetAllocatedResource()) +} + // GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met. 
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry { a.RLock() diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go index 0e10a2530..db6573398 100644 --- a/pkg/scheduler/objects/events/ask_events.go +++ b/pkg/scheduler/objects/events/ask_events.go @@ -105,6 +105,16 @@ func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node stri ae.eventSystem.AddEvent(event) } +// SendPreemptedByScheduler records a request event for the preempted allocation allocKey naming the preemptor's allocation key, application ID and queue path; it is a no-op when event tracking is disabled. +func (ae *AskEvents) SendPreemptedByScheduler(allocKey, appID, preemptorAllocKey, preemptorAppID, preemptorQueuePath string, allocatedResource *resources.Resource) { + if !ae.eventSystem.IsEventTrackingEnabled() { + return + } + message := fmt.Sprintf("Preempted by %s from application %s in %s", preemptorAllocKey, preemptorAppID, preemptorQueuePath) + event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource) + ae.eventSystem.AddEvent(event) +} + func NewAskEvents(evt events.EventSystem) *AskEvents { return newAskEventsWithRate(evt, 15*time.Second, 1) } diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go index 8f6f8558e..d66806004 100644 --- a/pkg/scheduler/objects/events/ask_events_test.go +++ b/pkg/scheduler/objects/events/ask_events_test.go @@ -178,3 +178,23 @@ func TestRequiredNodePreemptionFailedEvents(t *testing.T) { event = eventSystem.Events[0] assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message) } + +func TestPreemptedBySchedulerEvents(t *testing.T) { + resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) + eventSystem := mock.NewEventSystemDisabled() + events := NewAskEvents(eventSystem) + events.SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", "preemptor-app-0", "root.parent.child1", resource) + assert.Equal(t, 0, len(eventSystem.Events)) + + eventSystem = mock.NewEventSystem() + events = NewAskEvents(eventSystem) + 
events.SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", "preemptor-app-0", "root.parent.child1", resource) + assert.Equal(t, 1, len(eventSystem.Events)) + event := eventSystem.Events[0] + assert.Equal(t, "alloc-0", event.ObjectID) + assert.Equal(t, appID, event.ReferenceID) + assert.Equal(t, si.EventRecord_REQUEST, event.Type) + assert.Equal(t, si.EventRecord_NONE, event.EventChangeType) + assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) + assert.Equal(t, "Preempted by preemptor-0 from application preemptor-app-0 in root.parent.child1", event.Message) +} diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go index 441b12dd6..10cacea2b 100644 --- a/pkg/scheduler/objects/preemption.go +++ b/pkg/scheduler/objects/preemption.go @@ -635,6 +635,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) { zap.String("victimNodeID", victim.GetNodeID()), zap.String("victimQueue", victimQueue.Name), ) + victim.SendPreemptedBySchedulerEvent(p.ask.allocationKey, p.ask.applicationID, p.application.queuePath) } else { log.Log(log.SchedPreemption).Warn("BUG: Queue not found for preemption victim", zap.String("queue", p.queue.Name), diff --git a/pkg/scheduler/objects/preemption_test.go b/pkg/scheduler/objects/preemption_test.go index b2fce675f..e845da929 100644 --- a/pkg/scheduler/objects/preemption_test.go +++ b/pkg/scheduler/objects/preemption_test.go @@ -28,8 +28,10 @@ import ( "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/resources" + evtMock "github.com/apache/yunikorn-core/pkg/events/mock" "github.com/apache/yunikorn-core/pkg/mock" "github.com/apache/yunikorn-core/pkg/plugins" + schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -293,6 +295,58 @@ func TestTryPreemption(t *testing.T) { assert.Equal(t, len(ask3.GetAllocationLog()), 0) } +func TestTryPreemption_SendEvent(t 
*testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 10, "pods": 5}) + iterator := getNodeIteratorFn(node) + rootQ, err := createRootQueue(map[string]string{"first": "20", "pods": "5"}) + assert.NilError(t, err) + parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}) + assert.NilError(t, err) + childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, map[string]string{"first": "10"}, map[string]string{"first": "5"}) + assert.NilError(t, err) + childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, map[string]string{"first": "10"}, map[string]string{"first": "5"}) + assert.NilError(t, err) + + alloc1, alloc2, err := creatApp1(childQ1, node, nil, map[string]resources.Quantity{"first": 5, "pods": 1}) + assert.NilError(t, err) + + eventSystem := evtMock.NewEventSystem() + events := schedEvt.NewAskEvents(eventSystem) + alloc1.askEvents = events + + app2, ask3, err := creatApp2(childQ2, map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3") + assert.NilError(t, err) + childQ2.incPendingResource(ask3.GetAllocatedResource()) + + headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 3}) + preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false) + + // register predicate handler + preemptions := []mock.Preemption{ + mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 0, 0), + } + plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions) + plugins.RegisterSchedulerPlugin(plugin) + defer plugins.UnregisterSchedulerPlugins() + + result, ok := preemptor.TryPreemption() + assert.Assert(t, result != nil, "no result") + assert.NilError(t, plugin.GetPredicateError()) + assert.Assert(t, ok, "no victims found") + assert.Equal(t, "alloc3", result.Request.GetAllocationKey(), "wrong alloc") + assert.Check(t, alloc1.IsPreempted(), "alloc1 not preempted") + 
assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted") + assert.Equal(t, 1, len(eventSystem.Events)) + event := eventSystem.Events[0] + assert.Equal(t, alloc1.applicationID, event.ReferenceID) + assert.Equal(t, alloc1.allocationKey, event.ObjectID) + assert.Equal(t, si.EventRecord_NONE, event.EventChangeType) + assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) + assert.Equal(t, si.EventRecord_REQUEST, event.Type) + assert.Equal(t, fmt.Sprintf("Preempted by %s from application %s in %s", "alloc3", appID2, "root.parent.child2"), event.Message) + assert.Equal(t, len(ask3.GetAllocationLog()), 0) +} + // TestTryPreemptionOnNode Test try preemption on node with simple queue hierarchy. Since Node doesn't have enough resources to accomodate, preemption happens because of node resource constraint. // Guaranteed and Max resource set on both victim queue path and preemptor queue path in 2 levels. victim and preemptor queue are siblings. // Request (Preemptor) resource type matches with all resource types of the victim. But Guaranteed set only on specific resource type. 2 Victims are available, but 1 should be preempted because further preemption would make usage go below the guaranteed quota