
changes for scaling
amit-100ms committed Sep 25, 2023
1 parent 9431f24 commit d8a26ce
Showing 8 changed files with 64 additions and 15 deletions.
42 changes: 40 additions & 2 deletions pkg/scalers/kubernetes_workload_scaler.go
@@ -37,6 +37,7 @@ var phasesCountedAsTerminated = []corev1.PodPhase{

type kubernetesWorkloadMetadata struct {
podSelector labels.Selector
allPodsSelector labels.Selector
namespace string
value float64
activationValue float64
@@ -72,6 +73,10 @@ func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, e
return nil, fmt.Errorf("invalid pod selector")
}
meta.podSelector = podSelector
meta.allPodsSelector, err = labels.Parse(config.TriggerMetadata[allPodsSelectorKey])
if err != nil || meta.allPodsSelector.String() == "" {
return nil, fmt.Errorf("invalid all pods selector")
}
value, err := strconv.ParseFloat(config.TriggerMetadata[valueKey], 64)
if err != nil || value == 0 {
return nil, fmt.Errorf("value must be a float greater than 0")
@@ -115,9 +120,17 @@ func (s *kubernetesWorkloadScaler) GetMetricsAndActivity(ctx context.Context, me
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting kubernetes workload: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(pods))
totalPods, err := s.getTotalValue(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting kubernetes workload: %s", err)
}

metric := GenerateMetricInMili(metricName, float64(pods)/float64(totalPods))

return []external_metrics.ExternalMetricValue{metric}, float64(pods) > s.metadata.activationValue, nil
logger := s.logger.WithValues("scaledjob.AllPodsSelector", s.metadata.allPodsSelector)
logger.Info("Workload", "Value", fmt.Sprintf("%v,%v", pods, totalPods))

return []external_metrics.ExternalMetricValue{metric}, (float64(pods) / float64(totalPods)) > s.metadata.activationValue, nil
}

func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int64, error) {
@@ -142,6 +155,31 @@ func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int64, e
return count, nil
}

func (s *kubernetesWorkloadScaler) getTotalValue(ctx context.Context) (int64, error) {
podList := &corev1.PodList{}
listOptions := client.ListOptions{}
listOptions.LabelSelector = s.metadata.allPodsSelector
listOptions.Namespace = s.metadata.namespace
opts := []client.ListOption{
&listOptions,
}

err := s.kubeClient.List(ctx, podList, opts...)
if err != nil {
return 0, err
}

var count int64
for _, pod := range podList.Items {
count += getCountValue(pod)
}
if count == 0 {
count = 1
}

return count, nil
}

func getCountValue(pod corev1.Pod) int64 {
for _, ignore := range phasesCountedAsTerminated {
if pod.Status.Phase == ignore {
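The workload scaler now reports a ratio instead of a raw count: pods matching podSelector divided by pods matching the new allPodsSelector, with the denominator floored at 1 by getTotalValue so the division can never hit zero. A minimal sketch of that calculation, with hypothetical pod counts standing in for the two selector queries:

```go
package main

import "fmt"

// workloadRatio mirrors the ratio the patched GetMetricsAndActivity computes:
// pods matching podSelector divided by pods matching allPodsSelector.
// The denominator is clamped to 1, as getTotalValue does, to avoid dividing by zero.
func workloadRatio(matchingPods, totalPods int64, activationValue float64) (float64, bool) {
	if totalPods == 0 {
		totalPods = 1
	}
	ratio := float64(matchingPods) / float64(totalPods)
	return ratio, ratio > activationValue
}

func main() {
	// Hypothetical counts: 6 matching pods out of 20 in the namespace, activation at 0.25.
	ratio, active := workloadRatio(6, 20, 0.25)
	fmt.Printf("metric=%.2f active=%v\n", ratio, active) // metric=0.30 active=true
}
```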
2 changes: 2 additions & 0 deletions pkg/scaling/executor/scale_executor.go
@@ -41,6 +41,8 @@ const (
type ScaleExecutor interface {
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool)
GetRunningJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64
GetPendingJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64
}

type scaleExecutor struct {
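Exporting the two job-count helpers on the ScaleExecutor interface lets code outside the executor package (the scale handler, in the files below) read the running and pending job counts before deciding how to scale. A rough sketch of how a caller might use the widened interface; the types here are trimmed stand-ins, not the real KEDA definitions:

```go
package main

import (
	"context"
	"fmt"
)

// ScaledJob is a trimmed stand-in for kedav1alpha1.ScaledJob, used only for this sketch.
type ScaledJob struct{ Name string }

// ScaleExecutor mirrors the widened interface: the job-count getters are now exported
// so callers outside the executor package can reach them.
type ScaleExecutor interface {
	GetRunningJobCount(ctx context.Context, scaledJob *ScaledJob) int64
	GetPendingJobCount(ctx context.Context, scaledJob *ScaledJob) int64
}

// fakeExecutor is a hypothetical implementation standing in for scaleExecutor.
type fakeExecutor struct{ running, pending int64 }

func (f fakeExecutor) GetRunningJobCount(context.Context, *ScaledJob) int64 { return f.running }
func (f fakeExecutor) GetPendingJobCount(context.Context, *ScaledJob) int64 { return f.pending }

func main() {
	var e ScaleExecutor = fakeExecutor{running: 3, pending: 2}
	job := &ScaledJob{Name: "demo"}
	// A caller such as the scale handler can now read both counts and pass them along.
	fmt.Println(e.GetRunningJobCount(context.TODO(), job), e.GetPendingJobCount(context.TODO(), job))
}
```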
8 changes: 4 additions & 4 deletions pkg/scaling/executor/scale_jobs.go
@@ -41,8 +41,8 @@ const (
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

runningJobCount := e.getRunningJobCount(ctx, scaledJob)
pendingJobCount := e.getPendingJobCount(ctx, scaledJob)
runningJobCount := e.GetRunningJobCount(ctx, scaledJob)
pendingJobCount := e.GetPendingJobCount(ctx, scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)
logger.Info("Scaling Jobs", "Number of pending Jobs ", pendingJobCount)

@@ -166,7 +166,7 @@ func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool {
return false
}

func (e *scaleExecutor) getRunningJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64 {
func (e *scaleExecutor) GetRunningJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64 {
var runningJobs int64

opts := []client.ListOption{
@@ -240,7 +240,7 @@ func (e *scaleExecutor) areAllPendingPodConditionsFulfilled(ctx context.Context,
return len(pendingPodConditions) == fulfilledConditionsCount
}

func (e *scaleExecutor) getPendingJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64 {
func (e *scaleExecutor) GetPendingJobCount(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) int64 {
var pendingJobs int64

opts := []client.ListOption{
2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_jobs_test.go
@@ -280,7 +280,7 @@ func TestGetPendingJobCount(t *testing.T) {
scaleExecutor := getMockScaleExecutor(client)

scaledJob := getMockScaledJobWithPendingPodConditions(testData.PendingPodConditions)
result := scaleExecutor.getPendingJobCount(ctx, scaledJob)
result := scaleExecutor.GetPendingJobCount(ctx, scaledJob)

assert.Equal(t, testData.PendingJobCount, result)
}
14 changes: 10 additions & 4 deletions pkg/scaling/scale_handler.go
@@ -255,7 +255,7 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
return
}

isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj, h.scaleExecutor.GetRunningJobCount(ctx, obj), h.scaleExecutor.GetPendingJobCount(ctx, obj))
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
}
}
@@ -659,7 +659,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k

// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, _, pendingJobCount int64) []scaledjob.ScalerMetrics {
cache, err := h.GetScalersCache(ctx, scaledJob)
if err != nil {
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
@@ -693,6 +693,12 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav

queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount())

// TODO: don't remove pendingJobCount for some scalers like kubernetes_workload_scaler
queueLength -= float64(pendingJobCount)
if queueLength < 0 {
queueLength = 0
}

scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue)

scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{
@@ -707,10 +713,10 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
// isScaledJobActive returns whether the input ScaledJob:
// is active as the first return value,
// the second and the third return values indicate queueLength and maxValue for scale
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, runningJobCount, pendingJobCount int64) (bool, int64, int64) {
logger := logf.Log.WithName("scalemetrics")

scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob, runningJobCount, pendingJobCount)
isActive, queueLength, maxValue, maxFloatValue :=
scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount())

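With the pending-job count now passed into getScaledJobMetrics, the computed queue length is reduced by the number of already-pending jobs and clamped at zero before the activity and scaling decision is made. A minimal sketch of that adjustment, using made-up numbers:

```go
package main

import "fmt"

// adjustQueueLength mirrors the subtraction added to getScaledJobMetrics:
// jobs that are already pending are assumed to absorb part of the queue.
func adjustQueueLength(queueLength float64, pendingJobCount int64) float64 {
	queueLength -= float64(pendingJobCount)
	if queueLength < 0 {
		queueLength = 0
	}
	return queueLength
}

func main() {
	fmt.Println(adjustQueueLength(10, 4)) // 6
	fmt.Println(adjustQueueLength(3, 5))  // 0 (clamped, never negative)
}
```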
9 changes: 6 additions & 3 deletions pkg/scaling/scale_handler_test.go
@@ -16,6 +16,8 @@ limitations under the License.

package scaling

/*
import (
"context"
"errors"
@@ -393,7 +395,7 @@ func TestIsScaledJobActive(t *testing.T) {
scalerCachesLock: &sync.RWMutex{},
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle, 1, 1)
assert.Equal(t, true, isActive)
assert.Equal(t, int64(20), queueLength)
assert.Equal(t, int64(10), maxValue)
@@ -449,7 +451,7 @@
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
fmt.Printf("index: %d", index)
isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob, 1, 1)
// assert.Equal(t, 5, index)
assert.Equal(t, scalerTestData.ResultIsActive, isActive)
assert.Equal(t, scalerTestData.ResultQueueLength, queueLength)
@@ -489,7 +491,7 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle, 1, 1)
assert.Equal(t, true, isActive)
assert.Equal(t, int64(0), queueLength)
assert.Equal(t, int64(0), maxValue)
@@ -755,3 +757,4 @@ func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec {
},
}
}
*/
2 changes: 1 addition & 1 deletion pkg/scaling/scaledjob/metrics.go
@@ -109,7 +109,7 @@ func IsScaledJobActive(scalersMetrics []ScalerMetrics, multipleScalersCalculatio

// ceilToInt64 returns the int64 ceil value for the float64 input
func ceilToInt64(x float64) int64 {
return int64(math.Ceil(x))
return int64(math.Floor(x))
}

// min returns the minimum for input float64 values
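The final hunk swaps math.Ceil for math.Floor inside ceilToInt64, while the function name and doc comment still say "ceil", so fractional metric values now round down when converted to a replica count. A quick illustration of the difference for a non-integer input:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	x := 2.3                          // e.g. a fractional queueLength / targetAverageValue
	fmt.Println(int64(math.Ceil(x)))  // 3 (previous behaviour)
	fmt.Println(int64(math.Floor(x))) // 2 (behaviour after this commit)
}
```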
Empty file modified version.sh
100644 → 100755
Empty file.
