tmp: make updatePod return func to execute after unlock
sharnoff committed Dec 31, 2024
1 parent aaa77f2 commit 71fc62f
Showing 1 changed file with 76 additions and 56 deletions.
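The shape of the change: instead of writing results through out-parameters (`needsMoreResources *bool`, `retryAfter *time.Duration`) and calling an `unlock()` closure partway through `updatePod`, the function now returns a `podUpdateResult` whose `afterUnlock` closure the caller (`HandlePodEvent`) runs only after the mutex has been released, keeping slow API work (migration creation, VM patching) out of the critical section. Below is a minimal, self-contained sketch of that pattern, assuming nothing beyond what the diff shows; the `state`, `result`, `update`, and `handleEvent` names are illustrative, not from this repository.

```go
package main

import (
	"fmt"
	"sync"
)

// result carries work to run once the lock is released, mirroring the
// afterUnlock field introduced by this commit (names here are illustrative).
type result struct {
	afterUnlock func() error
}

type state struct {
	mu sync.Mutex
}

// update does the lock-protected bookkeeping and *returns* the slow,
// API-bound work instead of running it while holding the lock.
func (s *state) update() (*result, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// ... fast, in-memory state changes happen here, under the lock ...

	return &result{
		afterUnlock: func() error {
			// slow work (e.g. an API call) runs without the lock held
			fmt.Println("doing slow work after unlock")
			return nil
		},
	}, nil
}

func (s *state) handleEvent() error {
	res, err := s.update() // the mutex is already released when update returns
	if err != nil {
		return err
	}
	if res != nil && res.afterUnlock != nil {
		return res.afterUnlock()
	}
	return nil
}

func main() {
	s := &state{}
	if err := s.handleEvent(); err != nil {
		fmt.Println("error:", err)
	}
}
```

Returning the deferred work as data keeps the locking visible at a single call site and removes the earlier `unlocked` bookkeeping around the `unlock` closure.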
132 changes: 76 additions & 56 deletions pkg/plugin/handle_pod.go
@@ -36,17 +36,31 @@ func (s *PluginState) HandlePodEvent(

switch kind {
case reconcile.EventKindAdded, reconcile.EventKindModified:
var needsMoreResources bool
var retryAfter time.Duration
updateResult, err := s.updatePod(logger, pod, expectExists)

var reconcileResult *reconcile.Result
if updateResult != nil && updateResult.retryAfter != nil {
reconcileResult = &reconcile.Result{RetryAfter: *updateResult.retryAfter}
}

err := s.updatePod(logger, pod, expectExists, &needsMoreResources, &retryAfter)
if err != nil {
return nil, err
return reconcileResult, err
}

if needsMoreResources {
// mark this as failing; don't try again sooner than 5 seconds later.
return &reconcile.Result{RetryAfter: 5 * time.Second}, errors.New("not enough resources to grant request for pod")
var retryAfter time.Duration
if updateResult != nil {
if updateResult.afterUnlock != nil {
if err := updateResult.afterUnlock(); err != nil {
return reconcileResult, err
}
}

if updateResult.needsMoreResources {
// mark this as failing; don't try again sooner than 5 seconds later.
return &reconcile.Result{RetryAfter: 5 * time.Second}, errors.New("not enough resources to grant request for pod")
}

retryAfter = lo.FromPtr(updateResult.retryAfter)
}
return &reconcile.Result{RetryAfter: retryAfter}, nil

@@ -58,33 +72,30 @@ func (s *PluginState) HandlePodEvent(
}
}

type podUpdateResult struct {
needsMoreResources bool
afterUnlock func() error
retryAfter *time.Duration
}

func (s *PluginState) updatePod(
logger *zap.Logger,
pod *corev1.Pod,
expectExists bool,
needsMoreResources *bool,
retryAfter *time.Duration,
) error {
) (*podUpdateResult, error) {
newPod, err := state.PodStateFromK8sObj(pod)
if err != nil {
return fmt.Errorf("could not get state from Pod object: %w", err)
return nil, fmt.Errorf("could not get state from Pod object: %w", err)
}

var ns *nodeState // pre-declare this so we can update metrics in a defer

s.mu.Lock()
var unlocked bool
unlock := func() {
defer s.mu.Unlock()

var ns *nodeState // pre-declare this so we can update metrics in a defer
defer func() {
if ns != nil {
s.updateNodeMetricsAndRequeue(logger, ns)
}
s.mu.Unlock()
unlocked = true
}
defer func() {
if !unlocked {
unlock()
}
}()

tentativeNode, scheduled := s.tentativelyScheduled[pod.UID]
@@ -106,7 +117,7 @@ func (s *PluginState) updatePod(
if !scheduled && pod.Spec.NodeName == "" {
// still hasn't been scheduled, nothing to do yet.
logger.Info("Skipping event for Pod that has not yet been scheduled")
return nil
return nil, nil
}

nodeName := pod.Spec.NodeName
@@ -119,7 +130,7 @@ func (s *PluginState) updatePod(
var ok bool
ns, ok = s.nodes[nodeName]
if !ok {
return fmt.Errorf("pod's node %q is not present in local state", nodeName)
return nil, fmt.Errorf("pod's node %q is not present in local state", nodeName)
}

// make the changes in Speculatively() so that we can log both states before committing, and
@@ -162,7 +173,7 @@ func (s *PluginState) updatePod(
//
// All that's left is to handle VMs that are the responsibility of *this* scheduler.
if lo.IsEmpty(newPod.VirtualMachine) || pod.Spec.SchedulerName != s.config.SchedulerName {
return nil
return nil, nil
}

if _, ok := ns.requestedMigrations[newPod.UID]; ok {
@@ -175,27 +186,27 @@ func (s *PluginState) updatePod(
} else {
// Otherwise: the pod is not migrating, but *is* migratable. Let's trigger migration.
logger.Info("Creating migration for Pod")
// we need to release the lock to trigger the migration, otherwise we may slow down
// processing due to API delays.
unlock()
if err := s.createMigrationForPod(logger, newPod); err != nil {
return fmt.Errorf("could not create migration for Pod: %w", err)
}
// All done for now; retry in 5s if the pod is not migrating yet.
*retryAfter = 5 * time.Second
return nil
return &podUpdateResult{
needsMoreResources: false,
// we need to release the lock to trigger the migration, otherwise we may slow down
// processing due to API delays.
afterUnlock: func() error {
if err := s.createMigrationForPod(logger, newPod); err != nil {
return fmt.Errorf("could not create migration for Pod: %w", err)
}
return nil
},
// All done for now; retry in 5s if the pod is not migrating yet.
retryAfter: lo.ToPtr(5 * time.Second),
}, nil
}
}

if !newPod.Migrating {
var err error
*needsMoreResources, err = s.reconcilePodResources(logger, ns, pod, newPod, retryAfter, unlock)
if err != nil {
return err
}
return s.reconcilePodResources(logger, ns, pod, newPod), nil
}

return nil
return nil, nil
}

func (s *PluginState) createMigrationForPod(logger *zap.Logger, pod state.Pod) error {
@@ -223,16 +234,16 @@ func (s *PluginState) reconcilePodResources(
ns *nodeState,
oldPodObj *corev1.Pod,
oldPod state.Pod,
retryAfter *time.Duration,
unlock func(),
) (needsMoreResources bool, _ error) {
) *podUpdateResult {
// Quick check: Does this pod have autoscaling enabled? if no, then we shouldn't set our
// annotations on it -- particularly because we may end up with stale approved resources when
// the VM scales, and that can cause issues if autoscaling is enabled later.
if !api.HasAutoscalingEnabled(oldPodObj) {
return false, nil
return nil
}

var needsMoreResources bool

desiredPod := oldPod
ns.node.Speculatively(func(n *state.Node) (commit bool) {
// Do a pass of reconciling this pod, in case there's resources it's requested that we can
Expand Down Expand Up @@ -271,8 +282,8 @@ func (s *PluginState) reconcilePodResources(

if !s.startupDone {
s.requeueAfterStartup[oldPod.UID] = struct{}{}
needsMoreResources = false // don't report anything, we're waiting for startup to finish!
return needsMoreResources, nil
// don't report anything, even if needsMoreResources. We're waiting for startup to finish!
return nil
}
if oldPod == desiredPod && hasApprovedAnnotation {
// no changes, nothing to do. Although, if we *do* need more resources, log something about
@@ -284,7 +295,11 @@ func (s *PluginState) reconcilePodResources(
zap.Object("Node", ns.node),
)
}
return needsMoreResources, nil
return &podUpdateResult{
needsMoreResources: needsMoreResources,
afterUnlock: nil,
retryAfter: nil,
}
}

// Startup done. Either we have changes or the pod is missing the approved resources annotation.
@@ -302,12 +317,16 @@ func (s *PluginState) reconcilePodResources(
}

if now.Before(canRetryAt) {
*retryAfter = canRetryAt.Sub(now)
retryAfter := canRetryAt.Sub(now)
logger.Warn(
"Want to patch VirtualMachine for reserved resources, but too soon to re-patch. Waiting.",
zap.Duration("retryAfter", *retryAfter),
zap.Duration("retryAfter", retryAfter),
)
return needsMoreResources, errors.New("too soon to re-patch VirtualMachine, waiting")
return &podUpdateResult{
needsMoreResources: needsMoreResources,
afterUnlock: nil,
retryAfter: &retryAfter,
}
}

newPod := desiredPod
@@ -345,13 +364,14 @@ func (s *PluginState) reconcilePodResources(
}

ns.podsVMPatchedAt[oldPod.UID] = now
unlock()
err := s.patchReservedResourcesForPod(logger, oldPodObj, desiredPod)
if err != nil {
return false, err
}

return needsMoreResources, nil
return &podUpdateResult{
needsMoreResources: needsMoreResources,
afterUnlock: func() error {
return s.patchReservedResourcesForPod(logger, oldPodObj, desiredPod)
},
retryAfter: nil,
}
}

func (s *PluginState) patchReservedResourcesForPod(