Skip to content

Commit

Permalink
Merge branch 'kedacore:main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
amit1-100ms authored Sep 25, 2023
2 parents d8a26ce + fc3f31c commit 1e554ab
Show file tree
Hide file tree
Showing 200 changed files with 62,480 additions and 122 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: A new experimental Kafka Scaler implementation using kafka-go library `apache-kafka` ([#4692](https://github.com/kedacore/keda/issues/4692))
- **General**: Introduce new Google Cloud Tasks scaler functionality to scale based on the queue length ([#3613](https://github.com/kedacore/keda/issues/3613))
- **AWS SQS Scaler**: Support for scaling to include delayed messages. ([#4377](https://github.com/kedacore/keda/issues/4377))
- **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63))
Expand All @@ -60,16 +61,18 @@ Here is an overview of all new **experimental** features:
- **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440))

### Improvements

- **General**: Add apiserver Prometheus metrics to KEDA Metric Server ([#4460](https://github.com/kedacore/keda/issues/4460))
- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
- **General**: Adding a changelog validating script to check for formatting and order ([#3190](https://github.com/kedacore/keda/issues/3190))
- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledObject to pause autoscaling ([#3304](https://github.com/kedacore/keda/issues/3304))
- **General**: Update golangci-lint version documented in CONTRIBUTING.md since old version doesn't support go 1.20 (N/A)
- **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905))
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))
- **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960))
- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700))
- **Redis Scalers**: Add TLS authentication support for Redis and Redis stream scalers ([#4917](https://github.com/kedacore/keda/issues/4917))

### Fixes
- **General**: Add validations for stabilizationWindowSeconds ([#4976](https://github.com/kedacore/keda/issues/4976))
Expand Down
20 changes: 20 additions & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ScaledObject struct {

const ScaledObjectOwnerAnnotation = "scaledobject.keda.sh/name"
const ScaledObjectTransferHpaOwnershipAnnotation = "scaledobject.keda.sh/transfer-hpa-ownership"
const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"
const PausedAnnotation = "autoscaling.keda.sh/paused"

// HealthStatus is the status for a ScaledObject's health
type HealthStatus struct {
Expand Down Expand Up @@ -183,6 +185,24 @@ func (so *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", so.Namespace, so.Name)
}

// HasPausedAnnotition returns whether this ScaledObject has PausedAnnotation or PausedReplicasAnnotation
func (so *ScaledObject) HasPausedAnnotation() bool {
_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
return pausedAnnotationFound || pausedReplicasAnnotationFound
}

// NeedToBePausedByAnnotation will check whether ScaledObject needs to be paused based on PausedAnnotation or PausedReplicaCount
func (so *ScaledObject) NeedToBePausedByAnnotation() bool {
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
if pausedReplicasAnnotationFound {
return so.Status.PausedReplicaCount != nil
}

_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
return pausedAnnotationFound
}

// IsUsingModifiers determines whether scalingModifiers are defined or not
func (so *ScaledObject) IsUsingModifiers() bool {
return so.Spec.Advanced != nil && !reflect.DeepEqual(so.Spec.Advanced.ScalingModifiers, ScalingModifiers{})
Expand Down
64 changes: 62 additions & 2 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"

"github.com/prometheus/client_golang/prometheus/collectors"
appsv1 "k8s.io/api/apps/v1"
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/client-go/kubernetes/scheme"
kubemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
Expand Down Expand Up @@ -96,10 +102,9 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
cfg.Burst = adapterClientRequestBurst
cfg.DisableCompression = disableCompression

metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Metrics: server.Options{
BindAddress: metricsBindAddress,
BindAddress: "0", // disabled since we use our own server to serve metrics
},
Scheme: scheme,
Cache: ctrlcache.Options{
Expand Down Expand Up @@ -131,6 +136,58 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient), stopCh, nil
}

// getMetricHandler returns a http handler that exposes metrics from controller-runtime and apiserver
func getMetricHandler() http.HandlerFunc {
// Register apiserver metrics in legacy registry
// this contains the apiserver_* metrics
apimetrics.Register()

// unregister duplicate collectors that are already handled by controller-runtime's registry
legacyregistry.Registerer().Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
legacyregistry.Registerer().Unregister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)))

// Return handler that serves metrics from both legacy and controller-runtime registry
return func(w http.ResponseWriter, req *http.Request) {
legacyregistry.Handler().ServeHTTP(w, req)

kubemetrics.HandlerFor(ctrlmetrics.Registry, kubemetrics.HandlerOpts{}).ServeHTTP(w, req)
}
}

// RunMetricsServer runs a http listener and handles the /metrics endpoint
// this is needed to consolidate apiserver and controller-runtime metrics
// we have to use a separate http server & can't rely on the controller-runtime implementation
// because apiserver doesn't provide a way to register metrics to other prometheus registries
func RunMetricsServer(ctx context.Context, stopCh <-chan struct{}) {
h := getMetricHandler()
mux := http.NewServeMux()
mux.Handle("/metrics", h)
metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)

server := &http.Server{
Addr: metricsBindAddress,
Handler: mux,
}

go func() {
logger.Info("starting /metrics server endpoint")
// nosemgrep: use-tls
err := server.ListenAndServe()
if err != http.ErrServerClosed {
panic(err)
}
}()

go func() {
<-stopCh
logger.Info("Shutting down the /metrics server gracefully...")

if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "http server shutdown error")
}
}()
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
// By default the Metrics Service gRPC Server runs in the same namespace on the keda-operator pod.
func generateDefaultMetricsServiceAddr() string {
Expand Down Expand Up @@ -196,6 +253,9 @@ func main() {
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)

RunMetricsServer(ctx, stopCh)

if err = cmd.Run(stopCh); err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log

// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop.
func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) {
_, pausedAnnotation := scaledJob.GetAnnotations()[kedacontrollerutil.PausedAnnotation]
_, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation]
pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue
if pausedAnnotation {
if !pausedStatus {
Expand Down
44 changes: 39 additions & 5 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -123,6 +124,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(
predicate.Or(
kedacontrollerutil.PausedPredicate{},
kedacontrollerutil.PausedReplicasPredicate{},
kedacontrollerutil.ScaleObjectReadyConditionPredicate{},
predicate.GenerationChangedPredicate{},
Expand Down Expand Up @@ -206,12 +208,17 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
_, pausedAnnotationFound := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
pausedAnnotationFound := scaledObject.HasPausedAnnotation()
if pausedAnnotationFound {
scaledToPausedCount := true
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
// If scaledobject is in paused condition but replica count is not equal to paused replica count, the following scaling logic needs to be trigger again.
scaledToPausedCount = r.checkIfTargetResourceReachPausedCount(ctx, logger, scaledObject)
if scaledToPausedCount {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}
}
if scaledObject.Status.PausedReplicaCount != nil {
if scaledObject.NeedToBePausedByAnnotation() && scaledToPausedCount {
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
msg = "failed to stop the scale loop for paused ScaledObject"
Expand Down Expand Up @@ -279,7 +286,6 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}

if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
}
Expand All @@ -303,6 +309,34 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo
return r.Client.Update(ctx, scaledObject)
}

func (r *ScaledObjectReconciler) checkIfTargetResourceReachPausedCount(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) bool {
pausedReplicaCount, pausedReplicasAnnotationFound := scaledObject.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation]
if !pausedReplicasAnnotationFound {
return true
}
pausedReplicaCountNum, err := strconv.ParseInt(pausedReplicaCount, 10, 32)
if err != nil {
return true
}

gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
return true
}
gvkString := gvkr.GVKString()
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
scale, errScale := (r.ScaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
return true
}
return scale.Spec.Replicas == int32(pausedReplicaCountNum)
}

// checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
Expand All @@ -316,7 +350,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// do we need the scale to update the status later?
_, present := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
present := scaledObject.HasPausedAnnotation()
removePausedStatus := scaledObject.Status.PausedReplicaCount != nil && !present
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil || removePausedStatus

Expand Down
11 changes: 5 additions & 6 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
"github.com/kedacore/keda/v2/pkg/scalers"
Expand Down Expand Up @@ -912,7 +911,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
annotations := make(map[string]string)
annotations[kedacontrollerutil.PausedReplicasAnnotation] = "1"
annotations[kedav1alpha1.PausedReplicasAnnotation] = "1"
so.SetAnnotations(annotations)
pollingInterval := int32(6)
so.Spec.PollingInterval = &pollingInterval
Expand All @@ -924,8 +923,7 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() bool {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
_, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
return paused
return so.HasPausedAnnotation()
}).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue())

