From 71fc62f596e55bb23f4d9e1ec2f6326dcc3354e7 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Tue, 31 Dec 2024 08:19:18 -0800 Subject: [PATCH] tmp: make updatePod return func to execute after unlock --- pkg/plugin/handle_pod.go | 132 ++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 56 deletions(-) diff --git a/pkg/plugin/handle_pod.go b/pkg/plugin/handle_pod.go index c395677fb..d2c89b337 100644 --- a/pkg/plugin/handle_pod.go +++ b/pkg/plugin/handle_pod.go @@ -36,17 +36,31 @@ func (s *PluginState) HandlePodEvent( switch kind { case reconcile.EventKindAdded, reconcile.EventKindModified: - var needsMoreResources bool - var retryAfter time.Duration + updateResult, err := s.updatePod(logger, pod, expectExists) + + var reconcileResult *reconcile.Result + if updateResult != nil && updateResult.retryAfter != nil { + reconcileResult = &reconcile.Result{RetryAfter: *updateResult.retryAfter} + } - err := s.updatePod(logger, pod, expectExists, &needsMoreResources, &retryAfter) if err != nil { - return nil, err + return reconcileResult, err } - if needsMoreResources { - // mark this as failing; don't try again sooner than 5 seconds later. - return &reconcile.Result{RetryAfter: 5 * time.Second}, errors.New("not enough resources to grant request for pod") + var retryAfter time.Duration + if updateResult != nil { + if updateResult.afterUnlock != nil { + if err := updateResult.afterUnlock(); err != nil { + return reconcileResult, err + } + } + + if updateResult.needsMoreResources { + // mark this as failing; don't try again sooner than 5 seconds later. + return &reconcile.Result{RetryAfter: 5 * time.Second}, errors.New("not enough resources to grant request for pod") + } + + retryAfter = lo.FromPtr(updateResult.retryAfter) } return &reconcile.Result{RetryAfter: retryAfter}, nil @@ -58,33 +72,30 @@ func (s *PluginState) HandlePodEvent( } } +type podUpdateResult struct { + needsMoreResources bool + afterUnlock func() error + retryAfter *time.Duration +} + func (s *PluginState) updatePod( logger *zap.Logger, pod *corev1.Pod, expectExists bool, - needsMoreResources *bool, - retryAfter *time.Duration, -) error { +) (*podUpdateResult, error) { newPod, err := state.PodStateFromK8sObj(pod) if err != nil { - return fmt.Errorf("could not get state from Pod object: %w", err) + return nil, fmt.Errorf("could not get state from Pod object: %w", err) } - var ns *nodeState // pre-declare this so we can update metrics in a defer - s.mu.Lock() - var unlocked bool - unlock := func() { + defer s.mu.Unlock() + + var ns *nodeState // pre-declare this so we can update metrics in a defer + defer func() { if ns != nil { s.updateNodeMetricsAndRequeue(logger, ns) } - s.mu.Unlock() - unlocked = true - } - defer func() { - if !unlocked { - unlock() - } }() tentativeNode, scheduled := s.tentativelyScheduled[pod.UID] @@ -106,7 +117,7 @@ func (s *PluginState) updatePod( if !scheduled && pod.Spec.NodeName == "" { // still hasn't been scheduled, nothing to do yet. 
logger.Info("Skipping event for Pod that has not yet been scheduled") - return nil + return nil, nil } nodeName := pod.Spec.NodeName @@ -119,7 +130,7 @@ func (s *PluginState) updatePod( var ok bool ns, ok = s.nodes[nodeName] if !ok { - return fmt.Errorf("pod's node %q is not present in local state", nodeName) + return nil, fmt.Errorf("pod's node %q is not present in local state", nodeName) } // make the changes in Speculatively() so that we can log both states before committing, and @@ -162,7 +173,7 @@ func (s *PluginState) updatePod( // // All that's left is to handle VMs that are the responsibility of *this* scheduler. if lo.IsEmpty(newPod.VirtualMachine) || pod.Spec.SchedulerName != s.config.SchedulerName { - return nil + return nil, nil } if _, ok := ns.requestedMigrations[newPod.UID]; ok { @@ -175,27 +186,27 @@ func (s *PluginState) updatePod( } else { // Otherwise: the pod is not migrating, but *is* migratable. Let's trigger migration. logger.Info("Creating migration for Pod") - // we need to release the lock to trigger the migration, otherwise we may slow down - // processing due to API delays. - unlock() - if err := s.createMigrationForPod(logger, newPod); err != nil { - return fmt.Errorf("could not create migration for Pod: %w", err) - } - // All done for now; retry in 5s if the pod is not migrating yet. - *retryAfter = 5 * time.Second - return nil + return &podUpdateResult{ + needsMoreResources: false, + // we need to release the lock to trigger the migration, otherwise we may slow down + // processing due to API delays. + afterUnlock: func() error { + if err := s.createMigrationForPod(logger, newPod); err != nil { + return fmt.Errorf("could not create migration for Pod: %w", err) + } + return nil + }, + // All done for now; retry in 5s if the pod is not migrating yet. + retryAfter: lo.ToPtr(5 * time.Second), + }, nil } } if !newPod.Migrating { - var err error - *needsMoreResources, err = s.reconcilePodResources(logger, ns, pod, newPod, retryAfter, unlock) - if err != nil { - return err - } + return s.reconcilePodResources(logger, ns, pod, newPod), nil } - return nil + return nil, nil } func (s *PluginState) createMigrationForPod(logger *zap.Logger, pod state.Pod) error { @@ -223,16 +234,16 @@ func (s *PluginState) reconcilePodResources( ns *nodeState, oldPodObj *corev1.Pod, oldPod state.Pod, - retryAfter *time.Duration, - unlock func(), -) (needsMoreResources bool, _ error) { +) *podUpdateResult { // Quick check: Does this pod have autoscaling enabled? if no, then we shouldn't set our // annotations on it -- particularly because we may end up with stale approved resources when // the VM scales, and that can cause issues if autoscaling is enabled later. if !api.HasAutoscalingEnabled(oldPodObj) { - return false, nil + return nil } + var needsMoreResources bool + desiredPod := oldPod ns.node.Speculatively(func(n *state.Node) (commit bool) { // Do a pass of reconciling this pod, in case there's resources it's requested that we can @@ -271,8 +282,8 @@ func (s *PluginState) reconcilePodResources( if !s.startupDone { s.requeueAfterStartup[oldPod.UID] = struct{}{} - needsMoreResources = false // don't report anything, we're waiting for startup to finish! - return needsMoreResources, nil + // don't report anything, even if needsMoreResources. We're waiting for startup to finish! + return nil } if oldPod == desiredPod && hasApprovedAnnotation { // no changes, nothing to do. 
Although, if we *do* need more resources, log something about @@ -284,7 +295,11 @@ func (s *PluginState) reconcilePodResources( zap.Object("Node", ns.node), ) } - return needsMoreResources, nil + return &podUpdateResult{ + needsMoreResources: needsMoreResources, + afterUnlock: nil, + retryAfter: nil, + } } // Startup done. Either we have changes or the pod is missing the approved resources annotation. @@ -302,12 +317,16 @@ func (s *PluginState) reconcilePodResources( } if now.Before(canRetryAt) { - *retryAfter = canRetryAt.Sub(now) + retryAfter := canRetryAt.Sub(now) logger.Warn( "Want to patch VirtualMachine for reserved resources, but too soon to re-patch. Waiting.", - zap.Duration("retryAfter", *retryAfter), + zap.Duration("retryAfter", retryAfter), ) - return needsMoreResources, errors.New("too soon to re-patch VirtualMachine, waiting") + return &podUpdateResult{ + needsMoreResources: needsMoreResources, + afterUnlock: nil, + retryAfter: &retryAfter, + } } newPod := desiredPod @@ -345,13 +364,14 @@ func (s *PluginState) reconcilePodResources( } ns.podsVMPatchedAt[oldPod.UID] = now - unlock() - err := s.patchReservedResourcesForPod(logger, oldPodObj, desiredPod) - if err != nil { - return false, err - } - return needsMoreResources, nil + return &podUpdateResult{ + needsMoreResources: needsMoreResources, + afterUnlock: func() error { + return s.patchReservedResourcesForPod(logger, oldPodObj, desiredPod) + }, + retryAfter: nil, + } } func (s *PluginState) patchReservedResourcesForPod(
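
Reviewer note: the core change above is that updatePod no longer unlocks s.mu partway through its body. It now holds the lock for the whole call (a single defer) and hands any slow follow-up work back to HandlePodEvent as podUpdateResult.afterUnlock, which the caller runs only once the lock has been released. The sketch below shows that shape in isolation. It is a minimal, hypothetical example: the names updateResult, state, update, and handleEvent are invented for illustration and are not part of the plugin package.

package main

import (
	"fmt"
	"sync"
	"time"
)

// updateResult mirrors the shape of podUpdateResult from the patch: the
// in-memory update happens under the lock, and any slow follow-up work
// (e.g. an API call) is returned as a closure to run after unlocking.
type updateResult struct {
	afterUnlock func() error   // optional follow-up to run outside the lock
	retryAfter  *time.Duration // optional hint for when to reconcile again
}

type state struct {
	mu      sync.Mutex
	counter int
}

// update does the guarded work while holding the mutex and returns the slow
// part as afterUnlock, instead of unlocking partway through the function.
func (s *state) update() (*updateResult, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.counter++
	n := s.counter

	return &updateResult{
		afterUnlock: func() error {
			// Stand-in for a slow external call (like creating a migration
			// or patching a VM); it must not touch fields guarded by s.mu.
			fmt.Println("slow follow-up for update", n)
			return nil
		},
	}, nil
}

// handleEvent plays the role of HandlePodEvent: call update, then run the
// returned closure only after the lock is no longer held.
func (s *state) handleEvent() error {
	res, err := s.update()
	if err != nil {
		return err
	}
	// The mutex is released here, so the follow-up can block on I/O without
	// stalling other callers of update().
	if res != nil && res.afterUnlock != nil {
		if err := res.afterUnlock(); err != nil {
			return fmt.Errorf("follow-up failed: %w", err)
		}
	}
	return nil
}

func main() {
	s := &state{}
	if err := s.handleEvent(); err != nil {
		fmt.Println("error:", err)
	}
}

Compared with the previous approach of passing an unlock callback down and tracking an unlocked flag to avoid double-unlocking, returning the closure keeps the mutex scoped to one defer and makes it harder to forget (or repeat) the unlock on an early return path.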