Skip to content

Commit

Permalink
plugin: Rewrite to get state from Pod annotations
Browse files Browse the repository at this point in the history
a.k.a. "Stateless Scheduler".

This is effectively a full rewrite of the scheduler plugin. At a high
level, the existing external interfaces are preserved:

- The scheduler plugin still exposes an HTTP server for the
  autoscaler-agent (for now); and
- The scheduler plugin is still a plugin.

However, instead of storing the state for approved resources in-memory,
in the scheduler plugin, we now treat annotations on the Pod as the
source of truth for requested/approved resources.

A brief overview of the internal changes to make this happen:

1. The state of resource reservations can be constructed entirely from
   Node and Pod objects. We *do* store that, and update as objects
   change, but it's only for convenience and not a strict requirement.

   One tricky piece is with scheduling. For that, we store a set of pods
   that have gone through the plugin methods but haven't actually had
   the spec.nodeName field set.

   For more info, the 'pkg/plugin/state' package contains all the pure
   logic for manipulating resources.

2. Each watch event on Node/Pod objects is now placed into a "reconcile"
   queue similar to the controller framework. Reconcile operations are a
   tuple of (object, event type, desired reconcile time) and are retried
   with backoff on error/panic.

   For a detailed look, the 'pkg/plugin/reconcile' package defines the
   reconcile queue and all its related machinery.

3. The handler for autoscaler-agent requests no longer accesses the
   internal state and instead directly patches the VirtualMachine object
   to set the annotation for requested resources, and then waits for
   that object to be updated.

   Once the autoscaler-agent is converted to read and write those
   annotations directly, we will remove the HTTP server.

4. 'pkg/util/watch' was changed to allow asking to be notified when
   there's changes to an object, via the new (*Store[T]).Listen() API.

   This was required to implement (3), and can be removed once (3) is no
   longer needed, if it doesn't become used in the autoscaler-agent.

5. 'pkg/util/watch' was changed to allow triggering no-op update events,
   which - for our usage - will trigger requeuing the object. This
   solves two problems:

   a. During initial startup, we need to defer resource approval until
      all Pods on the Node have been processed -- otherwise, we may end
      up unintentionally overcommitting resources based on partial
      information.

      So during startup, we track the set of Pods with deferred
      approvals, and then requeue them all once startup is over by
      triggering no-op update events in the watch store.

   b. Whenever we handle changes for some Pod, it's worthwhile to handle
      certain operations on the Node -- e.g., triggering live migration
      if the reserved resources are too high.

      While we *could* do this as part of the Pod reconciling, we get
      more fair behavior (and, better balancing under load) by instead
      triggering re-reconciling the Pod's Node.

   Why can't this be done elsewhere? In short, consistency.
   Fundamentally we need to use a consistent view of the object that
   we're reconciling (else, it might not be no-op), and the source of
   truth for the current value of an object *within the scheduler
   plugin* is the watch store.
  • Loading branch information