Eventually(func() metav1.ConditionStatus {
Expand All @@ -952,7 +950,7 @@ var _ = Describe("ScaledObjectController", func() {
Name: soName,
Namespace: "default",
Annotations: map[string]string{
kedacontrollerutil.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
kedav1alpha1.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
},
},
Spec: kedav1alpha1.ScaledObjectSpec{
Expand Down Expand Up @@ -992,7 +990,8 @@ var _ = Describe("ScaledObjectController", func() {
// validate annotation is set correctly
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
pausedReplicasCount, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
paused := so.HasPausedAnnotation()
pausedReplicasCount := so.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation]
Expect(paused).To(Equal(true))
Expect(pausedReplicasCount).To(Equal(pausedReplicasCountForAnnotation))

Expand Down
12 changes: 4 additions & 8 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

const PausedAnnotation = "autoscaling.keda.sh/paused"

type PausedReplicasPredicate struct {
predicate.Funcs
}
Expand All @@ -27,11 +23,11 @@ func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool {
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedReplicasAnnotation]
newPausedValue = newAnnotations[kedav1alpha1.PausedReplicasAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedReplicasAnnotation]
oldPausedValue = oldAnnotations[kedav1alpha1.PausedReplicasAnnotation]
}

return newPausedValue != oldPausedValue
Expand Down Expand Up @@ -84,11 +80,11 @@ func (PausedPredicate) Update(e event.UpdateEvent) bool {
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedAnnotation]
newPausedValue = newAnnotations[kedav1alpha1.PausedAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedAnnotation]
oldPausedValue = oldAnnotations[kedav1alpha1.PausedAnnotation]
}

return newPausedValue != oldPausedValue
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/redis/go-redis/v9 v9.1.0
github.com/robfig/cron/v3 v3.0.1
github.com/segmentio/kafka-go v0.4.42
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.16.0
Expand Down
Loading

0 comments on commit 1e554ab

Please sign in to comment.