diff --git a/Makefile b/Makefile
index 070691b..50ec84a 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@
 NAME ?= adobe/k8s-shredder
 K8S_SHREDDER_VERSION ?= "dev"
-KINDNODE_VERSION ?= "v1.28.0"
+KINDNODE_VERSION ?= "v1.30.4"
 COMMIT ?= $(shell git rev-parse --short HEAD)
 
 TEST_CLUSTERNAME ?= "k8s-shredder-test-cluster"
diff --git a/charts/k8s-shredder/Chart.yaml b/charts/k8s-shredder/Chart.yaml
index 011ee84..c0ce217 100644
--- a/charts/k8s-shredder/Chart.yaml
+++ b/charts/k8s-shredder/Chart.yaml
@@ -12,5 +12,5 @@ maintainers:
     email: aneci@adobe.com
     url: https://adobe.com
-version: 0.1.0
+version: 0.1.1
 appVersion: v0.2.1
diff --git a/charts/k8s-shredder/templates/cluster-role.yaml b/charts/k8s-shredder/templates/cluster-role.yaml
index 610739f..56ce16f 100644
--- a/charts/k8s-shredder/templates/cluster-role.yaml
+++ b/charts/k8s-shredder/templates/cluster-role.yaml
@@ -15,4 +15,7 @@ rules:
 - apiGroups: [apps, extensions]
   resources: [statefulsets, deployments, replicasets]
   verbs: [get, list, watch, update, patch]
+- apiGroups: [ "argoproj.io" ]
+  resources: [ rollouts ]
+  verbs: [ get, list, watch, update, patch ]
 {{ end }}
\ No newline at end of file
diff --git a/cmd/root.go b/cmd/root.go
index a37ca2b..06d5e2e 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -114,6 +114,7 @@ func discoverConfig() {
     viper.SetDefault("RestartedAtAnnotation", "shredder.ethos.adobe.net/restartedAt")
     viper.SetDefault("AllowEvictionLabel", "shredder.ethos.adobe.net/allow-eviction")
     viper.SetDefault("ToBeDeletedTaint", "ToBeDeletedByClusterAutoscaler")
+    viper.SetDefault("ArgoRolloutsAPIVersion", "v1alpha1")
 
     err := viper.ReadInConfig()
     if err != nil {
@@ -144,6 +145,7 @@ func parseConfig() {
         "RestartedAtAnnotation":  cfg.RestartedAtAnnotation,
         "AllowEvictionLabel":     cfg.AllowEvictionLabel,
         "ToBeDeletedTaint":       cfg.ToBeDeletedTaint,
+        "ArgoRolloutsAPIVersion": cfg.ArgoRolloutsAPIVersion,
     }).Info("Loaded configuration")
 }
diff --git a/internal/testing/e2e_test.go b/internal/testing/e2e_test.go
index 5d9f056..3a89f41 100644
--- a/internal/testing/e2e_test.go
+++ b/internal/testing/e2e_test.go
@@ -23,6 +23,8 @@ import (
     log "github.com/sirupsen/logrus"
     "golang.org/x/exp/slices"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "os"
     "strings"
     "testing"
@@ -47,6 +49,56 @@ var (
     }
 )
 
+func grabMetrics(shredderMetrics []string, t *testing.T) []string {
+    results := make([]string, 0)
+    warnings := make([]string, 0)
+
+    for _, shredderMetric := range shredderMetrics {
+        result, warning, err := prometheusQuery(shredderMetric)
+        if err != nil {
+            t.Errorf("Error querying Prometheus: %v\n", err)
+        }
+        warnings = append(warnings, warning...)
+        results = append(results, result.String())
+    }
+
+    if len(warnings) > 0 {
+        t.Logf("Warnings: %v\n", strings.Join(warnings, "\n"))
+    }
+
+    t.Logf("Results: \n%v\n", strings.Join(results, "\n"))
+
+    return results
+}
+
+func prometheusQuery(query string) (model.Value, v1.Warnings, error) {
+
+    client, err := api.NewClient(api.Config{
+        Address: "http://localhost:30007",
+    })
+    if err != nil {
+        fmt.Printf("Error creating client: %v\n", err)
+        os.Exit(1)
+    }
+
+    v1api := v1.NewAPI(client)
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    defer cancel()
+    return v1api.Query(ctx, query, time.Now(), v1.WithTimeout(5*time.Second))
+}
+
+func compareTime(expirationTime time.Time, t *testing.T, ch chan time.Time) {
+    currentTime := time.Now().UTC()
+
+    for !currentTime.After(expirationTime.UTC()) {
+        t.Logf("Node TTL haven't expired yet: current time(UTC): %s, expire time(UTC): %s", currentTime, expirationTime.UTC())
+        time.Sleep(10 * time.Second)
+        currentTime = time.Now().UTC()
+
+    }
+    ch <- currentTime
+}
+
 // Validates that k8s-shredder cleanup a parked node after its TTL expires
 func TestNodeIsCleanedUp(t *testing.T) {
     var err error
@@ -110,18 +162,7 @@ func TestNodeIsCleanedUp(t *testing.T) {
     }
 }
 
-func compareTime(expirationTime time.Time, t *testing.T, ch chan time.Time) {
-    currentTime := time.Now().UTC()
-
-    for !currentTime.After(expirationTime.UTC()) {
-        t.Logf("Node TTL haven't expired yet: current time(UTC): %s, expire time(UTC): %s", currentTime, expirationTime.UTC())
-        time.Sleep(10 * time.Second)
-        currentTime = time.Now().UTC()
-
-    }
-    ch <- currentTime
-}
-
+// Validates shredder metrics
 func TestShredderMetrics(t *testing.T) {
 
     // Intentionally skipped the gauge metrics as they are going to be wiped out before every eviction loop
@@ -140,40 +181,43 @@ func TestShredderMetrics(t *testing.T) {
     }
 }
 
-func grabMetrics(shredderMetrics []string, t *testing.T) []string {
-    results := make([]string, 0)
-    warnings := make([]string, 0)
+func TestArgoRolloutRestartAt(t *testing.T) {
+    var err error
 
-    for _, shredderMetric := range shredderMetrics {
-        result, warning, err := prometheusQuery(shredderMetric)
-        if err != nil {
-            t.Errorf("Error querying Prometheus: %v\n", err)
-        }
-        warnings = append(warnings, warning...)
-        results = append(results, result.String())
-    }
+    appContext, err := utils.NewAppContext(config.Config{
+        ParkedNodeTTL:                      30 * time.Second,
+        EvictionLoopInterval:               10 * time.Second,
+        RollingRestartThreshold:            0.1,
+        UpgradeStatusLabel:                 "shredder.ethos.adobe.net/upgrade-status",
+        ExpiresOnLabel:                     "shredder.ethos.adobe.net/parked-node-expires-on",
+        NamespacePrefixSkipInitialEviction: "",
+        RestartedAtAnnotation:              "shredder.ethos.adobe.net/restartedAt",
+        AllowEvictionLabel:                 "shredder.ethos.adobe.net/allow-eviction",
+        ToBeDeletedTaint:                   "ToBeDeletedByClusterAutoscaler",
+        ArgoRolloutsAPIVersion:             "v1alpha1",
+    }, false)
 
-    if len(warnings) > 0 {
-        t.Logf("Warnings: %v\n", strings.Join(warnings, "\n"))
+    if err != nil {
+        log.Fatalf("Failed to setup application context: %s", err)
     }
 
-    t.Logf("Results: \n%v\n", strings.Join(results, "\n"))
-
-    return results
-}
+    gvr := schema.GroupVersionResource{
+        Group:    "argoproj.io",
+        Version:  appContext.Config.ArgoRolloutsAPIVersion,
+        Resource: "rollouts",
+    }
 
-func prometheusQuery(query string) (model.Value, v1.Warnings, error) {
+    rollout, err := appContext.DynamicK8SClient.Resource(gvr).Namespace("ns-team-k8s-shredder-test").Get(appContext.Context, "test-app-argo-rollout", metav1.GetOptions{})
+    if err != nil {
+        log.Fatalf("Failed to get the Argo Rollout object: %s", err)
+    }
+    _, found, err := unstructured.NestedString(rollout.Object, "spec", "restartAt")
 
-    client, err := api.NewClient(api.Config{
-        Address: "http://localhost:30007",
-    })
     if err != nil {
-        fmt.Printf("Error creating client: %v\n", err)
-        os.Exit(1)
+        log.Fatalf("Failed to get the Argo Rollout spec.restartAt field: %s", err)
     }
 
-    v1api := v1.NewAPI(client)
-    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-    defer cancel()
-    return v1api.Query(ctx, query, time.Now(), v1.WithTimeout(5*time.Second))
+    if !found {
+        t.Fatalf("Argo Rollout object does not have the spec.restartAt field set")
+    }
 }
diff --git a/internal/testing/local_env_prep.sh b/internal/testing/local_env_prep.sh
index 21626fe..0688b0e 100755
--- a/internal/testing/local_env_prep.sh
+++ b/internal/testing/local_env_prep.sh
@@ -41,9 +41,17 @@ kubectl apply -f "${test_dir}/k8s-shredder.yaml"
 echo "KIND: deploying prometheus..."
 kubectl apply -f "${test_dir}/prometheus_stuffs.yaml"
 
+echo "KIND: deploying Argo Rollouts CRD..."
+kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-rollouts/v1.7.2/manifests/crds/rollout-crd.yaml
+
 echo "KIND: deploying test applications..."
 kubectl apply -f "${test_dir}/test_apps.yaml"
 
+# Adjust the correct UID for the test-app-argo-rollout ownerReference
+rollout_uid=$(kubectl -n ns-team-k8s-shredder-test get rollout test-app-argo-rollout -ojsonpath='{.metadata.uid}')
+cat "${test_dir}/test_apps.yaml" | sed "s/REPLACE_WITH_ROLLOUT_UID/${rollout_uid}/" | kubectl apply -f -
+
+
 echo "K8S_SHREDDER: waiting for k8s-shredder deployment to become ready!"
 retry_count=0
 i=1
diff --git a/internal/testing/prometheus_stuffs.yaml b/internal/testing/prometheus_stuffs.yaml
index aba9982..0d9b9b5 100644
--- a/internal/testing/prometheus_stuffs.yaml
+++ b/internal/testing/prometheus_stuffs.yaml
@@ -18,7 +18,7 @@ spec:
     spec:
       containers:
         - name: prometheus
-          image: prom/prometheus:v2.42.0
+          image: prom/prometheus:v2.54.1
           args:
             - "--storage.tsdb.retention.time=1h"
            - "--config.file=/etc/prometheus/prometheus.yml"
diff --git a/internal/testing/rbac.yaml b/internal/testing/rbac.yaml
index ef20fe3..57fad7d 100644
--- a/internal/testing/rbac.yaml
+++ b/internal/testing/rbac.yaml
@@ -31,4 +31,7 @@ rules:
   - apiGroups: [apps, extensions]
     resources: [statefulsets, deployments, replicasets]
     verbs: [get, list, watch, update, patch]
+  - apiGroups: [ "argoproj.io" ]
+    resources: [ rollouts ]
+    verbs: [ get, list, watch, update, patch ]
diff --git a/internal/testing/test_apps.yaml b/internal/testing/test_apps.yaml
index ae4594c..e22eaf9 100644
--- a/internal/testing/test_apps.yaml
+++ b/internal/testing/test_apps.yaml
@@ -239,4 +239,69 @@ spec:
   minAvailable: 1
   selector:
     matchLabels:
-      app: test-app-statefulset
\ No newline at end of file
+      app: test-app-statefulset
+#### FLEX ####
+# 1. Good citizen Argo Rollout in Flex world
+---
+apiVersion: apps/v1
+kind: ReplicaSet
+metadata:
+  name: test-app-argo-rollout
+  namespace: ns-team-k8s-shredder-test
+  ownerReferences:
+    - apiVersion: argoproj.io/v1alpha1
+      kind: Rollout
+      blockOwnerDeletion: true
+      name: test-app-argo-rollout
+      uid: REPLACE_WITH_ROLLOUT_UID
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: test-app-argo-rollout
+  template:
+    metadata:
+      labels:
+        app: test-app-argo-rollout
+    spec:
+      affinity:
+        podAntiAffinity:
+          preferredDuringSchedulingIgnoredDuringExecution:
+            - weight: 100
+              podAffinityTerm:
+                labelSelector:
+                  matchExpressions:
+                    - key: app
+                      operator: In
+                      values:
+                        - test-app-argo-rollout
+                topologyKey: kubernetes.io/hostname
+      containers:
+        - name: test-app-argo-rollout
+          image: aaneci/canary
+          ports:
+            - containerPort: 8080
+              name: web
+---
+apiVersion: argoproj.io/v1alpha1
+kind: Rollout
+metadata:
+  name: test-app-argo-rollout
+  namespace: ns-team-k8s-shredder-test
+spec:
+  replicas: 2
+  workloadRef:
+    apiVersion: apps/v1
+    kind: ReplicaSet
+    name: test-app-argo-rollout
+---
+apiVersion: policy/v1
+kind: PodDisruptionBudget
+metadata:
+  name: test-app-argo-rollout
+  namespace: ns-team-k8s-shredder-test
+spec:
+  minAvailable: 10
+  selector:
+    matchLabels:
+      app: test-app-argo-rollout
\ No newline at end of file
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 02fd239..28e2345 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -33,4 +33,6 @@ type Config struct {
     AllowEvictionLabel string
     // ToBeDeletedTaint is used for skipping a subset of parked nodes
     ToBeDeletedTaint string
+    // ArgoRolloutsAPIVersion is used for specifying the API version from `argoproj.io` apigroup to be used while handling Argo Rollouts objects
+    ArgoRolloutsAPIVersion string
 }
diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go
index 6efc7a3..abcf7fd 100644
--- a/pkg/handler/handler.go
+++ b/pkg/handler/handler.go
@@ -15,6 +15,8 @@ import (
     "encoding/json"
     "fmt"
     "golang.org/x/exp/slices"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "strings"
     "sync"
     "time"
@@ -226,15 +228,15 @@ func (h *Handler) processNode(node v1.Node, rr chan *controllerObject) error {
             continue
         }
 
-        // For pods handled by a deployment or statefulset controller, try to rollout restart those objects
-        if slices.Contains([]string{"Deployment", "StatefulSet"}, co.Kind) {
+        // For pods handled by a deployment, statefulset or argo rollouts controller, try to rollout restart those objects
+        if slices.Contains([]string{"Deployment", "StatefulSet", "Rollout"}, co.Kind) {
             rolloutRestartInProgress, err := h.isRolloutRestartInProgress(co)
             if err != nil {
                 h.logger.WithField("key", co.Fingerprint()).Warnf("Failed to get rollout status: %s", err.Error())
                 metrics.ShredderErrorsTotal.Inc()
                 continue
             }
-            // if the rollout restart process is in progress, evict the pod instead of trying t do another rollout restart
+            // if the rollout restart process is in progress, evict the pod instead of trying to do another rollout restart
             if rolloutRestartInProgress {
                 err := h.evictPod(pod, deleteOptions)
                 if err != nil {
@@ -360,11 +362,36 @@ func (h *Handler) getControllerObject(pod v1.Pod) (*controllerObject, error) {
             h.logger.Warnf("Pod %s is controlled by an isolated ReplicaSet", pod.Name)
             return co, nil
         }
-        deployment, err := h.appContext.K8sClient.AppsV1().Deployments(pod.Namespace).Get(h.appContext.Context, replicaSet.OwnerReferences[0].Name, metav1.GetOptions{})
-        if err != nil {
-            return co, err
+
+        switch replicaSet.OwnerReferences[0].Kind {
+        case "Deployment":
+
+            deployment, err := h.appContext.K8sClient.AppsV1().Deployments(pod.Namespace).Get(h.appContext.Context, replicaSet.OwnerReferences[0].Name, metav1.GetOptions{})
+            if err != nil {
+                return co, err
+            }
+            return newControllerObject("Deployment", deployment.Name, deployment.Namespace, deployment), nil
+        case "Rollout":
+            // Make sure we are dealing with an Argo Rollout
+            if replicaSet.OwnerReferences[0].APIVersion == fmt.Sprintf("argoproj.io/%s", h.appContext.Config.ArgoRolloutsAPIVersion) {
+
+                gvr := schema.GroupVersionResource{
+                    Group:    "argoproj.io",
+                    Version:  h.appContext.Config.ArgoRolloutsAPIVersion,
+                    Resource: "rollouts",
+                }
+
+                rollout, err := h.appContext.DynamicK8SClient.Resource(gvr).Namespace(pod.Namespace).Get(h.appContext.Context, replicaSet.OwnerReferences[0].Name, metav1.GetOptions{})
+                if err != nil {
+                    return co, err
+                }
+                return newControllerObject("Rollout", rollout.GetName(), rollout.GetNamespace(), rollout), nil
+            } else {
+                return co, errors.Errorf("Controller object of type %s from %s API group is not supported! Please file a git issue or contribute it!", replicaSet.OwnerReferences[0].Kind, replicaSet.OwnerReferences[0].APIVersion)
+            }
+        default:
+            return co, errors.Errorf("Controller object of type %s from %s API group is not supported! Please file a git issue or contribute it!", pod.OwnerReferences[0].Kind, pod.OwnerReferences[0].APIVersion)
         }
-        return newControllerObject("Deployment", deployment.Name, deployment.Namespace, deployment), nil
 
     case "DaemonSet":
         h.logger.Warnf("DaemonSets are not covered")
@@ -426,6 +453,20 @@ func (h *Handler) isRolloutRestartInProgress(co *controllerObject) (bool, error)
         if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
             return true, nil
         }
+    case "Rollout":
+        rollout := co.Object.(*unstructured.Unstructured)
+
+        // TODO - check if the other rollout conditions should be checked as well
+        // See https://github.com/argoproj/argo-rollouts/blob/bfef7f0d2bb71b085398c35ec95c1b2aacd07187/rollout/sync.go#L618
+        isPaused, found, err := unstructured.NestedBool(rollout.Object, "spec", "paused")
+        if err != nil {
+            return false, err
+        }
+
+        if found && isPaused {
+            h.logger.Warnf("Argo Rollout %s is currently paused, won't restart it!", rollout.GetName())
+            return false, nil
+        }
     default:
         return false, errors.Errorf("rollout restart not supported for object of type %s", co.Kind)
     }
@@ -514,8 +555,6 @@ func (h *Handler) doRolloutRestart(co *controllerObject) error {
         if err != nil {
             return err
         }
-    case "DaemonSet":
-        return errors.Errorf("DaemonSets are not covered")
     case "StatefulSet":
         sts := co.Object.(*appsv1.StatefulSet)
         _, err := h.appContext.K8sClient.AppsV1().StatefulSets(sts.Namespace).
@@ -523,6 +562,26 @@
         if err != nil {
             return err
         }
+    case "Rollout":
+        rollout := co.Object.(*unstructured.Unstructured)
+        gvr := schema.GroupVersionResource{
+            Group:    "argoproj.io",
+            Version:  h.appContext.Config.ArgoRolloutsAPIVersion,
+            Resource: "rollouts",
+        }
+
+        patchDataRollout, _ := json.Marshal(map[string]interface{}{
+            "spec": map[string]interface{}{
+                "restartAt": restartedAt,
+            },
+        })
+
+        _, err := h.appContext.DynamicK8SClient.Resource(gvr).Namespace(rollout.GetNamespace()).Patch(h.appContext.Context, rollout.GetName(), types.MergePatchType, patchDataRollout, patchOptions)
+        if err != nil {
+            return err
+        }
+    case "DaemonSet":
+        return errors.Errorf("DaemonSets are not covered")
     default:
         return errors.Errorf("invalid controller object")
     }
diff --git a/pkg/utils/context.go b/pkg/utils/context.go
index 47eef0f..413e1c4 100644
--- a/pkg/utils/context.go
+++ b/pkg/utils/context.go
@@ -14,33 +14,42 @@ package utils
 import (
     "context"
     "github.com/adobe/k8s-shredder/pkg/config"
+    "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"
 )
 
 // AppContext struct stores a context and a k8s client
 type AppContext struct {
-    Context   context.Context
-    K8sClient kubernetes.Interface
-    Config    config.Config
-    dryRun    bool
+    Context          context.Context
+    K8sClient        kubernetes.Interface
+    DynamicK8SClient dynamic.Interface
+    Config           config.Config
+    dryRun           bool
 }
 
 // NewAppContext creates a new AppContext object
 func NewAppContext(cfg config.Config, dryRun bool) (*AppContext, error) {
-    clientSet, err := getClusterConfig()
+    client, err := getK8SClient()
     if err != nil {
         return nil, err
     }
+
+    dynamicClient, err := getDynamicK8SClient()
+    if err != nil {
+        return nil, err
+    }
+
     ctx, cancel := context.WithCancel(context.Background())
 
     go HandleOsSignals(cancel)
 
     return &AppContext{
-        Context:   ctx,
-        K8sClient: clientSet,
-        Config:    cfg,
-        dryRun:    dryRun,
+        Context:          ctx,
+        K8sClient:        client,
+        DynamicK8SClient: dynamicClient,
+        Config:           cfg,
+        dryRun:           dryRun,
     }, nil
 }
diff --git a/pkg/utils/k8s.go b/pkg/utils/k8s.go
index 27ad02c..5cef050 100644
--- a/pkg/utils/k8s.go
+++ b/pkg/utils/k8s.go
@@ -14,24 +14,40 @@ package utils
 import (
     "github.com/pkg/errors"
     v1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"
     "sigs.k8s.io/controller-runtime/pkg/client/config"
     "strconv"
     "time"
 )
 
-func getClusterConfig() (*kubernetes.Clientset, error) {
+func getK8SClient() (*kubernetes.Clientset, error) {
     cfg, err := config.GetConfig()
     if err != nil {
         return nil, err
     }
 
-    clientSet, err := kubernetes.NewForConfig(cfg)
+    client, err := kubernetes.NewForConfig(cfg)
     if err != nil {
         return nil, err
     }
 
-    return clientSet, nil
+    return client, nil
+}
+
+func getDynamicK8SClient() (*dynamic.DynamicClient, error) {
+    cfg, err := config.GetConfig()
+    if err != nil {
+        return nil, err
+    }
+
+    // Create a dynamic client
+    dynamicClient, err := dynamic.NewForConfig(cfg)
+    if err != nil {
+        return nil, errors.Errorf("Error creating dynamic client: %v", err)
+    }
+
+    return dynamicClient, nil
 }
 
 // NodeHasTaint check if a node has a taint set
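
Note (not part of the patch): the sketch below is a minimal standalone illustration of the restart mechanism this diff wires into doRolloutRestart: building a dynamic client the same way getDynamicK8SClient does, then sending a merge patch that sets spec.restartAt on an Argo Rollout. The namespace and Rollout name are reused from the test fixtures (ns-team-k8s-shredder-test / test-app-argo-rollout), the API version mirrors the new ArgoRolloutsAPIVersion default (v1alpha1), and the RFC3339 timestamp is an assumption about the format expected for spec.restartAt; k8s-shredder itself derives the timestamp from its RestartedAtAnnotation handling, which is not shown in this diff.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/dynamic"
    ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
    // Build a dynamic client the same way the patch's getDynamicK8SClient helper does.
    cfg, err := ctrlconfig.GetConfig()
    if err != nil {
        panic(err)
    }
    client, err := dynamic.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // GVR used throughout the patch for Argo Rollouts; "v1alpha1" matches the new default.
    gvr := schema.GroupVersionResource{
        Group:    "argoproj.io",
        Version:  "v1alpha1",
        Resource: "rollouts",
    }

    // Merge patch setting spec.restartAt, the field the Argo Rollouts controller
    // watches to trigger a rolling restart. RFC3339 is assumed as the timestamp format.
    body, err := json.Marshal(map[string]interface{}{
        "spec": map[string]interface{}{
            "restartAt": time.Now().UTC().Format(time.RFC3339),
        },
    })
    if err != nil {
        panic(err)
    }

    rollout, err := client.Resource(gvr).Namespace("ns-team-k8s-shredder-test").Patch(
        context.Background(), "test-app-argo-rollout", types.MergePatchType, body, metav1.PatchOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("Patched Rollout %s: spec.restartAt set\n", rollout.GetName())
}

In a cluster running the Argo Rollouts controller, setting spec.restartAt causes the Rollout's pods to be replaced; in the e2e environment only the CRD is applied by local_env_prep.sh, so TestArgoRolloutRestartAt simply asserts that the field has been set on the object.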