From 0b6960fd1a777d3478c8378ceb6128b7bba07f32 Mon Sep 17 00:00:00 2001 From: Kevin McDermott Date: Fri, 14 Jul 2023 14:48:44 +0100 Subject: [PATCH] Duplicate the service as well as the Deployment. When creating the clones of the source deployment, this also clones the Service if they exist. Not all Deployments will have a service, for example, source-controller does, but the others don't. --- config/default/kustomization.yaml | 16 +- config/rbac/role.yaml | 12 + go.mod | 1 + go.sum | 4 + .../controller/fluxshardset_controller.go | 94 +++-- .../fluxshardset_controller_test.go | 240 +++++++---- internal/deploys/deploys.go | 205 ++++++---- internal/deploys/deploys_test.go | 378 +++++++++++------- test/cleanup.go | 46 +++ test/deployment.go | 60 ++- test/shardset.go | 2 +- tests/e2e/fluxshardset_controller_test.go | 90 ++--- tests/e2e/main_test.go | 6 + 13 files changed, 778 insertions(+), 376 deletions(-) create mode 100644 test/cleanup.go diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index bd5f246..42f3b2a 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -1,3 +1,11 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +patches: +# Protect the /metrics endpoint by putting it behind auth. +# If you want your controller-manager to expose the /metrics +# endpoint w/o any authn/z, please comment the following line. +- path: manager_auth_proxy_patch.yaml + # Adds namespace to all resources. namespace: flux-system @@ -26,14 +34,6 @@ resources: # [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'. #- ../prometheus -patchesStrategicMerge: -# Protect the /metrics endpoint by putting it behind auth. -# If you want your controller-manager to expose the /metrics -# endpoint w/o any authn/z, please comment the following line. -- manager_auth_proxy_patch.yaml - - - # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in # crd/kustomization.yaml #- manager_webhook_patch.yaml diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 576141e..767482c 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -5,6 +5,18 @@ metadata: creationTimestamp: null name: manager-role rules: +- apiGroups: + - "" + resources: + - services + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - apps resources: diff --git a/go.mod b/go.mod index 3c0109b..6de9d06 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/weaveworks/flux-shard-controller go 1.19 require ( + dario.cat/mergo v1.0.0 github.com/fluxcd/pkg/apis/meta v1.1.0 github.com/fluxcd/pkg/runtime v0.38.1 github.com/gitops-tools/pkg v0.1.0 diff --git a/go.sum b/go.sum index 7fc16c4..55e31dc 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230106234847-43070de90fa1 h1:EKPd1INOIyr5hWOWhvpmQpY6tKjeG0hT1s3AMC/9fic= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ= @@ -23,6 +26,7 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/controller/fluxshardset_controller.go b/internal/controller/fluxshardset_controller.go index 3a2d06f..f016844 100644 --- a/internal/controller/fluxshardset_controller.go +++ b/internal/controller/fluxshardset_controller.go @@ -19,7 +19,11 @@ package controller import ( "context" "fmt" + "reflect" + "dario.cat/mergo" + "github.com/fluxcd/pkg/runtime/patch" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +42,7 @@ import ( deploys "github.com/weaveworks/flux-shard-controller/internal/deploys" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) var accessor = meta.NewAccessor() @@ -53,6 +58,7 @@ type FluxShardSetReconciler struct { // +kubebuilder:rbac:groups=templates.weave.works,resources=fluxshardsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=templates.weave.works,resources=fluxshardsets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -89,7 +95,7 @@ func (r *FluxShardSetReconciler) Reconcile(ctx context.Context, req ctrl.Request if inventory != nil { templatesv1.SetReadyWithInventory(&shardSet, inventory, templatesv1.ReconciliationSucceededReason, - fmt.Sprintf("%d shard(s) created", len(inventory.Entries))) + fmt.Sprintf("%d resources created", len(inventory.Entries))) if err := r.patchStatus(ctx, req, shardSet.Status); client.IgnoreNotFound(err) != nil { templatesv1.SetFluxShardSetReadiness(&shardSet, metav1.ConditionFalse, templatesv1.ReconciliationFailedReason, err.Error()) @@ -104,7 +110,7 @@ func (r *FluxShardSetReconciler) Reconcile(ctx context.Context, req ctrl.Request func (r *FluxShardSetReconciler) removeResourceRefs(ctx context.Context, deletions []templatesv1.ResourceRef) error { logger := log.FromContext(ctx) for _, v := range deletions { - d, err := deploymentFromResourceRef(v) + d, err := unstructuredFromResourceRef(v) if err != nil { return err } @@ -143,7 +149,12 @@ func (r *FluxShardSetReconciler) reconcileResources(ctx context.Context, fluxSha return nil, client.IgnoreNotFound(err) } - generatedDeployments, err := deploys.GenerateDeployments(fluxShardSet, srcDeploy) + srcService, err := r.getSourceService(ctx, srcDeploy) + if err != nil { + return nil, fmt.Errorf("failed to find Service for Deployment %s: %w", client.ObjectKeyFromObject(srcDeploy), err) + } + + generatedResources, err := deploys.GenerateDeployments(fluxShardSet, srcDeploy, srcService) if err != nil { return nil, fmt.Errorf("failed to generate deployments: %w", err) } @@ -156,43 +167,49 @@ func (r *FluxShardSetReconciler) reconcileResources(ctx context.Context, fluxSha // newInventory holds the resource refs for the generated resources. newInventory := sets.New[templatesv1.ResourceRef]() - for _, newDeployment := range generatedDeployments { - ref, err := templatesv1.ResourceRefFromObject(newDeployment) + for _, newResource := range generatedResources { + ref, err := templatesv1.ResourceRefFromObject(newResource) if err != nil { return nil, fmt.Errorf("failed to update inventory: %w", err) } - if existingInventory.Has(ref) { newInventory.Insert(ref) - existing := &appsv1.Deployment{} - err = r.Client.Get(ctx, client.ObjectKeyFromObject(newDeployment), existing) + existing := runtimeObjectFromObject(newResource) + err = r.Client.Get(ctx, client.ObjectKeyFromObject(newResource), existing) if err == nil { - newDeployment = copyDeploymentContent(existing, newDeployment) - if err := r.Client.Patch(ctx, newDeployment, client.MergeFrom(existing)); err != nil { - return nil, fmt.Errorf("failed to update Deployment: %w", err) + patchHelper, err := patch.NewHelper(existing, r.Client) + if err != nil { + return nil, err + } + if err := mergo.Merge(existing, newResource, mergo.WithOverride); err != nil { + return nil, err } - if err := logResourceMessage(logger, "updated deployment", newDeployment); err != nil { + if err := patchHelper.Patch(ctx, existing); err != nil { + return nil, err + } + + if err := logResourceMessage(logger, "updated resource", newResource); err != nil { return nil, err } continue } if !apierrors.IsNotFound(err) { - return nil, fmt.Errorf("failed to load existing Deployment: %w", err) + return nil, fmt.Errorf("failed to load existing resource %s: %w", client.ObjectKeyFromObject(newResource), err) } } - if err := controllerutil.SetOwnerReference(fluxShardSet, newDeployment, r.Scheme); err != nil { + if err := controllerutil.SetOwnerReference(fluxShardSet, newResource, r.Scheme); err != nil { return nil, fmt.Errorf("failed to set owner reference: %w", err) } - if err := r.Client.Create(ctx, newDeployment); err != nil { - return nil, fmt.Errorf("failed to create Deployment: %w", err) + if err := r.Client.Create(ctx, newResource); err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) } newInventory.Insert(ref) - if err := logResourceMessage(logger, "created new deployment", newDeployment); err != nil { + if err := logResourceMessage(logger, "created new resource", newResource); err != nil { return nil, err } } @@ -228,6 +245,19 @@ func (r *FluxShardSetReconciler) getSourceDeployment(ctx context.Context, fluxSh return srcDeploy, nil } +func (r *FluxShardSetReconciler) getSourceService(ctx context.Context, deploy *appsv1.Deployment) (*corev1.Service, error) { + srcServiceKey := client.ObjectKey{ + Name: deploy.GetName(), + Namespace: deploy.GetNamespace(), + } + srcService := &corev1.Service{} + if err := r.Client.Get(ctx, srcServiceKey, srcService); err != nil { + return nil, client.IgnoreNotFound(err) + } + + return srcService, nil +} + func (r *FluxShardSetReconciler) patchStatus(ctx context.Context, req ctrl.Request, newStatus templatesv1.FluxShardSetStatus) error { var set templatesv1.FluxShardSet if err := r.Client.Get(ctx, req.NamespacedName, &set); err != nil { @@ -256,19 +286,6 @@ func (r *FluxShardSetReconciler) deploymentsToFluxShardSet(ctx context.Context, return result } -func deploymentFromResourceRef(ref templatesv1.ResourceRef) (*appsv1.Deployment, error) { - objMeta, err := object.ParseObjMetadata(ref.ID) - if err != nil { - return nil, fmt.Errorf("failed to parse object ID %s: %w", ref.ID, err) - } - d := appsv1.Deployment{} - - d.Namespace = objMeta.Namespace - d.Name = objMeta.Name - - return &d, nil -} - func logResourceMessage(logger logr.Logger, msg string, obj runtime.Object) error { namespace, err := accessor.Namespace(obj) if err != nil { @@ -307,3 +324,20 @@ func indexDeployments(o client.Object) []string { return []string{fmt.Sprintf("%s/%s", fss.GetNamespace(), fss.Spec.SourceDeploymentRef.Name)} } + +func runtimeObjectFromObject(o client.Object) client.Object { + return reflect.New(reflect.TypeOf(o).Elem()).Interface().(client.Object) +} + +func unstructuredFromResourceRef(ref templatesv1.ResourceRef) (*unstructured.Unstructured, error) { + objMeta, err := object.ParseObjMetadata(ref.ID) + if err != nil { + return nil, fmt.Errorf("failed to parse object ID %s: %w", ref.ID, err) + } + u := unstructured.Unstructured{} + u.SetGroupVersionKind(objMeta.GroupKind.WithVersion(ref.Version)) + u.SetName(objMeta.Name) + u.SetNamespace(objMeta.Namespace) + + return &u, nil +} diff --git a/internal/controller/fluxshardset_controller_test.go b/internal/controller/fluxshardset_controller_test.go index 59d11db..4910e04 100644 --- a/internal/controller/fluxshardset_controller_test.go +++ b/internal/controller/fluxshardset_controller_test.go @@ -10,11 +10,11 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -65,11 +65,11 @@ func TestReconciliation(t *testing.T) { } test.AssertNoError(t, reconciler.SetupWithManager(mgr)) - test.AssertNoError(t, k8sClient.Create(context.TODO(), test.NewNamespace("test-ns"))) + test.AssertNoError(t, k8sClient.Create(context.TODO(), test.NewNamespace("flux-system"))) t.Run("reconciling creation of new deployment with shard", func(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } @@ -89,11 +89,11 @@ func TestReconciliation(t *testing.T) { }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) reconcileAndReload(t, k8sClient, reconciler, shardSet) - wantDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + wantDeployment := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = map[string]string{ "templates.weave.works/shard-set": "test-shard-set", "app.kubernetes.io/managed-by": "flux-shard-controller", @@ -108,22 +108,77 @@ func TestReconciliation(t *testing.T) { // Check inventory updated with fluxshardset and new deployment(want) and condition of number of resources created test.AssertInventoryHasItems(t, shardSet, want...) - assertFluxShardSetCondition(t, shardSet, meta.ReadyCondition, "1 shard(s) created") + assertFluxShardSetCondition(t, shardSet, meta.ReadyCondition, "1 resources created") + // Check deployments existing include the new deployment + assertDeploymentsExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", "kustomize-controller-shard-1") + }) + + t.Run("reconciling creation of new deployment with service", func(t *testing.T) { + ctx := context.TODO() + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=!sharding.fluxcd.io/key", + } + }) + test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) + defer deleteObject(t, k8sClient, srcDeployment) + + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) + + shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { + set.Spec.Shards = []templatesv1.ShardSpec{ + { + Name: "shard-1", + }, + } + set.Spec.SourceDeploymentRef = templatesv1.SourceDeploymentReference{ + Name: srcDeployment.Name, + } + }) + + test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) + + reconcileAndReload(t, k8sClient, reconciler, shardSet) + + wantDeployment := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { + d.ObjectMeta.Labels = map[string]string{ + "templates.weave.works/shard-set": "test-shard-set", + "app.kubernetes.io/managed-by": "flux-shard-controller", + } + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + } + }) + wantService := test.NewService("kustomize-controller-shard-1") + want := []runtime.Object{ + wantDeployment, + wantService, + } + // Check inventory updated with fluxshardset and new deployment(want) and condition of number of resources created + test.AssertInventoryHasItems(t, shardSet, want...) + assertFluxShardSetCondition(t, shardSet, meta.ReadyCondition, "2 resources created") // Check deployments existing include the new deployment - assertDeploymentsExist(t, k8sClient, "default", "kustomize-controller", "kustomize-controller-shard-1") + assertDeploymentsExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", "kustomize-controller-shard-1") + assertServicesExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", "kustomize-controller-shard-1") }) t.Run("reconciling creation of new deployment when it already exists", func(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } }) test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) defer deleteObject(t, k8sClient, srcDeployment) + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { set.Spec.Shards = []templatesv1.ShardSpec{ @@ -137,9 +192,9 @@ func TestReconciliation(t *testing.T) { }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) - shard1 := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + shard1 := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = map[string]string{ "templates.weave.works/shard-set": "test-shard-set", "app.kubernetes.io/managed-by": "flux-shard-controller", @@ -153,7 +208,7 @@ func TestReconciliation(t *testing.T) { // Reconcile _, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(shardSet)}) - test.AssertErrorMatch(t, `failed to create Deployment: deployments.apps "kustomize-controller-shard-1" already exists`, err) + test.AssertErrorMatch(t, `failed to create resource: deployments.apps "kustomize-controller-shard-1" already exists`, err) // reload the shardset test.AssertNoError(t, k8sClient.Get(ctx, client.ObjectKeyFromObject(shardSet), shardSet)) @@ -162,18 +217,21 @@ func TestReconciliation(t *testing.T) { t.Errorf("expected Inventory to be nil, but got %v", shardSet.Status.Inventory) } assertFluxShardSetCondition(t, shardSet, meta.ReadyCondition, - `failed to create Deployment: deployments.apps "kustomize-controller-shard-1" already exists`) + `failed to create resource: deployments.apps "kustomize-controller-shard-1" already exists`) }) t.Run("Delete resources when removing shard from fluxshardset shards", func(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } }) test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) defer deleteObject(t, k8sClient, srcDeployment) + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) // Create shard set and src deployment shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { @@ -190,14 +248,14 @@ func TestReconciliation(t *testing.T) { } }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) reconcileAndReload(t, k8sClient, reconciler, shardSet) // Check fluxshardset - assertDeploymentsExist(t, k8sClient, "default", "kustomize-controller", "kustomize-controller-shard-1", "kustomize-controller-shard-2") + assertDeploymentsExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", "kustomize-controller-shard-1", "kustomize-controller-shard-2") - shard1Deploy := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + shard1Deploy := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = map[string]string{ "templates.weave.works/shard-set": "test-shard-set", "app.kubernetes.io/managed-by": "flux-shard-controller", @@ -207,7 +265,7 @@ func TestReconciliation(t *testing.T) { } }) - shard2Deploy := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-2"), func(d *appsv1.Deployment) { + shard2Deploy := test.NewDeployment("kustomize-controller-shard-2", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = map[string]string{ "templates.weave.works/shard-set": "test-shard-set", "app.kubernetes.io/managed-by": "flux-shard-controller", @@ -216,7 +274,9 @@ func TestReconciliation(t *testing.T) { "--watch-label-selector=sharding.fluxcd.io/key in (shard-2)", } }) - test.AssertInventoryHasItems(t, shardSet, shard1Deploy, shard2Deploy) + test.AssertInventoryHasItems(t, shardSet, shard1Deploy, shard2Deploy, + test.NewService("kustomize-controller-shard-1"), + test.NewService("kustomize-controller-shard-2")) // Update shard set by removing shard-2 shardSet.Spec.Shards = []templatesv1.ShardSpec{ @@ -228,20 +288,24 @@ func TestReconciliation(t *testing.T) { reconcileAndReload(t, k8sClient, reconciler, shardSet) // Check deployment for shard-1 exists and deployment for shard-2 is deleted - test.AssertInventoryHasItems(t, shardSet, shard1Deploy) - assertDeploymentsExist(t, k8sClient, "default", "kustomize-controller", "kustomize-controller-shard-1") - assertDeploymentsDontExist(t, k8sClient, "default", "shard-2-kustomize-controller") + test.AssertInventoryHasItems(t, shardSet, shard1Deploy, test.NewService("kustomize-controller-shard-1")) + assertDeploymentsExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", "kustomize-controller-shard-1") + assertServicesExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", "kustomize-controller-shard-1") + assertDeploymentsDontExist(t, k8sClient, shardSet.Namespace, "shard-2-kustomize-controller") }) t.Run("Create new deployments with new shard names and delete old deployments after removing shard names", func(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } }) test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) defer k8sClient.Delete(ctx, srcDeployment) + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) // Create shard set and src deployment shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { @@ -258,11 +322,14 @@ func TestReconciliation(t *testing.T) { } }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) reconcileAndReload(t, k8sClient, reconciler, shardSet) - assertDeploymentsExist(t, k8sClient, "default", "kustomize-controller", "kustomize-controller-shard-a", "kustomize-controller-shard-b") + assertDeploymentsExist(t, k8sClient, shardSet.Namespace, + "kustomize-controller", "kustomize-controller-shard-a", "kustomize-controller-shard-b") + assertServicesExist(t, k8sClient, shardSet.Namespace, + "kustomize-controller", "kustomize-controller-shard-a", "kustomize-controller-shard-b") // Removing shard shardSet.Spec.Shards = []templatesv1.ShardSpec{ @@ -277,7 +344,7 @@ func TestReconciliation(t *testing.T) { reconcileAndReload(t, k8sClient, reconciler, shardSet) createDeployment := func(shardID string) *appsv1.Deployment { - return test.MakeTestDeployment(nsn("default", "kustomize-controller-"+shardID), func(d *appsv1.Deployment) { + return test.NewDeployment("kustomize-controller-"+shardID, func(d *appsv1.Deployment) { d.ObjectMeta.Labels = map[string]string{ "templates.weave.works/shard-set": "test-shard-set", "app.kubernetes.io/managed-by": "flux-shard-controller", @@ -288,19 +355,30 @@ func TestReconciliation(t *testing.T) { }) } - test.AssertInventoryHasItems(t, shardSet, createDeployment("shard-a"), createDeployment("shard-c")) - assertDeploymentsExist(t, k8sClient, "default", "kustomize-controller", "kustomize-controller-shard-a", "kustomize-controller-shard-c") - assertDeploymentsDontExist(t, k8sClient, "default", "shard-b-kustomize-controller") + test.AssertInventoryHasItems(t, shardSet, + createDeployment("shard-a"), createDeployment("shard-c"), + test.NewService("kustomize-controller-shard-a"), + test.NewService("kustomize-controller-shard-c"), + ) + assertDeploymentsExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", + "kustomize-controller-shard-a", "kustomize-controller-shard-c") + assertServicesExist(t, k8sClient, shardSet.Namespace, "kustomize-controller", + "kustomize-controller-shard-a", "kustomize-controller-shard-c") + assertDeploymentsDontExist(t, k8sClient, shardSet.Namespace, "kustomize-controller-shard-b") + assertServicesDontExist(t, k8sClient, shardSet.Namespace, "kustomize-controller-shard-b") }) t.Run("don't create deployments if src deployment not ignoring sharding", func(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Annotations = map[string]string{} d.ObjectMeta.Name = "kustomize-controller" }) test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) defer k8sClient.Delete(ctx, srcDeployment) + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) // Create shard set and src deployment shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { @@ -314,9 +392,9 @@ func TestReconciliation(t *testing.T) { } }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) - expectedErrMsg := "failed to generate deployments: deployment default/kustomize-controller is not configured to ignore sharding" + expectedErrMsg := `failed to generate deployments: deployment flux-system/kustomize-controller is not configured to ignore sharding` reconcileWithErrorAndReload(t, k8sClient, reconciler, shardSet, expectedErrMsg) assertFluxShardSetCondition(t, shardSet, meta.ReadyCondition, expectedErrMsg) @@ -324,9 +402,8 @@ func TestReconciliation(t *testing.T) { t.Run("Update generated deployments when src deployment updated existing annotations", func(t *testing.T) { ctx := context.TODO() - // Create shard set and src deployment - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } @@ -337,6 +414,9 @@ func TestReconciliation(t *testing.T) { }) test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) defer k8sClient.Delete(ctx, srcDeployment) + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { set.Spec.Shards = []templatesv1.ShardSpec{ @@ -350,11 +430,11 @@ func TestReconciliation(t *testing.T) { }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) reconcileAndReload(t, k8sClient, reconciler, shardSet) - shard1Deploy := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + shard1Deploy := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = test.ShardLabels("shard-1") d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", @@ -376,13 +456,12 @@ func TestReconciliation(t *testing.T) { // Update src deployment srcDeployment.Annotations = map[string]string{ - "deployment.kubernetes.io/revision": "1", - "test-annot": "test", + "testing-annotation": "test", } test.AssertNoError(t, k8sClient.Update(ctx, srcDeployment)) shard1Deploy.Annotations = map[string]string{ - "test-annot": "test", + "testing-annotation": "test", } reconcileAndReload(t, k8sClient, reconciler, shardSet) @@ -392,14 +471,13 @@ func TestReconciliation(t *testing.T) { t.Fatalf("generated deployments don't match expected, diff: %s", diff) } - test.AssertInventoryHasItems(t, shardSet, shard1Deploy) - + test.AssertInventoryHasItems(t, shardSet, shard1Deploy, test.NewService("kustomize-controller-shard-1")) }) t.Run("Update generated deployments when src deployment updated existing container image", func(t *testing.T) { ctx := context.TODO() // Create shard set and src deployment - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } @@ -408,6 +486,9 @@ func TestReconciliation(t *testing.T) { }) test.AssertNoError(t, k8sClient.Create(ctx, srcDeployment)) defer k8sClient.Delete(ctx, srcDeployment) + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, k8sClient.Create(ctx, srcService)) + defer deleteObject(t, k8sClient, srcService) shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { set.Spec.Shards = []templatesv1.ShardSpec{ @@ -420,10 +501,10 @@ func TestReconciliation(t *testing.T) { } }) test.AssertNoError(t, k8sClient.Create(ctx, shardSet)) - defer deleteFluxShardSet(t, k8sClient, shardSet) + defer test.DeleteFluxShardSet(t, k8sClient, shardSet) reconcileAndReload(t, k8sClient, reconciler, shardSet) - shard1Deploy := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + shard1Deploy := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = test.ShardLabels("shard-1") d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-1") d.Spec.Template.Spec.Containers[0].Args = []string{ @@ -446,17 +527,16 @@ func TestReconciliation(t *testing.T) { // Update src deployment container image version and shard1Deploy srcDeployment.Spec.Template.Spec.Containers[0].Image = "ghcr.io/fluxcd/kustomize-controller:v0.35.2" test.AssertNoError(t, k8sClient.Update(ctx, srcDeployment)) - - shard1Deploy.Spec.Template.Spec.Containers[0].Image = "ghcr.io/fluxcd/kustomize-controller:v0.35.2" reconcileAndReload(t, k8sClient, reconciler, shardSet) + shard1Deploy.Spec.Template.Spec.Containers[0].Image = "ghcr.io/fluxcd/kustomize-controller:v0.35.2" updatedGenDepl := &appsv1.Deployment{} test.AssertNoError(t, k8sClient.Get(ctx, client.ObjectKeyFromObject(shard1Deploy), updatedGenDepl)) if diff := cmp.Diff(updatedGenDepl, shard1Deploy, ignoreObjectMeta, ignoreTypeMeta); diff != "" { - t.Fatalf("generated deployments don't match expected, diff: %s", diff) + t.Fatalf("updated deployment doesn't match expected, diff: %s", diff) } - test.AssertInventoryHasItems(t, shardSet, shard1Deploy) + test.AssertInventoryHasItems(t, shardSet, shard1Deploy, test.NewService("kustomize-controller-shard-1")) }) } @@ -474,6 +554,20 @@ func assertDeploymentsExist(t *testing.T, cl client.Client, ns string, want ...s } } +func assertServicesExist(t *testing.T, cl client.Client, ns string, want ...string) { + t.Helper() + sl := &corev1.ServiceList{} + test.AssertNoError(t, cl.List(context.TODO(), sl, client.InNamespace(ns))) + + existingSvcs := []string{} + for _, svc := range sl.Items { + existingSvcs = append(existingSvcs, svc.Name) + } + if diff := cmp.Diff(want, existingSvcs); diff != "" { + t.Fatalf("didn't find services, got different names: \n%s", diff) + } +} + func assertDeploymentsDontExist(t *testing.T, cl client.Client, ns string, deps ...string) { t.Helper() d := &appsv1.DeploymentList{} @@ -498,6 +592,30 @@ func assertDeploymentsDontExist(t *testing.T, cl client.Client, ns string, deps } } +func assertServicesDontExist(t *testing.T, cl client.Client, ns string, svcs ...string) { + t.Helper() + sl := &corev1.ServiceList{} + test.AssertNoError(t, cl.List(context.TODO(), sl, client.InNamespace(ns))) + + existingSvcs := []string{} + for _, svc := range sl.Items { + existingSvcs = append(existingSvcs, svc.Name) + } + + matches := []string{} + for _, svc := range svcs { + for _, existingSvc := range existingSvcs { + if svc == existingSvc { + matches = append(matches, svc) + } + } + } + if len(matches) > 0 { + cmp.Diff(matches, []string{}) + t.Fatalf("found deployments that shouldn't be found:\n%s", cmp.Diff(matches, []string{})) + } +} + func assertFluxShardSetCondition(t *testing.T, shardset *templatesv1.FluxShardSet, condType, msg string) { t.Helper() cond := apimeta.FindStatusCondition(shardset.Status.Conditions, condType) @@ -530,31 +648,9 @@ func reconcileWithErrorAndReload(t *testing.T, cl client.Client, reconciler *Flu test.AssertNoError(t, cl.Get(ctx, client.ObjectKeyFromObject(shardSet), shardSet)) } -func deleteFluxShardSet(t *testing.T, cl client.Client, shardset *templatesv1.FluxShardSet) { - ctx := context.TODO() - t.Helper() - - test.AssertNoError(t, cl.Get(ctx, client.ObjectKeyFromObject(shardset), shardset)) - - if shardset.Status.Inventory != nil { - for _, v := range shardset.Status.Inventory.Entries { - d, err := deploymentFromResourceRef(v) - test.AssertNoError(t, err) - test.AssertNoError(t, cl.Delete(ctx, d)) - } - } - - test.AssertNoError(t, cl.Delete(ctx, shardset)) -} - -func nsn(namespace, name string) types.NamespacedName { - return types.NamespacedName{ - Name: name, - Namespace: namespace, - } -} - func deleteObject(t *testing.T, cl client.Client, obj client.Object) { t.Helper() - test.AssertNoError(t, cl.Delete(context.TODO(), obj)) + if err := cl.Delete(context.TODO(), obj); err != nil { + t.Fatalf("failed to delete object during cleanup: %s", err) + } } diff --git a/internal/deploys/deploys.go b/internal/deploys/deploys.go index f141e54..dbbe260 100644 --- a/internal/deploys/deploys.go +++ b/internal/deploys/deploys.go @@ -6,6 +6,7 @@ import ( "github.com/weaveworks/flux-shard-controller/api/v1alpha1" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -13,17 +14,97 @@ import ( const ( ignoreShardsSelector = "!sharding.fluxcd.io/key" ignoreShardsSelectorArg = "--watch-label-selector=" + ignoreShardsSelector + storageAdvAddressArg = "--storage-adv-addr=" shardsSelector = "sharding.fluxcd.io/key" ) +// GenerateDeployments creates list of new deployments to process the set of +// shards declared in the ShardSet. +func GenerateDeployments(fluxShardSet *v1alpha1.FluxShardSet, srcDeploy *appsv1.Deployment, srcSvc *corev1.Service) ([]client.Object, error) { + if !deploymentIgnoresShardLabels(srcDeploy) { + return nil, fmt.Errorf("deployment %s is not configured to ignore sharding", client.ObjectKeyFromObject(srcDeploy)) + } + shardResources := []client.Object{} + for _, shard := range fluxShardSet.Spec.Shards { + shardLabels := map[string]string{ + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": fluxShardSet.Name, + "templates.weave.works/shard": shard.Name, + "sharding.fluxcd.io/role": "shard", + } + + newDeployment := newDeploymentFromDeployment(*srcDeploy) + newDeploymentName := fmt.Sprintf("%s-%s", srcDeploy.ObjectMeta.Name, shard.Name) + err := updateNewDeployment(newDeployment, fluxShardSet.Name, shard.Name, srcDeploy.ObjectMeta.Name, newDeploymentName, shardLabels) + if err != nil { + return nil, err + } + shardResources = append(shardResources, newDeployment) + + if srcSvc != nil { + newSvc := newServiceFromService(*srcSvc, newDeploymentName, shardLabels, newDeployment.Spec.Template.ObjectMeta.Labels) + shardResources = append(shardResources, newSvc) + } + } + + return shardResources, nil +} + +func deploymentIgnoresShardLabels(deploy *appsv1.Deployment) bool { + for i := range deploy.Spec.Template.Spec.Containers { + container := deploy.Spec.Template.Spec.Containers[i] + for _, arg := range container.Args { + if arg == ignoreShardsSelectorArg { + return true + } + } + } + + return false +} + +func generateSelectorStr(key, shardSelector string, operator metav1.LabelSelectorOperator, values []string) (string, error) { + selector := &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: shardSelector, + Operator: operator, + Values: values, + }, + }, + } + + labelSelector, err := metav1.LabelSelectorAsSelector(selector) + if err != nil { + return "", fmt.Errorf("failed to generate label selector: %v", err) + } + + return fmt.Sprintf("%s=%s", key, labelSelector), nil +} + +func replaceArg(args []string, argName, newArg string) { + for i := range args { + if strings.HasPrefix(args[i], argName) { + args[i] = newArg + } + } +} + +func findStorageAddressArg(args []string) string { + for i := range args { + if strings.HasPrefix(args[i], storageAdvAddressArg) { + return args[i] + } + } + + return "" +} + // newDeploymentFromDeployment takes a Deployment loaded from the Cluster and // clears out the Metadata fields that are needed in the cluster. func newDeploymentFromDeployment(src appsv1.Deployment) *appsv1.Deployment { depl := src.DeepCopy() depl.CreationTimestamp = metav1.Time{} - if depl.Annotations == nil { - depl.Annotations = map[string]string{} - } // This is really a work around for the test cases // But it doesn't cost anything to do it here. @@ -43,21 +124,14 @@ func newDeploymentFromDeployment(src appsv1.Deployment) *appsv1.Deployment { } // updateNewDeployment updates the deployment with sharding related fields such as name and required labels -func updateNewDeployment(depl *appsv1.Deployment, shardSetName, shardName, newDeploymentName string) error { +func updateNewDeployment(depl *appsv1.Deployment, shardSetName, shardName, oldDeploymentName, newDeploymentName string, labels map[string]string) error { // Add sharding labels if depl.ObjectMeta.Labels == nil { depl.ObjectMeta.Labels = map[string]string{} } - shardLabels := map[string]string{ - "app.kubernetes.io/managed-by": "flux-shard-controller", - "templates.weave.works/shard-set": shardSetName, - "templates.weave.works/shard": shardName, - "sharding.fluxcd.io/role": "shard", - } - depl.ObjectMeta.Labels = merge( - shardLabels, + labels, depl.ObjectMeta.Labels, ) // generate selector args string @@ -70,10 +144,16 @@ func updateNewDeployment(depl *appsv1.Deployment, shardSetName, shardName, newDe if container.Args == nil { container.Args = []string{} } + + storageAdvAddress := "" + if storageAddr := findStorageAddressArg(container.Args); storageAddr != "" { + storageAdvAddress = strings.Replace(storageAddr, oldDeploymentName, newDeploymentName, 1) + } + if container.Name == "manager" { replaceArg(container.Args, ignoreShardsSelectorArg, selectorArgs) + replaceArg(container.Args, storageAdvAddressArg, storageAdvAddress) } - } // Update deployment name @@ -81,89 +161,62 @@ func updateNewDeployment(depl *appsv1.Deployment, shardSetName, shardName, newDe // This makes the selector and template labels match. depl.Spec.Selector.MatchLabels = merge( - shardLabels, + labels, depl.Spec.Selector.MatchLabels, ) depl.Spec.Template.ObjectMeta.Labels = merge( - shardLabels, + labels, depl.Spec.Template.ObjectMeta.Labels, ) return nil } -// return a copy off the "dest" map, with the elements of the "src" map applied -// over the top. -func merge[K comparable, V any](src, dest map[K]V) map[K]V { - merged := map[K]V{} - for k, v := range dest { - merged[k] = v - } - for k, v := range src { - merged[k] = v - } +// newServiceFromService takes a Service loaded from the Cluster and +// clears out the Metadata fields that are needed in the cluster. +func newServiceFromService(src corev1.Service, newName string, labels, selector map[string]string) *corev1.Service { + svc := src.DeepCopy() + svc.CreationTimestamp = metav1.Time{} - return merged -} + svc.ObjectMeta.Labels = merge( + labels, + svc.ObjectMeta.Labels, + ) -// GenerateDeployments creates list of new deployments to process the set of -// shards declared in the ShardSet. -func GenerateDeployments(fluxShardSet *v1alpha1.FluxShardSet, src *appsv1.Deployment) ([]*appsv1.Deployment, error) { - if !deploymentIgnoresShardLabels(src) { - return nil, fmt.Errorf("deployment %s is not configured to ignore sharding", client.ObjectKeyFromObject(src)) - } - generatedDeployments := []*appsv1.Deployment{} - for _, shard := range fluxShardSet.Spec.Shards { - deployment := newDeploymentFromDeployment(*src) - newDeploymentName := fmt.Sprintf("%s-%s", src.ObjectMeta.Name, shard.Name) - err := updateNewDeployment(deployment, fluxShardSet.Name, shard.Name, newDeploymentName) - if err != nil { - return nil, err - } + svc.Spec.Selector = selector + svc.Spec.ClusterIP = "" + svc.Spec.ClusterIPs = nil - generatedDeployments = append(generatedDeployments, deployment) + // This is really a work around for the test cases + // But it doesn't cost anything to do it here. + // https://github.com/kubernetes-sigs/controller-runtime/issues?q=is%3Aissue+typemeta+empty+is%3Aclosed+ + svc.TypeMeta = metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", } - return generatedDeployments, nil -} - -func deploymentIgnoresShardLabels(deploy *appsv1.Deployment) bool { - for i := range deploy.Spec.Template.Spec.Containers { - container := deploy.Spec.Template.Spec.Containers[i] - for _, arg := range container.Args { - if arg == ignoreShardsSelectorArg { - return true - } - } - } + delete(svc.Annotations, "deployment.kubernetes.io/revision") - return false -} + svc.ObjectMeta.Name = newName + svc.Generation = 0 + svc.ResourceVersion = "" + svc.UID = "" + svc.Status = corev1.ServiceStatus{} -func replaceArg(args []string, ignoreShardsSelectorArgs string, newArg string) { - for i := range args { - if strings.HasPrefix(args[i], ignoreShardsSelectorArgs) { - args[i] = newArg - } - } + return svc } -func generateSelectorStr(key, shardSelector string, operator metav1.LabelSelectorOperator, values []string) (string, error) { - selector := &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: shardSelector, - Operator: operator, - Values: values, - }, - }, +// return a copy of the "dest" map, with the elements of the "src" map applied +// over the top. +func merge[K comparable, V any](src, dest map[K]V) map[K]V { + merged := map[K]V{} + for k, v := range dest { + merged[k] = v } - - labelSelector, err := metav1.LabelSelectorAsSelector(selector) - if err != nil { - return "", fmt.Errorf("failed to generate label selector: %v", err) + for k, v := range src { + merged[k] = v } - return fmt.Sprintf("%s=%s", key, labelSelector), nil + return merged } diff --git a/internal/deploys/deploys_test.go b/internal/deploys/deploys_test.go index 10e09c7..22be6fc 100644 --- a/internal/deploys/deploys_test.go +++ b/internal/deploys/deploys_test.go @@ -9,14 +9,14 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" shardv1 "github.com/weaveworks/flux-shard-controller/api/v1alpha1" "github.com/weaveworks/flux-shard-controller/test" ) -const testControllerName = "kustomize-controller" +const testControllerName = "source-controller" func TestNewDeploymentFromDeployment(t *testing.T) { depl := loadDeploymentFixture(t, "testdata/kustomize-controller.yaml") @@ -34,7 +34,8 @@ func TestGenerateDeployments(t *testing.T) { name string fluxShardSet *shardv1.FluxShardSet src *appsv1.Deployment - wantDeps []*appsv1.Deployment + svc *corev1.Service + wantObjs []client.Object }{ { name: "generate when no shards are defined", @@ -46,12 +47,12 @@ func TestGenerateDeployments(t *testing.T) { Shards: []shardv1.ShardSpec{}, }, }, - src: newTestDeployment(func(d *appsv1.Deployment) { + src: test.NewDeployment(testControllerName, func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } }), - wantDeps: []*appsv1.Deployment{}, + wantObjs: []client.Object{}, }, { name: "generation when one shard is defined", @@ -70,44 +71,119 @@ func TestGenerateDeployments(t *testing.T) { }, }, }, - src: newTestDeployment(func(d *appsv1.Deployment) { - d.ObjectMeta.Name = "kustomize-controller" + src: test.NewDeployment(testControllerName, func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", + "--storage-adv-addr=source-controller.$(RUNTIME_NAMESPACE).svc.cluster.local.", } }), - wantDeps: []*appsv1.Deployment{ - newTestDeployment(func(d *appsv1.Deployment) { - d.Annotations = map[string]string{} - d.ObjectMeta.Labels = map[string]string{ - "sharding.fluxcd.io/role": "shard", - "app.kubernetes.io/managed-by": "flux-shard-controller", - "templates.weave.works/shard": "shard-1", - "templates.weave.works/shard-set": "test-shard-set", - } - d.ObjectMeta.Name = "kustomize-controller-shard-1" - d.Spec.Template.Spec.Containers[0].Args = []string{ - "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", - } - d.Spec.Selector = &metav1.LabelSelector{ - MatchLabels: map[string]string{ + svc: test.NewService("source-controller", func(svc *corev1.Service) { + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + } + }), + wantObjs: []client.Object{ + test.NewDeployment( + "source-controller-shard-1", + func(d *appsv1.Deployment) { + d.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard": "shard-1", + "templates.weave.works/shard-set": "test-shard-set", + } + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + "--storage-adv-addr=source-controller-shard-1.$(RUNTIME_NAMESPACE).svc.cluster.local.", + } + d.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-1", + }, + } + d.Spec.Template.ObjectMeta.Labels = map[string]string{ "sharding.fluxcd.io/role": "shard", - "app": "kustomize-controller", + "app": "source-controller", "app.kubernetes.io/managed-by": "flux-shard-controller", "templates.weave.works/shard-set": "test-shard-set", "templates.weave.works/shard": "shard-1", - }, - } - d.Spec.Template.ObjectMeta.Labels = map[string]string{ + } + }), + test.NewService("source-controller-shard-1", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{ "sharding.fluxcd.io/role": "shard", - "app": "kustomize-controller", "app.kubernetes.io/managed-by": "flux-shard-controller", "templates.weave.works/shard-set": "test-shard-set", "templates.weave.works/shard": "shard-1", } + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "sharding.fluxcd.io/role": "shard", + "templates.weave.works/shard": "shard-1", + "templates.weave.works/shard-set": "test-shard-set", + } }), }, }, + { + name: "generation when source controller has no service", + fluxShardSet: &shardv1.FluxShardSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-shard-set", + }, + Spec: shardv1.FluxShardSetSpec{ + SourceDeploymentRef: shardv1.SourceDeploymentReference{ + Name: testControllerName, + }, + Shards: []shardv1.ShardSpec{ + { + Name: "shard-1", + }, + }, + }, + }, + src: test.NewDeployment(testControllerName, func(d *appsv1.Deployment) { + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=!sharding.fluxcd.io/key", + } + }), + wantObjs: []client.Object{ + test.NewDeployment( + "source-controller-shard-1", + func(d *appsv1.Deployment) { + d.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard": "shard-1", + "templates.weave.works/shard-set": "test-shard-set", + } + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + } + d.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-1", + }, + } + d.Spec.Template.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-1", + } + }), + }, + }, { name: "generation when two shards is defined", fluxShardSet: &shardv1.FluxShardSet{ @@ -128,44 +204,78 @@ func TestGenerateDeployments(t *testing.T) { }, }, }, - src: newTestDeployment(func(d *appsv1.Deployment) { + src: test.NewDeployment(testControllerName, func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } }), - wantDeps: []*appsv1.Deployment{ - newTestDeployment(func(d *appsv1.Deployment) { - d.Annotations = map[string]string{} + svc: test.NewService("source-controller", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{} + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + } + }), + wantObjs: []client.Object{ + test.NewDeployment("source-controller-shard-a", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = test.ShardLabels("shard-a") - d.ObjectMeta.Name = "kustomize-controller-shard-a" d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=sharding.fluxcd.io/key in (shard-a)", } d.Spec.Selector = &metav1.LabelSelector{ MatchLabels: test.ShardLabels("shard-a", map[string]string{ - "app": "kustomize-controller", + "app": "source-controller", }), } d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-a", map[string]string{ - "app": "kustomize-controller", + "app": "source-controller", }) }), - newTestDeployment(func(d *appsv1.Deployment) { - d.Annotations = map[string]string{} + test.NewService("source-controller-shard-a", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-a", + } + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "sharding.fluxcd.io/role": "shard", + "templates.weave.works/shard": "shard-a", + "templates.weave.works/shard-set": "test-shard-set", + } + }), + test.NewDeployment("source-controller-shard-b", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = test.ShardLabels("shard-b") - d.ObjectMeta.Name = "kustomize-controller-shard-b" + d.ObjectMeta.Name = "source-controller-shard-b" d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=sharding.fluxcd.io/key in (shard-b)", } d.Spec.Selector = &metav1.LabelSelector{ MatchLabels: test.ShardLabels("shard-b", map[string]string{ - "app": "kustomize-controller", + "app": "source-controller", }), } d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-b", map[string]string{ - "app": "kustomize-controller", + "app": "source-controller", }) }), + test.NewService("source-controller-shard-b", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-b", + } + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "sharding.fluxcd.io/role": "shard", + "templates.weave.works/shard": "shard-b", + "templates.weave.works/shard-set": "test-shard-set", + } + }), }, }, { @@ -185,29 +295,52 @@ func TestGenerateDeployments(t *testing.T) { }, }, }, - src: newTestDeployment(func(d *appsv1.Deployment) { - d.Spec.Template.Spec.Containers[0].Args = []string{ - "--watch-all-namespaces=true", - "--watch-label-selector=!sharding.fluxcd.io/key", - } - }), - wantDeps: []*appsv1.Deployment{ - newTestDeployment(func(d *appsv1.Deployment) { - d.Annotations = map[string]string{} - d.ObjectMeta.Labels = test.ShardLabels("shard-1") - d.ObjectMeta.Name = "kustomize-controller-shard-1" + src: test.NewDeployment( + testControllerName, + func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-all-namespaces=true", - "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + "--watch-label-selector=!sharding.fluxcd.io/key", } - d.Spec.Selector = &metav1.LabelSelector{ - MatchLabels: test.ShardLabels("shard-1", map[string]string{ - "app": "kustomize-controller", - }), + }), + svc: test.NewService("source-controller", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{} + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + } + }), + wantObjs: []client.Object{ + test.NewDeployment( + "source-controller-shard-1", + func(d *appsv1.Deployment) { + d.ObjectMeta.Labels = test.ShardLabels("shard-1") + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-all-namespaces=true", + "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + } + d.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: test.ShardLabels("shard-1", map[string]string{ + "app": "source-controller", + }), + } + d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-1", map[string]string{ + "app": "source-controller", + }) + }), + test.NewService("source-controller-shard-1", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-1", + } + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "sharding.fluxcd.io/role": "shard", + "templates.weave.works/shard": "shard-1", + "templates.weave.works/shard-set": "test-shard-set", } - d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-1", map[string]string{ - "app": "kustomize-controller", - }) }), }, }, @@ -228,34 +361,57 @@ func TestGenerateDeployments(t *testing.T) { }, }, }, - src: newTestDeployment(func(d *appsv1.Deployment) { - d.ObjectMeta.Name = "kustomize-controller" - d.Spec.Template.Spec.Containers[0].Args = []string{ - "--watch-label-selector=!sharding.fluxcd.io/key", - } - d.Annotations = map[string]string{ - "test-annot": "test", - "deployment.kubernetes.io/revision": "test", - } - }), - wantDeps: []*appsv1.Deployment{ - newTestDeployment(func(d *appsv1.Deployment) { + src: test.NewDeployment( + "source-controller", + func(d *appsv1.Deployment) { + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=!sharding.fluxcd.io/key", + } d.Annotations = map[string]string{ - "test-annot": "test", + "test-annot": "test", + "deployment.kubernetes.io/revision": "test", } - d.ObjectMeta.Labels = test.ShardLabels("shard-1") - d.ObjectMeta.Name = "kustomize-controller-shard-1" - d.Spec.Template.Spec.Containers[0].Args = []string{ - "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + }), + svc: test.NewService("source-controller", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{} + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + } + }), + wantObjs: []client.Object{ + test.NewDeployment( + "source-controller-shard-1", + func(d *appsv1.Deployment) { + d.Annotations = map[string]string{ + "test-annot": "test", + } + d.ObjectMeta.Labels = test.ShardLabels("shard-1") + d.Spec.Template.Spec.Containers[0].Args = []string{ + "--watch-label-selector=sharding.fluxcd.io/key in (shard-1)", + } + d.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: test.ShardLabels("shard-1", map[string]string{ + "app": "source-controller", + }), + } + d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-1", map[string]string{ + "app": "source-controller", + }) + }), + test.NewService("source-controller-shard-1", func(svc *corev1.Service) { + svc.ObjectMeta.Labels = map[string]string{ + "sharding.fluxcd.io/role": "shard", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "templates.weave.works/shard-set": "test-shard-set", + "templates.weave.works/shard": "shard-1", } - d.Spec.Selector = &metav1.LabelSelector{ - MatchLabels: test.ShardLabels("shard-1", map[string]string{ - "app": "kustomize-controller", - }), + svc.Spec.Selector = map[string]string{ + "app": "source-controller", + "app.kubernetes.io/managed-by": "flux-shard-controller", + "sharding.fluxcd.io/role": "shard", + "templates.weave.works/shard": "shard-1", + "templates.weave.works/shard-set": "test-shard-set", } - d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-1", map[string]string{ - "app": "kustomize-controller", - }) }), }, }, @@ -263,12 +419,12 @@ func TestGenerateDeployments(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - generatedDeps, err := GenerateDeployments(tt.fluxShardSet, tt.src) + generatedDeps, err := GenerateDeployments(tt.fluxShardSet, tt.src, tt.svc) if err != nil { t.Fatal(err) } - if diff := cmp.Diff(tt.wantDeps, generatedDeps); diff != "" { + if diff := cmp.Diff(tt.wantObjs, generatedDeps); diff != "" { t.Fatalf("generated deployments dont match wanted: \n%s", diff) } }) @@ -286,14 +442,14 @@ func TestGenerateDeployments_errors(t *testing.T) { { // The deployment does not have --watch-label-selector= name: "deployment does not have sharding args", - src: newTestDeployment(), - wantErr: "deployment flux-system/kustomize-controller is not configured to ignore sharding", + src: test.NewDeployment("source-controller"), + wantErr: "deployment flux-system/source-controller is not configured to ignore sharding", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := GenerateDeployments(tt.fluxShardSet, tt.src) + _, err := GenerateDeployments(tt.fluxShardSet, tt.src, nil) if msg := err.Error(); msg != tt.wantErr { t.Fatalf("wanted error %q, got %q", tt.wantErr, msg) @@ -302,54 +458,6 @@ func TestGenerateDeployments_errors(t *testing.T) { } } -func newTestDeployment(opts ...func(*appsv1.Deployment)) *appsv1.Deployment { - deploy := &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - Kind: "Deployment", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: testControllerName, - Namespace: "flux-system", - }, - Spec: appsv1.DeploymentSpec{ - Replicas: pointer.Int32(1), - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "kustomize-controller", - }, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": "kustomize-controller", - }, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "manager", - Args: []string{ - "--log-level=info", - "--log-encoding=json", - "--enable-leader-election", - }, - Image: "ghcr.io/fluxcd/kustomize-controller:v0.35.1", - }, - }, - ServiceAccountName: "kustomize-controller", - }, - }, - }, - } - - for _, opt := range opts { - opt(deploy) - } - - return deploy -} - // This test is a test of the LabelSelector mechanism and could be removed. func TestLabelSelectorShards(t *testing.T) { selectorTests := []struct { diff --git a/test/cleanup.go b/test/cleanup.go new file mode 100644 index 0000000..77cc8b9 --- /dev/null +++ b/test/cleanup.go @@ -0,0 +1,46 @@ +package test + +import ( + "context" + "fmt" + "testing" + + templatesv1 "github.com/weaveworks/flux-shard-controller/api/v1alpha1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/cli-utils/pkg/object" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// DeleteFluxShardSet will remove all the resources in the inventory and remove +// the resource. +// This is only needed in tests because Owned resources are not automatically +// cleaned up. +func DeleteFluxShardSet(t *testing.T, cl client.Client, shardset *templatesv1.FluxShardSet) { + ctx := context.TODO() + t.Helper() + + AssertNoError(t, cl.Get(ctx, client.ObjectKeyFromObject(shardset), shardset)) + + if shardset.Status.Inventory != nil { + for _, v := range shardset.Status.Inventory.Entries { + d, err := unstructuredFromResourceRef(v) + AssertNoError(t, err) + AssertNoError(t, cl.Delete(ctx, d)) + } + } + + AssertNoError(t, cl.Delete(ctx, shardset)) +} + +func unstructuredFromResourceRef(ref templatesv1.ResourceRef) (*unstructured.Unstructured, error) { + objMeta, err := object.ParseObjMetadata(ref.ID) + if err != nil { + return nil, fmt.Errorf("failed to parse object ID %s: %w", ref.ID, err) + } + u := unstructured.Unstructured{} + u.SetGroupVersionKind(objMeta.GroupKind.WithVersion(ref.Version)) + u.SetName(objMeta.Name) + u.SetNamespace(objMeta.Namespace) + + return &u, nil +} diff --git a/test/deployment.go b/test/deployment.go index 0c9027f..24d716c 100644 --- a/test/deployment.go +++ b/test/deployment.go @@ -5,21 +5,24 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" ) -// MakeTestDeployment creates a new Deployment and apply the opts to it. -func MakeTestDeployment(name types.NamespacedName, opts ...func(*appsv1.Deployment)) *appsv1.Deployment { +// DefaultNamespace is the namespace used for new resources, this can be +// overridden via an option.' +const DefaultNamespace = "flux-system" + +// NewDeployment creates a new Deployment and apply the opts to it. +func NewDeployment(name string, opts ...func(*appsv1.Deployment)) *appsv1.Deployment { deploy := &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ Kind: "Deployment", APIVersion: "apps/v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: name.Name, - Namespace: name.Namespace, + Name: name, + Namespace: DefaultNamespace, }, Spec: appsv1.DeploymentSpec{ Replicas: pointer.Int32(1), @@ -27,7 +30,7 @@ func MakeTestDeployment(name types.NamespacedName, opts ...func(*appsv1.Deployme RevisionHistoryLimit: pointer.Int32(10), Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": name.Name, + "app": name, }, }, Strategy: appsv1.DeploymentStrategy{ @@ -44,7 +47,7 @@ func MakeTestDeployment(name types.NamespacedName, opts ...func(*appsv1.Deployme "prometheus.io/scrape": "true", }, Labels: map[string]string{ - "app": name.Name, + "app": name, }, }, Spec: corev1.PodSpec{ @@ -156,3 +159,46 @@ func MakeTestDeployment(name types.NamespacedName, opts ...func(*appsv1.Deployme return deploy } + +// NewServiceForDeployment creates a new Service with the correct labels +// for a Deployment and applies the opts to it. +func NewServiceForDeployment(deploy *appsv1.Deployment, opts ...func(*corev1.Service)) *corev1.Service { + return NewService(deploy.GetName(), append(opts, func(svc *corev1.Service) { + svc.ObjectMeta.Namespace = deploy.GetNamespace() + svc.Spec.Selector = deploy.Spec.Selector.MatchLabels + })...) +} + +// NewService creates and returns a new Service. +func NewService(name string, opts ...func(*corev1.Service)) *corev1.Service { + svc := &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: DefaultNamespace, + }, + Spec: corev1.ServiceSpec{ + IPFamilies: []corev1.IPFamily{ + corev1.IPv4Protocol, + }, + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{ + { + Port: 80, + Name: "http", + TargetPort: intstr.FromString("http"), + }, + }, + }, + } + + for _, opt := range opts { + opt(svc) + } + + return svc + +} diff --git a/test/shardset.go b/test/shardset.go index ea72e8f..d74ef7f 100644 --- a/test/shardset.go +++ b/test/shardset.go @@ -12,7 +12,7 @@ func NewFluxShardSet(opts ...func(*templatesv1.FluxShardSet)) *templatesv1.FluxS fluxshardset := &templatesv1.FluxShardSet{ ObjectMeta: metav1.ObjectMeta{ Name: "test-shard-set", - Namespace: "default", + Namespace: DefaultNamespace, }, Spec: templatesv1.FluxShardSetSpec{}, } diff --git a/tests/e2e/fluxshardset_controller_test.go b/tests/e2e/fluxshardset_controller_test.go index f0e6f65..30dda4f 100644 --- a/tests/e2e/fluxshardset_controller_test.go +++ b/tests/e2e/fluxshardset_controller_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "fmt" "regexp" "sort" "testing" @@ -11,11 +12,12 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" @@ -27,18 +29,24 @@ var ignoreObjectMeta = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "UID", "OwnerRe func TestCreatingDeployments(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } }) test.AssertNoError(t, testEnv.Create(ctx, srcDeployment)) - defer func() { - test.AssertNoError(t, testEnv.Get(ctx, client.ObjectKeyFromObject(srcDeployment), srcDeployment)) - deleteObject(t, testEnv, srcDeployment) - }() + defer deleteObject(t, testEnv, srcDeployment) + + srcService := test.NewService("kustomize-controller", func(svc *corev1.Service) { + svc.Spec.Selector = map[string]string{ + "app": "kustomize-controller", + } + }) + test.AssertNoError(t, testEnv.Create(ctx, srcService)) + defer deleteObject(t, testEnv, srcService) shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { + set.ObjectMeta.Namespace = srcDeployment.GetNamespace() set.Spec.Shards = []templatesv1.ShardSpec{ { Name: "shard-1", @@ -50,10 +58,11 @@ func TestCreatingDeployments(t *testing.T) { }) test.AssertNoError(t, testEnv.Create(ctx, shardSet)) - defer deleteShardSetAndWaitForNotFound(t, testEnv, shardSet) + defer deleteFluxShardSetAndWaitForNotFound(t, testEnv, shardSet) - waitForFluxShardSetCondition(t, testEnv, shardSet, `1 shard\(s\) created`) - want := test.MakeTestDeployment(nsn(srcDeployment.GetNamespace(), "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + waitForFluxShardSetCondition(t, testEnv, shardSet, "2 resources created") + want := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { + d.ObjectMeta.Namespace = srcDeployment.GetNamespace() d.ObjectMeta.Labels = test.ShardLabels("shard-1") d.Spec.Selector = &metav1.LabelSelector{ MatchLabels: test.ShardLabels("shard-1", map[string]string{ @@ -77,17 +86,18 @@ func TestCreatingDeployments(t *testing.T) { func TestUpdatingDeployments(t *testing.T) { ctx := context.TODO() - srcDeployment := test.MakeTestDeployment(nsn("default", "kustomize-controller"), func(d *appsv1.Deployment) { + srcDeployment := test.NewDeployment("kustomize-controller", func(d *appsv1.Deployment) { d.Spec.Template.Spec.Containers[0].Args = []string{ "--watch-label-selector=!sharding.fluxcd.io/key", } d.Spec.Template.Spec.Containers[0].Image = "ghcr.io/fluxcd/kustomize-controller:v0.35.0" }) test.AssertNoError(t, testEnv.Create(ctx, srcDeployment)) - defer func() { - test.AssertNoError(t, testEnv.Get(ctx, client.ObjectKeyFromObject(srcDeployment), srcDeployment)) - deleteObject(t, testEnv, srcDeployment) - }() + defer deleteObject(t, testEnv, srcDeployment) + + srcService := test.NewService("kustomize-controller") + test.AssertNoError(t, testEnv.Create(ctx, srcService)) + defer deleteObject(t, testEnv, srcService) shardSet := test.NewFluxShardSet(func(set *templatesv1.FluxShardSet) { set.Spec.Shards = []templatesv1.ShardSpec{ @@ -101,9 +111,9 @@ func TestUpdatingDeployments(t *testing.T) { }) test.AssertNoError(t, testEnv.Create(ctx, shardSet)) - defer deleteShardSetAndWaitForNotFound(t, testEnv, shardSet) - waitForFluxShardSetCondition(t, testEnv, shardSet, `1 shard\(s\) created`) - waitForFluxShardSetInventory(t, testEnv, shardSet, test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"))) + defer deleteFluxShardSetAndWaitForNotFound(t, testEnv, shardSet) + waitForFluxShardSetCondition(t, testEnv, shardSet, `2 resources created`) + waitForFluxShardSetInventory(t, testEnv, shardSet, test.NewDeployment("kustomize-controller-shard-1"), test.NewService("kustomize-controller-shard-1")) test.AssertNoError(t, testEnv.Get(ctx, client.ObjectKeyFromObject(srcDeployment), srcDeployment)) srcDeployment.Spec.Template.Spec.Containers[0].Image = "ghcr.io/fluxcd/kustomize-controller:v0.35.2" @@ -111,7 +121,7 @@ func TestUpdatingDeployments(t *testing.T) { test.AssertNoError(t, testEnv.Get(ctx, client.ObjectKeyFromObject(srcDeployment), srcDeployment)) - shard1Deploy := test.MakeTestDeployment(nsn("default", "kustomize-controller-shard-1"), func(d *appsv1.Deployment) { + shard1Deploy := test.NewDeployment("kustomize-controller-shard-1", func(d *appsv1.Deployment) { d.ObjectMeta.Labels = test.ShardLabels("shard-1") d.Spec.Template.ObjectMeta.Labels = test.ShardLabels("shard-1") d.Spec.Template.Spec.Containers[0].Args = []string{ @@ -179,9 +189,9 @@ func waitForFluxShardSetCondition(t *testing.T, k8sClient client.Client, set *te t.Fatal(err) } - if !match { - t.Logf("failed to match %q to %q", message, cond.Message) - } + // if !match { + // t.Logf("failed to match %q to %q", message, cond.Message) + // } return match }, timeout).Should(gomega.BeTrue()) } @@ -210,30 +220,10 @@ func deleteObject(t *testing.T, cl client.Client, obj client.Object) { // Owned resources are not automatically deleted in the testenv setup. // This cleans the resources from the inventory, and then removes the Shard Set // and waits for it to be gone. -func deleteShardSetAndWaitForNotFound(t *testing.T, cl client.Client, set *templatesv1.FluxShardSet) { +func deleteFluxShardSetAndWaitForNotFound(t *testing.T, cl client.Client, set *templatesv1.FluxShardSet) { t.Helper() ctx := context.TODO() - test.AssertNoError(t, cl.Get(ctx, client.ObjectKeyFromObject(set), set)) - - if set.Status.Inventory != nil { - for _, v := range set.Status.Inventory.Entries { - t.Logf("deleting %s", v.ID) - objMeta, err := object.ParseObjMetadata(v.ID) - if err != nil { - t.Logf("failed to delete resource: %s", v.ID) - t.Errorf("failed to delete resource ref %s when cleaning up", v.ID) - continue - } - var deploy appsv1.Deployment - test.AssertNoError(t, cl.Get(ctx, client.ObjectKey{Name: objMeta.Name, Namespace: objMeta.Namespace}, &deploy)) - - if err := cl.Delete(ctx, &deploy); err != nil { - t.Errorf("failed to delete deployment %+v when cleaning up", deploy.ObjectMeta) - } - } - } - - deleteObject(t, cl, set) + test.DeleteFluxShardSet(t, cl, set) g := gomega.NewWithT(t) g.Eventually(func() bool { @@ -253,9 +243,15 @@ func deleteShardSetAndWaitForNotFound(t *testing.T, cl client.Client, set *templ } } -func nsn(namespace, name string) client.ObjectKey { - return types.NamespacedName{ - Name: name, - Namespace: namespace, +func unstructuredFromResourceRef(ref templatesv1.ResourceRef) (*unstructured.Unstructured, error) { + objMeta, err := object.ParseObjMetadata(ref.ID) + if err != nil { + return nil, fmt.Errorf("failed to parse object ID %s: %w", ref.ID, err) } + u := unstructured.Unstructured{} + u.SetGroupVersionKind(objMeta.GroupKind.WithVersion(ref.Version)) + u.SetName(objMeta.Name) + u.SetNamespace(objMeta.Namespace) + + return &u, nil } diff --git a/tests/e2e/main_test.go b/tests/e2e/main_test.go index a5bec03..5a96bb6 100644 --- a/tests/e2e/main_test.go +++ b/tests/e2e/main_test.go @@ -1,6 +1,7 @@ package tests import ( + "context" "fmt" "os" "path/filepath" @@ -9,6 +10,7 @@ import ( "github.com/fluxcd/pkg/runtime/testenv" templatesv1 "github.com/weaveworks/flux-shard-controller/api/v1alpha1" + "github.com/weaveworks/flux-shard-controller/test" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes/scheme" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -33,6 +35,10 @@ func TestMain(m *testing.M) { filepath.Join("..", "..", "config", "crd", "bases"), )) + if err := testEnv.Create(context.TODO(), test.NewNamespace("flux-system")); err != nil { + panic(fmt.Sprintf("failed to create namespace flux-system: %s", err)) + } + if err := (&controller.FluxShardSetReconciler{ Client: testEnv, Scheme: testEnv.GetScheme(),