From e4b4ebcdfb71b47f2e38bb6f98cab7b30c5bdd22 Mon Sep 17 00:00:00 2001 From: Aitor Perez Cedres Date: Mon, 11 Dec 2023 17:02:26 +0000 Subject: [PATCH] Refactor integration tests: controller isolation Controllers now are scoped to a dedicated namespace. Controllers now are started and stopped for each test case. This provides isolation from other test suites, ensuring there's no environment pollution. We also bumped Kubernetes to 1.26, because this version had some changes regarding Pod Admission Controllers, and it will be the minimum supported Kubernetes version in the next minor release. Signed-off-by: Aitor Perez Cedres --- Makefile | 2 +- api/v1beta1/user_types_test.go | 2 +- controllers/binding_controller_test.go | 85 ++++- controllers/exchange_controller_test.go | 94 ++++-- controllers/federation_controller_test.go | 144 ++++++--- controllers/permission_controller_test.go | 154 ++++++--- controllers/policy_controller_test.go | 142 ++++++--- controllers/queue_controller_test.go | 138 ++++++--- .../schemareplication_controller_test.go | 114 +++++-- controllers/shovel_controller_test.go | 168 +++++++--- controllers/suite_test.go | 270 ++++++---------- controllers/super_stream_controller_test.go | 262 +++++++++------- .../topicpermission_controller_test.go | 291 ++++++++++++------ controllers/topology_controller_test.go | 51 +-- controllers/user_controller_test.go | 152 ++++++--- controllers/vhost_controller_test.go | 144 ++++++--- go.sum | 4 + internal/binding_test.go | 1 + 18 files changed, 1471 insertions(+), 747 deletions(-) diff --git a/Makefile b/Makefile index 1b28eb37..c3c2fcba 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ install-tools: @$(get_mod_code_generator) go install golang.org/x/vuln/cmd/govulncheck@latest -ENVTEST_K8S_VERSION = 1.22.1 +ENVTEST_K8S_VERSION = 1.26.1 ARCHITECTURE = amd64 LOCAL_TESTBIN = $(CURDIR)/testbin diff --git a/api/v1beta1/user_types_test.go b/api/v1beta1/user_types_test.go index 9c44b534..4a416584 100644 --- a/api/v1beta1/user_types_test.go +++ b/api/v1beta1/user_types_test.go @@ -87,7 +87,7 @@ var _ = Describe("user spec", func() { username = "invalid-user" }) It("fails to create the user", func() { - Expect(k8sClient.Create(ctx, &user)).To(MatchError(`User.rabbitmq.com "invalid-user" is invalid: spec.tags: Unsupported value: "invalid": supported values: "management", "policymaker", "monitoring", "administrator"`)) + Expect(k8sClient.Create(ctx, &user)).To(MatchError(`User.rabbitmq.com "invalid-user" is invalid: spec.tags[1]: Unsupported value: "invalid": supported values: "management", "policymaker", "monitoring", "administrator"`)) }) }) }) diff --git a/controllers/binding_controller_test.go b/controllers/binding_controller_test.go index 94d8c4b5..05d5b79f 100644 --- a/controllers/binding_controller_test.go +++ b/controllers/binding_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("bindingController", func() { - var binding topology.Binding - var bindingName string + var ( + binding topology.Binding + bindingName string + bindingMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + bindingMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{bindingNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(bindingMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = bindingMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: bindingMgr.GetClient(), + Type: &topology.Binding{}, + Scheme: bindingMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.BindingReconciler{}, + }).SetupWithManager(bindingMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { binding = topology.Binding{ ObjectMeta: metav1.ObjectMeta{ Name: bindingName, - Namespace: "default", + Namespace: bindingNamespace, }, Spec: topology.BindingSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,9 +101,9 @@ var _ = Describe("bindingController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Create(ctx, &binding)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &binding, @@ -71,9 +126,9 @@ var _ = Describe("bindingController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Create(ctx, &binding)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &binding, @@ -96,9 +151,9 @@ var _ = Describe("bindingController", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Create(ctx, &binding)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &binding, @@ -118,14 +173,14 @@ var _ = Describe("bindingController", func() { fakeRabbitMQClient.DeleteBindingReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete binding")) @@ -139,9 +194,9 @@ var _ = Describe("bindingController", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) return apierrors.IsNotFound(err) }, 5).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete binding")) diff --git a/controllers/exchange_controller_test.go b/controllers/exchange_controller_test.go index abdb63fb..0d016fff 100644 --- a/controllers/exchange_controller_test.go +++ b/controllers/exchange_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,64 @@ import ( ) var _ = Describe("exchange-controller", func() { - var exchange topology.Exchange - var exchangeName string + var ( + exchange topology.Exchange + exchangeName string + exchangeMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + exchangeMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{exchangeNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(exchangeMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = exchangeMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: exchangeMgr.GetClient(), + Type: &topology.Exchange{}, + Scheme: exchangeMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.ExchangeReconciler{}, + }).SetupWithManager(exchangeMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { + // this will be executed after all BeforeEach have run exchange = topology.Exchange{ ObjectMeta: metav1.ObjectMeta{ Name: exchangeName, - Namespace: "default", + Namespace: exchangeNamespace, }, Spec: topology.ExchangeSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,9 +102,9 @@ var _ = Describe("exchange-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &exchange, @@ -71,9 +127,9 @@ var _ = Describe("exchange-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &exchange, @@ -99,9 +155,9 @@ var _ = Describe("exchange-controller", func() { }) It("changes only if status changes", func() { By("setting LastTransitionTime when transitioning to status Ready=true") - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Namespace: exchange.Namespace, Name: exchange.Name}, &exchange, @@ -120,9 +176,9 @@ var _ = Describe("exchange-controller", func() { StatusCode: http.StatusNoContent, }, nil) exchange.Labels = map[string]string{"k1": "v1"} - Expect(client.Update(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Update(ctx, &exchange)).To(Succeed()) ConsistentlyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Namespace: exchange.Namespace, Name: exchange.Name}, &exchange, @@ -140,9 +196,9 @@ var _ = Describe("exchange-controller", func() { StatusCode: http.StatusInternalServerError, }, errors.New("something went wrong")) exchange.Labels = map[string]string{"k1": "v2"} - Expect(client.Update(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Update(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Namespace: exchange.Namespace, Name: exchange.Name}, &exchange, @@ -164,9 +220,9 @@ var _ = Describe("exchange-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &exchange, @@ -191,9 +247,9 @@ var _ = Describe("exchange-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete exchange")) @@ -207,9 +263,9 @@ var _ = Describe("exchange-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete exchange")) diff --git a/controllers/federation_controller_test.go b/controllers/federation_controller_test.go index 5e94cf33..42e12141 100644 --- a/controllers/federation_controller_test.go +++ b/controllers/federation_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("federation-controller", func() { - var federation topology.Federation - var federationName string + var ( + federation topology.Federation + federationName string + federationMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + federationMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{federationNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(federationMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = federationMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: federationMgr.GetClient(), + Type: &topology.Federation{}, + Scheme: federationMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.FederationReconciler{Client: federationMgr.GetClient()}, + }).SetupWithManager(federationMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { federation = topology.Federation{ ObjectMeta: metav1.ObjectMeta{ Name: federationName, - Namespace: "default", + Namespace: federationNamespace, }, Spec: topology.FederationSpec{ Name: "my-federation-upstream", @@ -49,21 +104,24 @@ var _ = Describe("federation-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &federation)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation, ) return federation.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some HTTP error"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some HTTP error"), + }))) }) }) @@ -74,21 +132,24 @@ var _ = Describe("federation-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &federation)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation, ) return federation.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some go failure here"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some go failure here"), + }))) }) }) }) @@ -99,20 +160,23 @@ var _ = Describe("federation-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &federation)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation, ) return federation.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -121,16 +185,19 @@ var _ = Describe("federation-controller", func() { fakeRabbitMQClient.DeleteFederationUpstreamReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &federation)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &federation)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete federation")) }) }) @@ -142,11 +209,14 @@ var _ = Describe("federation-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &federation)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &federation)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete federation")) }) }) diff --git a/controllers/permission_controller_test.go b/controllers/permission_controller_test.go index c54de3a3..37c0c1e5 100644 --- a/controllers/permission_controller_test.go +++ b/controllers/permission_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,17 +24,66 @@ import ( ) var _ = Describe("permission-controller", func() { - var permission topology.Permission - var user topology.User - var permissionName string - var userName string + var ( + permission topology.Permission + user topology.User + permissionName string + userName string + permissionMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + permissionMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{permissionNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(permissionMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = permissionMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: permissionMgr.GetClient(), + Type: &topology.Permission{}, + Scheme: permissionMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.PermissionReconciler{Client: permissionMgr.GetClient(), Scheme: permissionMgr.GetScheme()}, + }).SetupWithManager(permissionMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) When("validating RabbitMQ Client failures with username", func() { JustBeforeEach(func() { permission = topology.Permission{ ObjectMeta: metav1.ObjectMeta{ Name: permissionName, - Namespace: "default", + Namespace: permissionNamespace, }, Spec: topology.PermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -51,9 +106,9 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -76,9 +131,9 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -101,9 +156,9 @@ var _ = Describe("permission-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -128,9 +183,9 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete permission")) @@ -144,9 +199,9 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete permission")) @@ -160,24 +215,24 @@ var _ = Describe("permission-controller", func() { user = topology.User{ ObjectMeta: metav1.ObjectMeta{ Name: userName, - Namespace: "default", + Namespace: permissionNamespace, }, Spec: topology.UserSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: permissionNamespace, }, }, } permission = topology.Permission{ ObjectMeta: metav1.ObjectMeta{ Name: permissionName, - Namespace: "default", + Namespace: permissionNamespace, }, Spec: topology.PermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: permissionNamespace, }, UserReference: &corev1.LocalObjectReference{ Name: userName, @@ -211,9 +266,9 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -236,10 +291,12 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -257,10 +314,12 @@ var _ = Describe("permission-controller", func() { Context("deletion", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -274,22 +333,25 @@ var _ = Describe("permission-controller", func() { }))) }) - When("Secret User is removed first", func() { + When("Secret for User credential does not exist", func() { BeforeEach(func() { permissionName = "test-with-userref-delete-secret" userName = "example-delete-secret-first" }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: user.Name + "-user-credentials", - Namespace: user.Namespace, - }, - })).To(Succeed()) - Expect(client.Delete(ctx, &permission)).To(Succeed()) + // We used to delete a Secret right here, before this PR: + // https://github.com/rabbitmq/messaging-topology-operator/pull/710 + // + // That PR refactored tests and provided controller isolation in tests, so that + // other controllers i.e. User controller, won't interfere with resources + // created/deleted/modified as part of this suite. Therefore, we don't need to + // delete the Secret objects because, after PR 710, the Secret object is never + // created, which meets the point of this test: when the Secret does not exist + // and Permission is deleted + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) observedEvents := observedEvents() @@ -305,14 +367,14 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) observedEvents := observedEvents() @@ -328,9 +390,9 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) observedEvents := observedEvents() @@ -347,11 +409,13 @@ var _ = Describe("permission-controller", func() { }) It("sets the correct deletion ownerref to the object", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) Eventually(func() []metav1.OwnerReference { var fetched topology.Permission - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &fetched) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &fetched) if err != nil { return []metav1.OwnerReference{} } diff --git a/controllers/policy_controller_test.go b/controllers/policy_controller_test.go index d7ba7c90..3da7dac9 100644 --- a/controllers/policy_controller_test.go +++ b/controllers/policy_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" "k8s.io/apimachinery/pkg/runtime" @@ -20,14 +26,63 @@ import ( ) var _ = Describe("policy-controller", func() { - var policy topology.Policy - var policyName string + var ( + policy topology.Policy + policyName string + policyMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + policyMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{policyNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(policyMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = policyMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: policyMgr.GetClient(), + Type: &topology.Policy{}, + Scheme: policyMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.PolicyReconciler{}, + }).SetupWithManager(policyMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { policy = topology.Policy{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, - Namespace: "default", + Namespace: policyNamespace, }, Spec: topology.PolicySpec{ Definition: &runtime.RawExtension{ @@ -51,21 +106,24 @@ var _ = Describe("policy-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &policy)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &policy)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &policy, ) return policy.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -76,21 +134,24 @@ var _ = Describe("policy-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &policy)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &policy)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &policy, ) return policy.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -101,20 +162,23 @@ var _ = Describe("policy-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &policy)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &policy)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &policy, ) return policy.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -123,16 +187,18 @@ var _ = Describe("policy-controller", func() { fakeRabbitMQClient.DeletePolicyReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &policy)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &policy)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete policy")) }) }) @@ -144,11 +210,13 @@ var _ = Describe("policy-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &policy)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &policy)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete policy")) }) }) diff --git a/controllers/queue_controller_test.go b/controllers/queue_controller_test.go index ce93c49f..e82bf170 100644 --- a/controllers/queue_controller_test.go +++ b/controllers/queue_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("queue-controller", func() { - var queue topology.Queue - var queueName string + var ( + queue topology.Queue + queueName string + queueMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + queueMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{queueNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(queueMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = queueMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: queueMgr.GetClient(), + Type: &topology.Queue{}, + Scheme: queueMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.QueueReconciler{}, + }).SetupWithManager(queueMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { queue = topology.Queue{ ObjectMeta: metav1.ObjectMeta{ Name: queueName, - Namespace: "default", + Namespace: queueNamespace, }, Spec: topology.QueueSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,21 +101,24 @@ var _ = Describe("queue-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &queue)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue, ) return queue.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -71,21 +129,24 @@ var _ = Describe("queue-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &queue)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue, ) return queue.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -96,20 +157,23 @@ var _ = Describe("queue-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &queue)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue, ) return queue.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -123,11 +187,13 @@ var _ = Describe("queue-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete queue")) }) }) @@ -139,11 +205,13 @@ var _ = Describe("queue-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete queue")) }) }) diff --git a/controllers/schemareplication_controller_test.go b/controllers/schemareplication_controller_test.go index 477c46af..988d384a 100644 --- a/controllers/schemareplication_controller_test.go +++ b/controllers/schemareplication_controller_test.go @@ -2,13 +2,18 @@ package controllers_test import ( "bytes" + "context" "errors" "github.com/rabbitmq/messaging-topology-operator/controllers" "github.com/rabbitmq/messaging-topology-operator/internal" "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient" "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient/rabbitmqclientfakes" - "io/ioutil" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -22,21 +27,72 @@ import ( ) var _ = Describe("schema-replication-controller", func() { - var replication topology.SchemaReplication - var replicationName string + var ( + replication topology.SchemaReplication + replicationName string + schemaReplication ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + const ( + name = "example-rabbit" + ) + + BeforeEach(func() { + var err error + schemaReplication, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", + }, + Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{schemaReplicationNamespace: {}}}, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(schemaReplication.Start(ctx)).To(Succeed()) + }(managerCtx) + + Expect((&controllers.TopologyReconciler{ + Client: schemaReplication.GetClient(), + Type: &topology.SchemaReplication{}, + Scheme: schemaReplication.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: schemaReplication.GetClient()}, + }).SetupWithManager(schemaReplication)).To(Succeed()) + + k8sClient = schemaReplication.GetClient() + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { replication = topology.SchemaReplication{ ObjectMeta: metav1.ObjectMeta{ Name: replicationName, - Namespace: "default", + Namespace: schemaReplicationNamespace, }, Spec: topology.SchemaReplicationSpec{ UpstreamSecret: &corev1.LocalObjectReference{ Name: "endpoints-secret", // created in 'BeforeSuite' }, RabbitmqClusterReference: topology.RabbitmqClusterReference{ - Name: "example-rabbit", + Name: name, + Namespace: schemaReplicationNamespace, }, }, } @@ -53,9 +109,9 @@ var _ = Describe("schema-replication-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, @@ -78,9 +134,9 @@ var _ = Describe("schema-replication-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, @@ -103,9 +159,9 @@ var _ = Describe("schema-replication-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, @@ -125,18 +181,26 @@ var _ = Describe("schema-replication-controller", func() { fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &replication)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete schemareplication")) }) + + AfterEach(func() { + // this is to let the deletion finish + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "200 Ok", + StatusCode: http.StatusOK, + }, nil) + }) }) When("the RabbitMQ Client returns a Go error response", func() { @@ -146,13 +210,21 @@ var _ = Describe("schema-replication-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &replication)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete schemareplication")) }) + + AfterEach(func() { + // this is to let the deletion finish + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "200 Ok", + StatusCode: http.StatusOK, + }, nil) + }) }) }) @@ -162,14 +234,14 @@ var _ = Describe("schema-replication-controller", func() { replication = topology.SchemaReplication{ ObjectMeta: metav1.ObjectMeta{ Name: replicationName, - Namespace: "default", + Namespace: schemaReplicationNamespace, }, Spec: topology.SchemaReplicationSpec{ SecretBackend: topology.SecretBackend{Vault: &topology.VaultSpec{SecretPath: "rabbitmq"}}, Endpoints: "test:12345", RabbitmqClusterReference: topology.RabbitmqClusterReference{ - Name: "example-rabbit", - Namespace: "default", + Name: name, + Namespace: schemaReplicationNamespace, }, }, } @@ -191,9 +263,9 @@ var _ = Describe("schema-replication-controller", func() { return fakeSecretStoreClient, nil } - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) Eventually(func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, diff --git a/controllers/shovel_controller_test.go b/controllers/shovel_controller_test.go index 57ffc0c2..5dab06bc 100644 --- a/controllers/shovel_controller_test.go +++ b/controllers/shovel_controller_test.go @@ -2,10 +2,16 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest/komega" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -19,14 +25,63 @@ import ( ) var _ = Describe("shovel-controller", func() { - var shovel topology.Shovel - var shovelName string + var ( + shovel topology.Shovel + shovelName string + shovelMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + shovelMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{shovelNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(shovelMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = shovelMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: shovelMgr.GetClient(), + Type: &topology.Shovel{}, + Scheme: shovelMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.ShovelReconciler{Client: shovelMgr.GetClient()}, + }).SetupWithManager(shovelMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { shovel = topology.Shovel{ ObjectMeta: metav1.ObjectMeta{ Name: shovelName, - Namespace: "default", + Namespace: shovelNamespace, }, Spec: topology.ShovelSpec{ Name: "my-shovel-configuration", @@ -50,21 +105,24 @@ var _ = Describe("shovel-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &shovel)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some HTTP error"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some HTTP error"), + }))) }) }) @@ -75,21 +133,24 @@ var _ = Describe("shovel-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &shovel)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) @@ -103,24 +164,27 @@ var _ = Describe("shovel-controller", func() { }) It("works", func() { - Expect(client.Create(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) By("setting the correct finalizer") Eventually(komega.Object(&shovel)).WithTimeout(2 * time.Second).Should(HaveField("ObjectMeta.Finalizers", ConsistOf("deletion.finalizers.shovels.rabbitmq.com"))) By("sets the status condition 'Ready' to 'true'") - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) }) }) @@ -131,20 +195,23 @@ var _ = Describe("shovel-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &shovel)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -158,11 +225,14 @@ var _ = Describe("shovel-controller", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete shovel")) }) }) @@ -174,11 +244,14 @@ var _ = Describe("shovel-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete shovel")) }) }) @@ -193,11 +266,14 @@ var _ = Describe("shovel-controller", func() { }) It("publishes a normal event", func() { - Expect(client.Delete(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) Expect(observedEvents()).To(SatisfyAll( Not(ContainElement("Warning FailedDelete failed to delete shovel")), ContainElement("Normal SuccessfulDelete successfully deleted shovel"), diff --git a/controllers/suite_test.go b/controllers/suite_test.go index b72e475a..04cf9bef 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -14,9 +14,8 @@ import ( "crypto/x509" "fmt" "go/build" + "k8s.io/client-go/kubernetes" "path/filepath" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/metrics/server" "testing" "time" @@ -26,8 +25,6 @@ import ( "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient/rabbitmqclientfakes" topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" - topologyClient "github.com/rabbitmq/messaging-topology-operator/pkg/generated/clientset/versioned" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -44,7 +41,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" - "github.com/rabbitmq/messaging-topology-operator/controllers" ) func TestControllers(t *testing.T) { @@ -53,12 +49,10 @@ func TestControllers(t *testing.T) { } var ( - testEnv *envtest.Environment - client runtimeClient.Client - clientSet *topologyClient.Clientset - ctx context.Context - cancel context.CancelFunc - mgr ctrl.Manager + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc + //mgr ctrl.Manager fakeRabbitMQClient *rabbitmqclientfakes.FakeClient fakeRabbitMQClientError error fakeRabbitMQClientFactory = func(connectionCreds map[string]string, tlsEnabled bool, certPool *x509.CertPool) (rabbitmqclient.Client, error) { @@ -76,11 +70,25 @@ var ( arg3 *x509.CertPool } fakeRecorder *record.FakeRecorder - topologyReconcilers []*controllers.TopologyReconciler - superStreamReconciler *controllers.SuperStreamReconciler statusEventsUpdateTimeout = 20 * time.Second ) +const ( + bindingNamespace = "binding-test" + exchangeNamespace = "exchange-test" + permissionNamespace = "permission-test" + policyNamespace = "policy-test" + queueNamespace = "queue-test" + userNamespace = "user-test" + vhostNamespace = "vhost-test" + schemaReplicationNamespace = "schema-replication-test" + federationNamespace = "federation-test" + shovelNamespace = "shovel-test" + topicPermissionNamespace = "topic-permission-test" + superStreamNamespace = "super-stream-test" + topologyNamespace = "topology-reconciler-test" +) + var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter))) @@ -99,216 +107,112 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) Expect(cfg).ToNot(BeNil()) + // These lines are important to ensure that managers and controllers + // know how to convert k8s objects from JSON to Go objects Expect(scheme.AddToScheme(scheme.Scheme)).To(Succeed()) Expect(topology.AddToScheme(scheme.Scheme)).To(Succeed()) Expect(topologyv1alpha1.AddToScheme(scheme.Scheme)).To(Succeed()) Expect(rabbitmqv1beta1.AddToScheme(scheme.Scheme)).To(Succeed()) - clientSet, err = topologyClient.NewForConfig(cfg) - Expect(err).NotTo(HaveOccurred()) - - mgr, err = ctrl.NewManager(cfg, ctrl.Options{ - Scheme: scheme.Scheme, - Metrics: server.Options{ - BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite - }, - Cache: cache.Options{ - DefaultNamespaces: map[string]cache.Config{"default": {}}, - }, - }) - Expect(err).ToNot(HaveOccurred()) - - fakeRecorder = record.NewFakeRecorder(128) - - // The order in which these are declared matters - // Keep it sync with the order in which 'topologyObjects' are declared in 'common_test.go` - topologyReconcilers = []*controllers.TopologyReconciler{ - { - Client: mgr.GetClient(), - Type: &topology.Binding{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.BindingReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Exchange{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.ExchangeReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Permission{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.PermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Policy{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.PolicyReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Queue{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.QueueReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.User{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.UserReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Vhost{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.VhostReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.SchemaReplication{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Federation{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.FederationReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Shovel{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.ShovelReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.TopicPermission{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.TopicPermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, - }, + namespaces := []string{ + bindingNamespace, + exchangeNamespace, + permissionNamespace, + policyNamespace, + queueNamespace, + userNamespace, + vhostNamespace, + schemaReplicationNamespace, + federationNamespace, + shovelNamespace, + topicPermissionNamespace, + superStreamNamespace, + topologyNamespace, } - for _, controller := range topologyReconcilers { - Expect(controller.SetupWithManager(mgr)).To(Succeed()) - } - - superStreamReconciler = &controllers.SuperStreamReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - } + clientSet, err := kubernetes.NewForConfig(cfg) + Expect(err).ToNot(HaveOccurred()) - Expect(superStreamReconciler.SetupWithManager(mgr)).To(Succeed()) + client, err := runtimeClient.New(cfg, runtimeClient.Options{}) + Expect(err).ToNot(HaveOccurred()) - go func() { - err = mgr.Start(ctx) + for _, n := range namespaces { + _, err := clientSet.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n}}, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - }() - - client = mgr.GetClient() - Expect(client).ToNot(BeNil()) - - komega.SetClient(client) - komega.SetContext(ctx) - - rmq := rabbitmqv1beta1.RabbitmqCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "example-rabbit", - Namespace: "default", - Annotations: map[string]string{ - "rabbitmq.com/topology-allowed-namespaces": "allowed", + rmq := rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-rabbit", + Namespace: n, + Annotations: map[string]string{ + "rabbitmq.com/topology-allowed-namespaces": "allowed", + }, }, - }, - Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ - TLS: rabbitmqv1beta1.TLSSpec{ - SecretName: "i-do-not-exist-but-its-fine", + Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ + TLS: rabbitmqv1beta1.TLSSpec{ + SecretName: "i-do-not-exist-but-its-fine", + }, }, - }, - } - Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) - - rmq = rabbitmqv1beta1.RabbitmqCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-all-rabbit", - Namespace: "default", - Annotations: map[string]string{ - "rabbitmq.com/topology-allowed-namespaces": "*", + } + Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) + + rmq = rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "allow-all-rabbit", + Namespace: n, + Annotations: map[string]string{ + "rabbitmq.com/topology-allowed-namespaces": "*", + }, }, - }, + } + Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) } - Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) - endpointsSecretBody := map[string][]byte{ - "username": []byte("a-random-user"), - "password": []byte("a-random-password"), - "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), - } + fakeRecorder = record.NewFakeRecorder(128) + + //Expect(superStreamReconciler.SetupWithManager(mgr)).To(Succeed()) + + komega.SetClient(client) + komega.SetContext(ctx) // used in schema-replication-controller test endpointsSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "endpoints-secret", - Namespace: "default", + Namespace: schemaReplicationNamespace, }, Type: corev1.SecretTypeOpaque, - Data: endpointsSecretBody, + Data: map[string][]byte{ + "username": []byte("a-random-user"), + "password": []byte("a-random-password"), + "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), + }, } Expect(client.Create(ctx, &endpointsSecret)).To(Succeed()) - federationUriSecretBody := map[string][]byte{ - "uri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), - } - // used in federation-controller test federationUriSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "federation-uri", - Namespace: "default", + Namespace: federationNamespace, }, Type: corev1.SecretTypeOpaque, - Data: federationUriSecretBody, + Data: map[string][]byte{ + "uri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), + }, } Expect(client.Create(ctx, &federationUriSecret)).To(Succeed()) - shovelUriSecretBody := map[string][]byte{ - "srcUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), - "destUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), - } - // used in shovel-controller test shovelUriSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "shovel-uri-secret", - Namespace: "default", + Namespace: shovelNamespace, }, Type: corev1.SecretTypeOpaque, - Data: shovelUriSecretBody, + Data: map[string][]byte{ + "srcUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), + "destUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), + }, } Expect(client.Create(ctx, &shovelUriSecret)).To(Succeed()) }) diff --git a/controllers/super_stream_controller_test.go b/controllers/super_stream_controller_test.go index 1048d2bd..b9c2e266 100644 --- a/controllers/super_stream_controller_test.go +++ b/controllers/super_stream_controller_test.go @@ -1,8 +1,14 @@ package controllers_test import ( + "context" "fmt" + "github.com/rabbitmq/messaging-topology-operator/controllers" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "strconv" "time" @@ -10,7 +16,7 @@ import ( . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" - topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + topologyv1beta1 "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" "github.com/rabbitmq/messaging-topology-operator/internal/managedresource" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -19,11 +25,59 @@ import ( var _ = Describe("super-stream-controller", func() { - var superStream topologyv1alpha1.SuperStream - var superStreamName string - var expectedQueueNames []string + var ( + superStream topologyv1alpha1.SuperStream + superStreamName string + expectedQueueNames []string + superStreamMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) When("validating RabbitMQ Client failures", func() { + BeforeEach(func() { + var err error + superStreamMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{superStreamNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(superStreamMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = superStreamMgr.GetClient() + + Expect((&controllers.SuperStreamReconciler{ + Log: GinkgoLogr, + Client: superStreamMgr.GetClient(), + Scheme: superStreamMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + }).SetupWithManager(superStreamMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) + JustBeforeEach(func() { fakeRabbitMQClient.DeclareExchangeReturns(&http.Response{ Status: "201 Created", @@ -52,10 +106,10 @@ var _ = Describe("super-stream-controller", func() { superStream = topologyv1alpha1.SuperStream{ ObjectMeta: metav1.ObjectMeta{ Name: superStreamName, - Namespace: "default", + Namespace: superStreamNamespace, }, Spec: topologyv1alpha1.SuperStreamSpec{ - RabbitmqClusterReference: topology.RabbitmqClusterReference{ + RabbitmqClusterReference: topologyv1beta1.RabbitmqClusterReference{ Name: "example-rabbit", }, Partitions: 3, @@ -66,16 +120,16 @@ var _ = Describe("super-stream-controller", func() { Context("creation", func() { When("an underlying resource is deleted", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &superStream)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -86,27 +140,27 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "delete-binding" }) It("recreates the missing object", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding expectedBindingName := fmt.Sprintf("%s-binding-2", superStreamName) - Expect(client.Get( + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, )).To(Succeed()) initialCreationTimestamp := binding.CreationTimestamp - Expect(client.Delete(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -114,9 +168,9 @@ var _ = Describe("super-stream-controller", func() { By("recreating the binding", func() { EventuallyWithOffset(1, func() bool { - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) if err != nil { @@ -133,27 +187,27 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "delete-queue" }) It("recreates the missing object", func() { - var queue topology.Queue + var queue topologyv1beta1.Queue expectedQueueName := fmt.Sprintf("%s-partition-1", superStreamName) - Expect(client.Get( + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &queue, )).To(Succeed()) initialCreationTimestamp := queue.CreationTimestamp - Expect(client.Delete(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -161,9 +215,9 @@ var _ = Describe("super-stream-controller", func() { By("recreating the queue", func() { EventuallyWithOffset(1, func() bool { - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &queue, ) if err != nil { @@ -180,26 +234,26 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "delete-exchange" }) It("recreates the missing object", func() { - var exchange topology.Exchange - Expect(client.Get( + var exchange topologyv1beta1.Exchange + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName + "-exchange", Namespace: "default"}, + types.NamespacedName{Name: superStreamName + "-exchange", Namespace: superStreamNamespace}, &exchange, )).To(Succeed()) initialCreationTimestamp := exchange.CreationTimestamp - Expect(client.Delete(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -207,9 +261,9 @@ var _ = Describe("super-stream-controller", func() { By("recreating the exchange", func() { EventuallyWithOffset(1, func() bool { - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName + "-exchange", Namespace: "default"}, + types.NamespacedName{Name: superStreamName + "-exchange", Namespace: superStreamNamespace}, &exchange, ) if err != nil { @@ -227,39 +281,39 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "scale-down-super-stream" }) It("refuses scaling down the partitions with a helpful warning", func() { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) originalPartitionCount = len(superStream.Status.Partitions) superStream.Spec.Partitions = 1 - Expect(client.Update(ctx, &superStream)).To(Succeed()) + Expect(k8sClient.Update(ctx, &superStream)).To(Succeed()) By("setting the status condition 'Ready' to 'false' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("FailedCreateOrUpdate"), "Status": Equal(corev1.ConditionFalse), }))) }) By("retaining the original stream queue partitions", func() { - var partition topology.Queue + var partition topologyv1beta1.Queue expectedQueueNames = []string{} for i := 0; i < originalPartitionCount; i++ { expectedQueueName := fmt.Sprintf("%s-partition-%d", superStreamName, i) - Expect(client.Get( + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &partition, )).To(Succeed()) expectedQueueNames = append(expectedQueueNames, partition.Spec.Name) @@ -270,7 +324,7 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -279,9 +333,9 @@ var _ = Describe("super-stream-controller", func() { By("setting the status of the super stream to list the partition queue names", func() { ConsistentlyWithOffset(1, func() []string { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) @@ -290,13 +344,13 @@ var _ = Describe("super-stream-controller", func() { }) By("retaining the original bindings", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding for i := 0; i < originalPartitionCount; i++ { expectedBindingName := fmt.Sprintf("%s-binding-%d", superStreamName, i) EventuallyWithOffset(1, func() error { - return client.Get( + return k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) @@ -310,7 +364,7 @@ var _ = Describe("super-stream-controller", func() { "RoutingKey": Equal(strconv.Itoa(i)), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -327,17 +381,17 @@ var _ = Describe("super-stream-controller", func() { When("the super stream is scaled", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &superStream)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -347,23 +401,23 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "scale-out-super-stream" }) It("allows the number of partitions to be increased", func() { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) superStream.Spec.Partitions = 5 - Expect(client.Update(ctx, &superStream)).To(Succeed()) + Expect(k8sClient.Update(ctx, &superStream)).To(Succeed()) By("creating n stream queue partitions", func() { - var partition topology.Queue + var partition topologyv1beta1.Queue expectedQueueNames = []string{} for i := 0; i < superStream.Spec.Partitions; i++ { expectedQueueName := fmt.Sprintf("%s-partition-%s", superStreamName, strconv.Itoa(i)) EventuallyWithOffset(1, func() error { - return client.Get( + return k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &partition, ) }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) @@ -375,7 +429,7 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -384,9 +438,9 @@ var _ = Describe("super-stream-controller", func() { By("setting the status of the super stream to list the partition queue names", func() { EventuallyWithOffset(1, func() []string { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) @@ -395,13 +449,13 @@ var _ = Describe("super-stream-controller", func() { }) By("creating n bindings", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding for i := 0; i < superStream.Spec.Partitions; i++ { expectedBindingName := fmt.Sprintf("%s-binding-%s", superStreamName, strconv.Itoa(i)) EventuallyWithOffset(1, func() error { - return client.Get( + return k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) @@ -415,7 +469,7 @@ var _ = Describe("super-stream-controller", func() { "RoutingKey": Equal(strconv.Itoa(i)), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -423,16 +477,16 @@ var _ = Describe("super-stream-controller", func() { }) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -449,30 +503,30 @@ var _ = Describe("super-stream-controller", func() { It("creates the SuperStream and any underlying resources", func() { superStream.Spec.RoutingKeys = []string{"abc", "bcd", "cde"} superStream.Spec.Partitions = 3 - Expect(client.Create(ctx, &superStream)).To(Succeed()) + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { var fetchedSuperStream topologyv1alpha1.SuperStream - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &fetchedSuperStream, ) return fetchedSuperStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) }) By("creating an exchange", func() { - var exchange topology.Exchange - err := client.Get( + var exchange topologyv1beta1.Exchange + err := k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName + "-exchange", Namespace: "default"}, + types.NamespacedName{Name: superStreamName + "-exchange", Namespace: superStreamNamespace}, &exchange, ) Expect(err).NotTo(HaveOccurred()) @@ -482,20 +536,20 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) }) By("creating n stream queue partitions", func() { - var partition topology.Queue + var partition topologyv1beta1.Queue expectedQueueNames = []string{} for i := 0; i < superStream.Spec.Partitions; i++ { expectedQueueName := fmt.Sprintf("%s-partition-%d", superStreamName, i) - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &partition, ) Expect(err).NotTo(HaveOccurred()) @@ -508,7 +562,7 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -517,9 +571,9 @@ var _ = Describe("super-stream-controller", func() { By("setting the status of the super stream to list the partition queue names", func() { EventuallyWithOffset(1, func() []string { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) @@ -528,12 +582,12 @@ var _ = Describe("super-stream-controller", func() { }) By("creating n bindings", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding for i := 0; i < superStream.Spec.Partitions; i++ { expectedBindingName := fmt.Sprintf("%s-binding-%d", superStreamName, i) - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) Expect(err).NotTo(HaveOccurred()) @@ -547,7 +601,7 @@ var _ = Describe("super-stream-controller", func() { "RoutingKey": Equal(superStream.Spec.RoutingKeys[i]), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -564,18 +618,18 @@ var _ = Describe("super-stream-controller", func() { It("creates the SuperStream and any underlying resources", func() { superStream.Spec.RoutingKeys = []string{"abc", "bcd", "cde"} superStream.Spec.Partitions = 2 - Expect(client.Create(ctx, &superStream)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { var fetchedSuperStream topologyv1alpha1.SuperStream - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &fetchedSuperStream, ) return fetchedSuperStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("FailedCreateOrUpdate"), "Message": Equal("SuperStream mismatch failed to reconcile"), "Status": Equal(corev1.ConditionFalse), diff --git a/controllers/topicpermission_controller_test.go b/controllers/topicpermission_controller_test.go index 9d4fd713..16255ed1 100644 --- a/controllers/topicpermission_controller_test.go +++ b/controllers/topicpermission_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,17 +24,66 @@ import ( ) var _ = Describe("topicpermission-controller", func() { - var topicperm topology.TopicPermission - var user topology.User - var name string - var userName string + var ( + topicperm topology.TopicPermission + user topology.User + name string + userName string + topicPermissionMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + topicPermissionMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{topicPermissionNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(topicPermissionMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = topicPermissionMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: topicPermissionMgr.GetClient(), + Type: &topology.TopicPermission{}, + Scheme: topicPermissionMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.TopicPermissionReconciler{Client: topicPermissionMgr.GetClient(), Scheme: topicPermissionMgr.GetScheme()}, + }).SetupWithManager(topicPermissionMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) When("validating RabbitMQ Client failures with username", func() { JustBeforeEach(func() { topicperm = topology.TopicPermission{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: "default", + Namespace: topicPermissionNamespace, }, Spec: topology.TopicPermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -51,21 +106,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -76,21 +134,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -101,20 +162,23 @@ var _ = Describe("topicpermission-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -123,16 +187,19 @@ var _ = Describe("topicpermission-controller", func() { fakeRabbitMQClient.DeleteTopicPermissionsInReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete topicpermission")) }) }) @@ -144,11 +211,14 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete topicpermission")) }) }) @@ -160,24 +230,24 @@ var _ = Describe("topicpermission-controller", func() { user = topology.User{ ObjectMeta: metav1.ObjectMeta{ Name: userName, - Namespace: "default", + Namespace: topicPermissionNamespace, }, Spec: topology.UserSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: topicPermissionNamespace, }, }, } topicperm = topology.TopicPermission{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: "default", + Namespace: topicPermissionNamespace, }, Spec: topology.TopicPermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: topicPermissionNamespace, }, UserReference: &corev1.LocalObjectReference{ Name: userName, @@ -211,21 +281,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Message": Equal("failed create Permission, missing User"), - "Status": Equal(corev1.ConditionFalse), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Message": Equal("failed create Permission, missing User"), + "Status": Equal(corev1.ConditionFalse), + }))) }) }) @@ -236,42 +309,52 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) }) }) Context("deletion", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("Secret User is removed first", func() { @@ -281,17 +364,23 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: user.Name + "-user-credentials", - Namespace: user.Namespace, - }, - })).To(Succeed()) - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + // We used to delete a Secret right here, before this PR: + // https://github.com/rabbitmq/messaging-topology-operator/pull/710 + // + // That PR refactored tests and provided controller isolation in tests, so that + // other controllers i.e. User controller, won't interfere with resources + // created/deleted/modified as part of this suite. Therefore, we don't need to + // delete the Secret objects because, after PR 710, the Secret object is never + // created, which meets the point of this test: when the Secret does not exist + // and Permission is deleted + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) observedEvents := observedEvents() Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete topicpermission")) Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully deleted topicpermission")) @@ -305,16 +394,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + observedEvents := observedEvents() Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete topicpermission")) Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully deleted topicpermission")) @@ -328,11 +425,15 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + observedEvents := observedEvents() Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete topicpermission")) Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully deleted topicpermission")) @@ -347,16 +448,22 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the correct deletion ownerref to the object", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) Eventually(func() []metav1.OwnerReference { var fetched topology.TopicPermission - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &fetched) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &fetched) if err != nil { return []metav1.OwnerReference{} } return fetched.ObjectMeta.OwnerReferences - }, 5).Should(Not(BeEmpty())) + }). + Within(5 * time.Second). + WithPolling(time.Second). + Should(Not(BeEmpty())) }) }) }) diff --git a/controllers/topology_controller_test.go b/controllers/topology_controller_test.go index f8164d0d..4b4959b5 100644 --- a/controllers/topology_controller_test.go +++ b/controllers/topology_controller_test.go @@ -3,13 +3,11 @@ package controllers_test import ( "context" "fmt" - rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1" "github.com/rabbitmq/messaging-topology-operator/controllers" - v1 "k8s.io/api/core/v1" - k8sApiErrors "k8s.io/apimachinery/pkg/api/errors" "net/http" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" @@ -21,13 +19,13 @@ import ( var _ = Describe("TopologyReconciler", func() { const ( - namespace = "topology-reconciler-test" - name = "topology-rabbit" + name = "example-rabbit" ) + var ( commonRabbitmqClusterRef = topology.RabbitmqClusterReference{ Name: name, - Namespace: namespace, + Namespace: topologyNamespace, } commonHttpCreatedResponse = &http.Response{ Status: "201 Created", @@ -40,6 +38,7 @@ var _ = Describe("TopologyReconciler", func() { topologyMgr ctrl.Manager managerCtx context.Context managerCancel context.CancelFunc + k8sClient runtimeClient.Client ) BeforeEach(func() { @@ -49,7 +48,7 @@ var _ = Describe("TopologyReconciler", func() { BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite }, Cache: cache.Options{ - DefaultNamespaces: map[string]cache.Config{namespace: {}}, + DefaultNamespaces: map[string]cache.Config{topologyNamespace: {}}, }, Logger: GinkgoLogr, }) @@ -61,25 +60,7 @@ var _ = Describe("TopologyReconciler", func() { Expect(topologyMgr.Start(ctx)).To(Succeed()) }(managerCtx) - c := topologyMgr.GetClient() - err = c.Create(context.Background(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}) - err = client.Create(context.Background(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}) - if err != nil && !k8sApiErrors.IsAlreadyExists(err) { - Fail(err.Error()) - } - - rmq := &rabbitmqv1beta1.RabbitmqCluster{ - ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, - Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ - TLS: rabbitmqv1beta1.TLSSpec{ - SecretName: "i-do-not-exist-but-its-fine", - }, - }, - } - err = createRabbitmqClusterResources(c, rmq) - if err != nil && !k8sApiErrors.IsAlreadyExists(err) { - Fail(err.Error()) - } + k8sClient = topologyMgr.GetClient() }) AfterEach(func() { @@ -107,19 +88,19 @@ var _ = Describe("TopologyReconciler", func() { }).SetupWithManager(topologyMgr)).To(Succeed()) queue := &topology.Queue{ - ObjectMeta: metav1.ObjectMeta{Name: "ab-queue", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "ab-queue", Namespace: topologyNamespace}, Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef}, } fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil) fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil) - Expect(client.Create(ctx, queue)).To(Succeed()) + Expect(k8sClient.Create(ctx, queue)).To(Succeed()) Eventually(func() int { return len(fakeRabbitMQClientFactoryArgsForCall) }, 5).Should(BeNumerically(">", 0)) credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0) - expected := fmt.Sprintf("https://%s.%s.svc.some-domain.com:15671", name, namespace) + expected := fmt.Sprintf("https://%s.%s.svc.some-domain.com:15671", name, topologyNamespace) Expect(credentials).Should(HaveKeyWithValue("uri", expected)) }) }) @@ -136,19 +117,19 @@ var _ = Describe("TopologyReconciler", func() { }).SetupWithManager(topologyMgr)).To(Succeed()) queue := &topology.Queue{ - ObjectMeta: metav1.ObjectMeta{Name: "bb-queue", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "bb-queue", Namespace: topologyNamespace}, Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef}, } fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil) fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil) - Expect(client.Create(ctx, queue)).To(Succeed()) + Expect(k8sClient.Create(ctx, queue)).To(Succeed()) Eventually(func() int { return len(fakeRabbitMQClientFactoryArgsForCall) }, 5).Should(BeNumerically(">", 0)) credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0) - expected := fmt.Sprintf("https://%s.%s.svc:15671", name, namespace) + expected := fmt.Sprintf("https://%s.%s.svc:15671", name, topologyNamespace) Expect(credentials).Should(HaveKeyWithValue("uri", expected)) }) }) @@ -166,19 +147,19 @@ var _ = Describe("TopologyReconciler", func() { }).SetupWithManager(topologyMgr)).To(Succeed()) queue := &topology.Queue{ - ObjectMeta: metav1.ObjectMeta{Name: "cb-queue", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "cb-queue", Namespace: topologyNamespace}, Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef}, } fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil) fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil) - Expect(client.Create(ctx, queue)).To(Succeed()) + Expect(k8sClient.Create(ctx, queue)).To(Succeed()) Eventually(func() int { return len(fakeRabbitMQClientFactoryArgsForCall) }, 5).Should(BeNumerically(">", 0)) credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0) - expected := fmt.Sprintf("http://%s.%s.svc:15672", name, namespace) + expected := fmt.Sprintf("http://%s.%s.svc:15672", name, topologyNamespace) Expect(credentials).Should(HaveKeyWithValue("uri", expected)) }) }) diff --git a/controllers/user_controller_test.go b/controllers/user_controller_test.go index b18ddb9a..aa44a89a 100644 --- a/controllers/user_controller_test.go +++ b/controllers/user_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("UserController", func() { - var user topology.User - var userName string + var ( + user topology.User + userName string + userMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + userMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{userNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(userMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = userMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: userMgr.GetClient(), + Type: &topology.User{}, + Scheme: userMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.UserReconciler{Client: userMgr.GetClient(), Scheme: userMgr.GetScheme()}, + }).SetupWithManager(userMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { user = topology.User{ ObjectMeta: metav1.ObjectMeta{ Name: userName, - Namespace: "default", + Namespace: userNamespace, }, Spec: topology.UserSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -34,6 +89,7 @@ var _ = Describe("UserController", func() { }, } }) + When("creating a user", func() { When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { @@ -45,21 +101,24 @@ var _ = Describe("UserController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &user, ) return user.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some HTTP error"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some HTTP error"), + }))) }) }) @@ -70,21 +129,24 @@ var _ = Describe("UserController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &user, ) return user.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("hit a exception"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("hit a exception"), + }))) }) }) }) @@ -95,20 +157,23 @@ var _ = Describe("UserController", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &user)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &user, ) return user.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -122,11 +187,14 @@ var _ = Describe("UserController", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete user")) }) }) @@ -138,11 +206,14 @@ var _ = Describe("UserController", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete user")) }) }) @@ -157,17 +228,20 @@ var _ = Describe("UserController", func() { }) It("raises an event to indicate a successful deletion", func() { - Expect(client.Delete(ctx, &corev1.Secret{ + Expect(k8sClient.Delete(ctx, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: user.Name + "-user-credentials", Namespace: user.Namespace, }, })).To(Succeed()) - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) Expect(observedEvents()).To(SatisfyAll( Not(ContainElement("Warning FailedDelete failed to delete user")), diff --git a/controllers/vhost_controller_test.go b/controllers/vhost_controller_test.go index fc2564a0..5320dc0a 100644 --- a/controllers/vhost_controller_test.go +++ b/controllers/vhost_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("vhost-controller", func() { - var vhost topology.Vhost - var vhostName string + var ( + vhost topology.Vhost + vhostName string + vhostMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + vhostMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{vhostNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(vhostMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = vhostMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: vhostMgr.GetClient(), + Type: &topology.Vhost{}, + Scheme: vhostMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.VhostReconciler{Client: vhostMgr.GetClient()}, + }).SetupWithManager(vhostMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { vhost = topology.Vhost{ ObjectMeta: metav1.ObjectMeta{ Name: vhostName, - Namespace: "default", + Namespace: vhostNamespace, }, Spec: topology.VhostSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,21 +101,24 @@ var _ = Describe("vhost-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &vhost)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost, ) return vhost.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -71,21 +129,24 @@ var _ = Describe("vhost-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &vhost)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost, ) return vhost.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -96,20 +157,23 @@ var _ = Describe("vhost-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &vhost)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost, ) return vhost.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -118,16 +182,19 @@ var _ = Describe("vhost-controller", func() { fakeRabbitMQClient.DeleteVhostReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &vhost)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete vhost")) }) }) @@ -139,11 +206,14 @@ var _ = Describe("vhost-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &vhost)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete vhost")) }) }) diff --git a/go.sum b/go.sum index 9fc202be..2a37e48b 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,7 @@ github.com/go-errors/errors v1.5.0 h1:/EuijeGOu7ckFxzhkj4CXJ8JaenxK7bKUxpPYqeLHq github.com/go-errors/errors v1.5.0/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo= github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -162,8 +163,10 @@ github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -218,6 +221,7 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod github.com/mreiferson/go-httpclient v0.0.0-20160630210159-31f0106b4474/go.mod h1:OQA4XLvDbMgS8P0CevmM4m9Q3Jq4phKUzcocxuGJ5m8= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/internal/binding_test.go b/internal/binding_test.go index b2a68a23..f17836a7 100644 --- a/internal/binding_test.go +++ b/internal/binding_test.go @@ -11,6 +11,7 @@ import ( var _ = Describe("Binding", func() { var binding *topology.Binding + Context("GenerateBindingInfo", func() { BeforeEach(func() { binding = &topology.Binding{