sharnoff committed Dec 13, 2024
1 parent da9ffb0 commit 6c76e9d
Show file tree
Hide file tree
Showing 39 changed files with 6,027 additions and 4,653 deletions.
3 changes: 2 additions & 1 deletion autoscale-scheduler/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
func main() {
logConfig := zap.NewProductionConfig()
logConfig.Sampling = nil // Disable sampling, which the production config enables by default.
logConfig.DisableStacktrace = true
logger := zap.Must(logConfig.Build()).Named("autoscale-scheduler")

if err := runProgram(logger); err != nil {
Expand Down Expand Up @@ -66,7 +67,7 @@ func runProgram(logger *zap.Logger) (err error) {
redirectKlog(logger.Named("klog"))

constructor := plugin.NewAutoscaleEnforcerPlugin(ctx, logger, conf)
command := app.NewSchedulerCommand(app.WithPlugin(plugin.Name, constructor))
command := app.NewSchedulerCommand(app.WithPlugin(plugin.PluginName, constructor))
// Don't output the full usage whenever any error occurs (otherwise, startup errors get drowned
// out by many pages of scheduler command flags)
command.SilenceUsage = true
Expand Down
22 changes: 10 additions & 12 deletions autoscale-scheduler/config_map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,19 @@ metadata:
data:
autoscale-enforcer-config.json: |
{
"nodeConfig": {
"cpu": { "watermark": 0.9 },
"memory": { "watermark": 0.9 },
"watermark": 0.9,
"scoring": {
"minUsageScore": 0.5,
"maxUsageScore": 0,
"scorePeak": 0.8
"scorePeak": 0.8,
"randomize": false
},
"schedulerName": "autoscale-scheduler",
"eventQueueWorkers": 64,
"reconcileWorkers": 16,
"logSuccessiveFailuresThreshold": 10,
"startupEventHandlingTimeoutSeconds": 15,
"dumpState": {
"port": 10298,
"timeoutSeconds": 5
},
"migrationDeletionRetrySeconds": 5,
"doMigration": true,
"randomizeScores": true
"patchRetryWaitSeconds": 1,
"k8sCRUDTimeoutSeconds": 1,
"nodeMetricLabels": {},
"ignoredNamespaces": []
}
4 changes: 2 additions & 2 deletions autoscale-scheduler/role_binding.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: autoscale-scheduler-virtualmachine-viewer
name: autoscale-scheduler-virtualmachine-editor
namespace: kube-system
subjects:
- kind: ServiceAccount
name: autoscale-scheduler
namespace: kube-system
roleRef:
kind: ClusterRole
name: neonvm-virtualmachine-viewer-role
name: neonvm-virtualmachine-editor-role
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
32 changes: 32 additions & 0 deletions neonvm/apis/neonvm/v1/pod_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package v1

import (
"encoding/json"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -60,3 +63,32 @@ func MigrationOwnerForPod(pod *corev1.Pod) (metav1.OwnerReference, MigrationRole
var emptyRef metav1.OwnerReference
return emptyRef, "", false
}

// VirtualMachineUsageFromPod returns the resources currently used by the virtual machine, as
// described by the helper usage annotation on the pod.
//
// If the usage annotation is not present, this function returns (nil, nil).
func VirtualMachineUsageFromPod(pod *corev1.Pod) (*VirtualMachineUsage, error) {
return extractFromAnnotation[VirtualMachineUsage](pod, VirtualMachineUsageAnnotation)
}

// VirtualMachineResourcesFromPod returns the information about resources allocated to the virtual
// machine, as encoded by the helper annotation on the pod.
//
// If the annotation is not present, this function returns (nil, nil).
func VirtualMachineResourcesFromPod(pod *corev1.Pod) (*VirtualMachineResources, error) {
return extractFromAnnotation[VirtualMachineResources](pod, VirtualMachineResourcesAnnotation)
}

func extractFromAnnotation[T any](pod *corev1.Pod, annotation string) (*T, error) {
jsonString, ok := pod.Annotations[annotation]
if !ok {
return nil, nil
}

var value T
if err := json.Unmarshal([]byte(jsonString), &value); err != nil {
return nil, fmt.Errorf("could not unmarshal %s annotation: %w", annotation, err)
}
return &value, nil
}
42 changes: 36 additions & 6 deletions pkg/api/vminfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ const (
LabelEnableAutoscaling = "autoscaling.neon.tech/enabled"
AnnotationAutoscalingBounds = "autoscaling.neon.tech/bounds"
AnnotationAutoscalingConfig = "autoscaling.neon.tech/config"
AnnotationAutoscalingUnit = "autoscaling.neon.tech/scaling-unit"
AnnotationBillingEndpointID = "autoscaling.neon.tech/billing-endpoint-id"

// For internal use only, between the autoscaler-agent and scheduler plugin:
InternalAnnotationResourcesRequested = "internal.autoscaling.neon.tech/resources-requested"
InternalAnnotationResourcesApproved = "internal.autoscaling.neon.tech/resources-approved"
)

func hasTrueLabel(obj metav1.ObjectMetaAccessor, labelName string) bool {
Expand All @@ -49,6 +54,33 @@ func HasAlwaysMigrateLabel(obj metav1.ObjectMetaAccessor) bool {
return hasTrueLabel(obj, LabelTestingOnlyAlwaysMigrate)
}

func extractAnnotationJSON[T any](obj metav1.ObjectMetaAccessor, annotation string) (*T, error) {
jsonString, ok := obj.GetObjectMeta().GetAnnotations()[annotation]
if !ok {
return nil, nil
}

var value T
if err := json.Unmarshal([]byte(jsonString), &value); err != nil {
return nil, fmt.Errorf("could not unmarshal %s annotation: %w", annotation, err)
}
return &value, nil
}

// ExtractScalingUnit returns the configured scaling unit (aka the "compute unit") for the object,
// based on the AnnotationAutoscalingUnit annotation.
func ExtractScalingUnit(obj metav1.ObjectMetaAccessor) (*Resources, error) {
return extractAnnotationJSON[Resources](obj, AnnotationAutoscalingUnit)
}

func ExtractRequestedScaling(obj metav1.ObjectMetaAccessor) (*Resources, error) {
return extractAnnotationJSON[Resources](obj, InternalAnnotationResourcesRequested)
}

func ExtractApprovedScaling(obj metav1.ObjectMetaAccessor) (*Resources, error) {
return extractAnnotationJSON[Resources](obj, InternalAnnotationResourcesApproved)
}

// VmInfo is the subset of vmv1.VirtualMachineSpec that the scheduler plugin and autoscaler agent
// care about. It takes various labels and annotations into account, so certain fields might be
// different from what's strictly in the VirtualMachine object.
Expand Down Expand Up @@ -161,16 +193,14 @@ func ExtractVmInfo(logger *zap.Logger, vm *vmv1.VirtualMachine) (*VmInfo, error)

func ExtractVmInfoFromPod(logger *zap.Logger, pod *corev1.Pod) (*VmInfo, error) {
logger = logger.With(util.PodNameFields(pod))
resourcesJSON := pod.Annotations[vmv1.VirtualMachineResourcesAnnotation]

var resources vmv1.VirtualMachineResources
if err := json.Unmarshal([]byte(resourcesJSON), &resources); err != nil {
return nil, fmt.Errorf("Error unmarshaling %q: %w",
vmv1.VirtualMachineResourcesAnnotation, err)
resources, err := vmv1.VirtualMachineResourcesFromPod(pod)
if err != nil {
return nil, err
}

vmName := pod.Labels[vmv1.VirtualMachineNameLabel]
return extractVmInfoGeneric(logger, vmName, pod, resources)
return extractVmInfoGeneric(logger, vmName, pod, *resources)
}

func extractVmInfoGeneric(
Expand Down
Loading

0 comments on commit 6c76e9d

Please sign in to comment.