diff --git a/Makefile b/Makefile index 1b28eb37..c3c2fcba 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ install-tools: @$(get_mod_code_generator) go install golang.org/x/vuln/cmd/govulncheck@latest -ENVTEST_K8S_VERSION = 1.22.1 +ENVTEST_K8S_VERSION = 1.26.1 ARCHITECTURE = amd64 LOCAL_TESTBIN = $(CURDIR)/testbin diff --git a/api/v1beta1/user_types_test.go b/api/v1beta1/user_types_test.go index 9c44b534..4a416584 100644 --- a/api/v1beta1/user_types_test.go +++ b/api/v1beta1/user_types_test.go @@ -87,7 +87,7 @@ var _ = Describe("user spec", func() { username = "invalid-user" }) It("fails to create the user", func() { - Expect(k8sClient.Create(ctx, &user)).To(MatchError(`User.rabbitmq.com "invalid-user" is invalid: spec.tags: Unsupported value: "invalid": supported values: "management", "policymaker", "monitoring", "administrator"`)) + Expect(k8sClient.Create(ctx, &user)).To(MatchError(`User.rabbitmq.com "invalid-user" is invalid: spec.tags[1]: Unsupported value: "invalid": supported values: "management", "policymaker", "monitoring", "administrator"`)) }) }) }) diff --git a/controllers/binding_controller_test.go b/controllers/binding_controller_test.go index 94d8c4b5..05d5b79f 100644 --- a/controllers/binding_controller_test.go +++ b/controllers/binding_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("bindingController", func() { - var binding topology.Binding - var bindingName string + var ( + binding topology.Binding + bindingName string + bindingMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + bindingMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{bindingNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(bindingMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = bindingMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: bindingMgr.GetClient(), + Type: &topology.Binding{}, + Scheme: bindingMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.BindingReconciler{}, + }).SetupWithManager(bindingMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. 
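All of the controller suites in this diff adopt the same isolation pattern: a per-suite manager whose cache is limited to that suite's namespace, exactly one TopologyReconciler registered against it, and a cancel-plus-one-second-sleep teardown. Condensed from the binding hunks above, the pattern is roughly the sketch below; `testEnv`, `fakeRecorder`, `fakeRabbitMQClientFactory`, `bindingNamespace` and the dot-imported Ginkgo/Gomega helpers (`GinkgoLogr`, `Expect`, ...) are suite-level fixtures defined elsewhere in this test package, and the imports are the ones added in the hunk above.

```go
var _ = Describe("bindingController", func() {
	var (
		bindingMgr    ctrl.Manager
		managerCtx    context.Context
		managerCancel context.CancelFunc
		k8sClient     runtimeClient.Client
	)

	BeforeEach(func() {
		var err error
		// A dedicated manager per suite, caching only this suite's namespace, keeps
		// reconcilers belonging to other suites away from this suite's objects.
		bindingMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{
			Metrics: server.Options{BindAddress: "0"}, // no metrics listener in tests
			Cache: cache.Options{
				DefaultNamespaces: map[string]cache.Config{bindingNamespace: {}},
			},
			Logger: GinkgoLogr,
		})
		Expect(err).ToNot(HaveOccurred())

		managerCtx, managerCancel = context.WithCancel(context.Background())
		go func(ctx context.Context) {
			defer GinkgoRecover()
			Expect(bindingMgr.Start(ctx)).To(Succeed())
		}(managerCtx)

		k8sClient = bindingMgr.GetClient()

		// Only the reconciler under test is registered with this manager.
		Expect((&controllers.TopologyReconciler{
			Client:                bindingMgr.GetClient(),
			Type:                  &topology.Binding{},
			Scheme:                bindingMgr.GetScheme(),
			Recorder:              fakeRecorder,
			RabbitmqClientFactory: fakeRabbitMQClientFactory,
			ReconcileFunc:         &controllers.BindingReconciler{},
		}).SetupWithManager(bindingMgr)).To(Succeed())
	})

	AfterEach(func() {
		managerCancel()
		// Give the cancelled manager a moment to wind down before the next suite's
		// manager starts, so a late reconcile cannot race with the next test.
		<-time.After(time.Second)
	})
})
```

One knock-on effect of this isolation shows up later in the permission suite: with only the Permission reconciler running, the User controller never populates `user.Status.Username` or creates the credentials Secret, so those tests now write the status subresource directly (`k8sClient.Status().Update`) before creating the Permission.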
+ <-time.After(time.Second) + }) JustBeforeEach(func() { binding = topology.Binding{ ObjectMeta: metav1.ObjectMeta{ Name: bindingName, - Namespace: "default", + Namespace: bindingNamespace, }, Spec: topology.BindingSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,9 +101,9 @@ var _ = Describe("bindingController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Create(ctx, &binding)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &binding, @@ -71,9 +126,9 @@ var _ = Describe("bindingController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Create(ctx, &binding)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &binding, @@ -96,9 +151,9 @@ var _ = Describe("bindingController", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Create(ctx, &binding)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &binding, @@ -118,14 +173,14 @@ var _ = Describe("bindingController", func() { fakeRabbitMQClient.DeleteBindingReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete binding")) @@ -139,9 +194,9 @@ var _ = Describe("bindingController", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &topology.Binding{}) return apierrors.IsNotFound(err) }, 5).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete binding")) diff --git a/controllers/exchange_controller_test.go b/controllers/exchange_controller_test.go index abdb63fb..0d016fff 100644 --- a/controllers/exchange_controller_test.go +++ b/controllers/exchange_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl 
"sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,64 @@ import ( ) var _ = Describe("exchange-controller", func() { - var exchange topology.Exchange - var exchangeName string + var ( + exchange topology.Exchange + exchangeName string + exchangeMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + exchangeMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{exchangeNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(exchangeMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = exchangeMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: exchangeMgr.GetClient(), + Type: &topology.Exchange{}, + Scheme: exchangeMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.ExchangeReconciler{}, + }).SetupWithManager(exchangeMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. 
+ <-time.After(time.Second) + }) JustBeforeEach(func() { + // this will be executed after all BeforeEach have run exchange = topology.Exchange{ ObjectMeta: metav1.ObjectMeta{ Name: exchangeName, - Namespace: "default", + Namespace: exchangeNamespace, }, Spec: topology.ExchangeSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,9 +102,9 @@ var _ = Describe("exchange-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &exchange, @@ -71,9 +127,9 @@ var _ = Describe("exchange-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &exchange, @@ -99,9 +155,9 @@ var _ = Describe("exchange-controller", func() { }) It("changes only if status changes", func() { By("setting LastTransitionTime when transitioning to status Ready=true") - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Namespace: exchange.Namespace, Name: exchange.Name}, &exchange, @@ -120,9 +176,9 @@ var _ = Describe("exchange-controller", func() { StatusCode: http.StatusNoContent, }, nil) exchange.Labels = map[string]string{"k1": "v1"} - Expect(client.Update(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Update(ctx, &exchange)).To(Succeed()) ConsistentlyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Namespace: exchange.Namespace, Name: exchange.Name}, &exchange, @@ -140,9 +196,9 @@ var _ = Describe("exchange-controller", func() { StatusCode: http.StatusInternalServerError, }, errors.New("something went wrong")) exchange.Labels = map[string]string{"k1": "v2"} - Expect(client.Update(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Update(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Namespace: exchange.Namespace, Name: exchange.Name}, &exchange, @@ -164,9 +220,9 @@ var _ = Describe("exchange-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &exchange, @@ -191,9 +247,9 @@ var _ = Describe("exchange-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) return 
apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete exchange")) @@ -207,9 +263,9 @@ var _ = Describe("exchange-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: exchange.Name, Namespace: exchange.Namespace}, &topology.Exchange{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete exchange")) diff --git a/controllers/federation_controller_test.go b/controllers/federation_controller_test.go index 5e94cf33..42e12141 100644 --- a/controllers/federation_controller_test.go +++ b/controllers/federation_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("federation-controller", func() { - var federation topology.Federation - var federationName string + var ( + federation topology.Federation + federationName string + federationMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + federationMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{federationNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(federationMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = federationMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: federationMgr.GetClient(), + Type: &topology.Federation{}, + Scheme: federationMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.FederationReconciler{Client: federationMgr.GetClient()}, + }).SetupWithManager(federationMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. 
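A smaller cleanup travels with these changes: the binding file above and this federation file (plus the policy and schema-replication files below) replace the deprecated `ioutil.NopCloser` with `io.NopCloser`, its drop-in replacement since Go 1.16, while the exchange, permission, queue and shovel files still import `io/ioutil` for now. For reference, the fake client's error responses are built like this (a minimal sketch reusing the literals from the hunks in this file):

```go
resp := &http.Response{
	Status:     "502 Bad Gateway",
	StatusCode: http.StatusBadGateway,
	// io.NopCloser wraps a Reader with a no-op Close so a plain buffer can act as a
	// response body; it supersedes ioutil.NopCloser as of Go 1.16.
	Body: io.NopCloser(bytes.NewBufferString("Hello World")),
}
fakeRabbitMQClient.DeleteFederationUpstreamReturns(resp, nil)
```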
+ <-time.After(time.Second) + }) JustBeforeEach(func() { federation = topology.Federation{ ObjectMeta: metav1.ObjectMeta{ Name: federationName, - Namespace: "default", + Namespace: federationNamespace, }, Spec: topology.FederationSpec{ Name: "my-federation-upstream", @@ -49,21 +104,24 @@ var _ = Describe("federation-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &federation)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation, ) return federation.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some HTTP error"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some HTTP error"), + }))) }) }) @@ -74,21 +132,24 @@ var _ = Describe("federation-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &federation)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation, ) return federation.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some go failure here"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some go failure here"), + }))) }) }) }) @@ -99,20 +160,23 @@ var _ = Describe("federation-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &federation)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &federation)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation, ) return federation.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -121,16 +185,19 @@ var _ = Describe("federation-controller", func() { fakeRabbitMQClient.DeleteFederationUpstreamReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &federation)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &federation)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete federation")) }) }) @@ -142,11 +209,14 @@ var _ = Describe("federation-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &federation)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &federation)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &topology.Federation{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete federation")) }) }) diff --git a/controllers/permission_controller_test.go b/controllers/permission_controller_test.go index c54de3a3..37c0c1e5 100644 --- a/controllers/permission_controller_test.go +++ b/controllers/permission_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . 
"github.com/onsi/ginkgo/v2" @@ -18,17 +24,66 @@ import ( ) var _ = Describe("permission-controller", func() { - var permission topology.Permission - var user topology.User - var permissionName string - var userName string + var ( + permission topology.Permission + user topology.User + permissionName string + userName string + permissionMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + permissionMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{permissionNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(permissionMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = permissionMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: permissionMgr.GetClient(), + Type: &topology.Permission{}, + Scheme: permissionMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.PermissionReconciler{Client: permissionMgr.GetClient(), Scheme: permissionMgr.GetScheme()}, + }).SetupWithManager(permissionMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. 
+ <-time.After(time.Second) + }) When("validating RabbitMQ Client failures with username", func() { JustBeforeEach(func() { permission = topology.Permission{ ObjectMeta: metav1.ObjectMeta{ Name: permissionName, - Namespace: "default", + Namespace: permissionNamespace, }, Spec: topology.PermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -51,9 +106,9 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -76,9 +131,9 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -101,9 +156,9 @@ var _ = Describe("permission-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -128,9 +183,9 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete permission")) @@ -144,9 +199,9 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete permission")) @@ -160,24 +215,24 @@ var _ = Describe("permission-controller", func() { user = topology.User{ ObjectMeta: metav1.ObjectMeta{ Name: userName, - Namespace: "default", + Namespace: permissionNamespace, }, Spec: topology.UserSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: permissionNamespace, }, }, } permission = topology.Permission{ ObjectMeta: metav1.ObjectMeta{ Name: permissionName, - Namespace: "default", + 
Namespace: permissionNamespace, }, Spec: topology.PermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: permissionNamespace, }, UserReference: &corev1.LocalObjectReference{ Name: userName, @@ -211,9 +266,9 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -236,10 +291,12 @@ var _ = Describe("permission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -257,10 +314,12 @@ var _ = Describe("permission-controller", func() { Context("deletion", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &permission, @@ -274,22 +333,25 @@ var _ = Describe("permission-controller", func() { }))) }) - When("Secret User is removed first", func() { + When("Secret for User credential does not exist", func() { BeforeEach(func() { permissionName = "test-with-userref-delete-secret" userName = "example-delete-secret-first" }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: user.Name + "-user-credentials", - Namespace: user.Namespace, - }, - })).To(Succeed()) - Expect(client.Delete(ctx, &permission)).To(Succeed()) + // We used to delete a Secret right here, before this PR: + // https://github.com/rabbitmq/messaging-topology-operator/pull/710 + // + // That PR refactored tests and provided controller isolation in tests, so that + // other controllers i.e. User controller, won't interfere with resources + // created/deleted/modified as part of this suite. 
Therefore, we don't need to + // delete the Secret objects because, after PR 710, the Secret object is never + // created, which meets the point of this test: when the Secret does not exist + // and Permission is deleted + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) observedEvents := observedEvents() @@ -305,14 +367,14 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) observedEvents := observedEvents() @@ -328,9 +390,9 @@ var _ = Describe("permission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &topology.Permission{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeTrue()) observedEvents := observedEvents() @@ -347,11 +409,13 @@ var _ = Describe("permission-controller", func() { }) It("sets the correct deletion ownerref to the object", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &permission)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &permission)).To(Succeed()) Eventually(func() []metav1.OwnerReference { var fetched topology.Permission - err := client.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &fetched) + err := k8sClient.Get(ctx, types.NamespacedName{Name: permission.Name, Namespace: permission.Namespace}, &fetched) if err != nil { return []metav1.OwnerReference{} } diff --git a/controllers/policy_controller_test.go b/controllers/policy_controller_test.go index d7ba7c90..3da7dac9 100644 --- a/controllers/policy_controller_test.go +++ b/controllers/policy_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" 
"net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" "k8s.io/apimachinery/pkg/runtime" @@ -20,14 +26,63 @@ import ( ) var _ = Describe("policy-controller", func() { - var policy topology.Policy - var policyName string + var ( + policy topology.Policy + policyName string + policyMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + policyMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{policyNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(policyMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = policyMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: policyMgr.GetClient(), + Type: &topology.Policy{}, + Scheme: policyMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.PolicyReconciler{}, + }).SetupWithManager(policyMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { policy = topology.Policy{ ObjectMeta: metav1.ObjectMeta{ Name: policyName, - Namespace: "default", + Namespace: policyNamespace, }, Spec: topology.PolicySpec{ Definition: &runtime.RawExtension{ @@ -51,21 +106,24 @@ var _ = Describe("policy-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &policy)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &policy)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &policy, ) return policy.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -76,21 +134,24 @@ var _ = Describe("policy-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &policy)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &policy)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &policy, ) return policy.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -101,20 +162,23 @@ var _ = Describe("policy-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &policy)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &policy)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &policy, ) return policy.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -123,16 +187,18 @@ var _ = Describe("policy-controller", func() { fakeRabbitMQClient.DeletePolicyReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &policy)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &policy)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). 
+ Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete policy")) }) }) @@ -144,11 +210,13 @@ var _ = Describe("policy-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &policy)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &policy)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: policy.Name, Namespace: policy.Namespace}, &topology.Policy{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete policy")) }) }) diff --git a/controllers/queue_controller_test.go b/controllers/queue_controller_test.go index ce93c49f..e82bf170 100644 --- a/controllers/queue_controller_test.go +++ b/controllers/queue_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("queue-controller", func() { - var queue topology.Queue - var queueName string + var ( + queue topology.Queue + queueName string + queueMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + queueMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{queueNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(queueMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = queueMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: queueMgr.GetClient(), + Type: &topology.Queue{}, + Scheme: queueMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.QueueReconciler{}, + }).SetupWithManager(queueMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. 
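The federation and policy suites above, and the queue and shovel suites below, also change how Gomega timeouts are spelled: the positional timeout and polling-interval arguments to Eventually/Consistently become the chained Within/WithPolling form, and the explicit 1-frame offset of EventuallyWithOffset is dropped at the same time. The polling behaviour is the same either way; a minimal sketch, where `fetchConditions` and `beReady` are hypothetical stand-ins for the polled function and the matcher used in these suites:

```go
// Before: positional arguments — poll fetchConditions every second,
// for up to statusEventsUpdateTimeout.
EventuallyWithOffset(1, fetchConditions, statusEventsUpdateTimeout, time.Second).Should(beReady)

// After: the same timeout and interval, spelled out on the builder.
Eventually(fetchConditions).
	Within(statusEventsUpdateTimeout).
	WithPolling(time.Second).
	Should(beReady)
```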
+ <-time.After(time.Second) + }) JustBeforeEach(func() { queue = topology.Queue{ ObjectMeta: metav1.ObjectMeta{ Name: queueName, - Namespace: "default", + Namespace: queueNamespace, }, Spec: topology.QueueSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,21 +101,24 @@ var _ = Describe("queue-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &queue)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue, ) return queue.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -71,21 +129,24 @@ var _ = Describe("queue-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &queue)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue, ) return queue.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -96,20 +157,23 @@ var _ = Describe("queue-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &queue)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &queue)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &queue, ) return queue.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -123,11 +187,13 @@ var _ = Describe("queue-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete queue")) }) }) @@ -139,11 +205,13 @@ var _ = Describe("queue-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: queue.Name, Namespace: queue.Namespace}, &topology.Queue{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete queue")) }) }) diff --git a/controllers/schemareplication_controller_test.go b/controllers/schemareplication_controller_test.go index 477c46af..988d384a 100644 --- a/controllers/schemareplication_controller_test.go +++ b/controllers/schemareplication_controller_test.go @@ -2,13 +2,18 @@ package controllers_test import ( "bytes" + "context" "errors" "github.com/rabbitmq/messaging-topology-operator/controllers" "github.com/rabbitmq/messaging-topology-operator/internal" "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient" "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient/rabbitmqclientfakes" - "io/ioutil" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . 
"github.com/onsi/ginkgo/v2" @@ -22,21 +27,72 @@ import ( ) var _ = Describe("schema-replication-controller", func() { - var replication topology.SchemaReplication - var replicationName string + var ( + replication topology.SchemaReplication + replicationName string + schemaReplication ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + const ( + name = "example-rabbit" + ) + + BeforeEach(func() { + var err error + schemaReplication, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", + }, + Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{schemaReplicationNamespace: {}}}, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(schemaReplication.Start(ctx)).To(Succeed()) + }(managerCtx) + + Expect((&controllers.TopologyReconciler{ + Client: schemaReplication.GetClient(), + Type: &topology.SchemaReplication{}, + Scheme: schemaReplication.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: schemaReplication.GetClient()}, + }).SetupWithManager(schemaReplication)).To(Succeed()) + + k8sClient = schemaReplication.GetClient() + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. 
+ <-time.After(time.Second) + }) JustBeforeEach(func() { replication = topology.SchemaReplication{ ObjectMeta: metav1.ObjectMeta{ Name: replicationName, - Namespace: "default", + Namespace: schemaReplicationNamespace, }, Spec: topology.SchemaReplicationSpec{ UpstreamSecret: &corev1.LocalObjectReference{ Name: "endpoints-secret", // created in 'BeforeSuite' }, RabbitmqClusterReference: topology.RabbitmqClusterReference{ - Name: "example-rabbit", + Name: name, + Namespace: schemaReplicationNamespace, }, }, } @@ -53,9 +109,9 @@ var _ = Describe("schema-replication-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, @@ -78,9 +134,9 @@ var _ = Describe("schema-replication-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, @@ -103,9 +159,9 @@ var _ = Describe("schema-replication-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, @@ -125,18 +181,26 @@ var _ = Describe("schema-replication-controller", func() { fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &replication)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete schemareplication")) }) + + AfterEach(func() { + // this is to let the deletion finish + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "200 Ok", + StatusCode: http.StatusOK, + }, nil) + }) }) When("the RabbitMQ Client returns a Go error response", func() { @@ -146,13 +210,21 @@ var _ = Describe("schema-replication-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &replication)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, 
&topology.SchemaReplication{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &topology.SchemaReplication{}) return apierrors.IsNotFound(err) }, statusEventsUpdateTimeout).Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete schemareplication")) }) + + AfterEach(func() { + // this is to let the deletion finish + fakeRabbitMQClient.DeleteGlobalParameterReturns(&http.Response{ + Status: "200 Ok", + StatusCode: http.StatusOK, + }, nil) + }) }) }) @@ -162,14 +234,14 @@ var _ = Describe("schema-replication-controller", func() { replication = topology.SchemaReplication{ ObjectMeta: metav1.ObjectMeta{ Name: replicationName, - Namespace: "default", + Namespace: schemaReplicationNamespace, }, Spec: topology.SchemaReplicationSpec{ SecretBackend: topology.SecretBackend{Vault: &topology.VaultSpec{SecretPath: "rabbitmq"}}, Endpoints: "test:12345", RabbitmqClusterReference: topology.RabbitmqClusterReference{ - Name: "example-rabbit", - Namespace: "default", + Name: name, + Namespace: schemaReplicationNamespace, }, }, } @@ -191,9 +263,9 @@ var _ = Describe("schema-replication-controller", func() { return fakeSecretStoreClient, nil } - Expect(client.Create(ctx, &replication)).To(Succeed()) + Expect(k8sClient.Create(ctx, &replication)).To(Succeed()) Eventually(func() []topology.Condition { - _ = client.Get( + _ = k8sClient.Get( ctx, types.NamespacedName{Name: replication.Name, Namespace: replication.Namespace}, &replication, diff --git a/controllers/shovel_controller_test.go b/controllers/shovel_controller_test.go index 57ffc0c2..5dab06bc 100644 --- a/controllers/shovel_controller_test.go +++ b/controllers/shovel_controller_test.go @@ -2,10 +2,16 @@ package controllers_test import ( "bytes" + "context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest/komega" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . 
"github.com/onsi/ginkgo/v2" @@ -19,14 +25,63 @@ import ( ) var _ = Describe("shovel-controller", func() { - var shovel topology.Shovel - var shovelName string + var ( + shovel topology.Shovel + shovelName string + shovelMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + shovelMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{shovelNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(shovelMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = shovelMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: shovelMgr.GetClient(), + Type: &topology.Shovel{}, + Scheme: shovelMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.ShovelReconciler{Client: shovelMgr.GetClient()}, + }).SetupWithManager(shovelMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { shovel = topology.Shovel{ ObjectMeta: metav1.ObjectMeta{ Name: shovelName, - Namespace: "default", + Namespace: shovelNamespace, }, Spec: topology.ShovelSpec{ Name: "my-shovel-configuration", @@ -50,21 +105,24 @@ var _ = Describe("shovel-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &shovel)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some HTTP error"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some HTTP error"), + }))) }) }) @@ -75,21 +133,24 @@ var _ = Describe("shovel-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &shovel)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) @@ -103,24 +164,27 @@ var _ = Describe("shovel-controller", func() { }) It("works", func() { - Expect(client.Create(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) By("setting the correct finalizer") Eventually(komega.Object(&shovel)).WithTimeout(2 * time.Second).Should(HaveField("ObjectMeta.Finalizers", ConsistOf("deletion.finalizers.shovels.rabbitmq.com"))) By("sets the status condition 'Ready' to 'true'") - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) }) }) @@ -131,20 +195,23 @@ var _ = Describe("shovel-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &shovel)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &shovel, ) return shovel.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
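For reference, the chained Gomega form introduced throughout this file is equivalent to the positional arguments it replaces; conditionsFunc and matcher below are placeholders:

// old style: timeout and polling interval as positional arguments
Eventually(conditionsFunc, statusEventsUpdateTimeout, time.Second).Should(matcher)

// new style: the same settings spelled out with the builder methods
Eventually(conditionsFunc).
	Within(statusEventsUpdateTimeout).
	WithPolling(time.Second).
	Should(matcher)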
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -158,11 +225,14 @@ var _ = Describe("shovel-controller", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete shovel")) }) }) @@ -174,11 +244,14 @@ var _ = Describe("shovel-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete shovel")) }) }) @@ -193,11 +266,14 @@ var _ = Describe("shovel-controller", func() { }) It("publishes a normal event", func() { - Expect(client.Delete(ctx, &shovel)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, &topology.Shovel{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(BeTrue()) Expect(observedEvents()).To(SatisfyAll( Not(ContainElement("Warning FailedDelete failed to delete shovel")), ContainElement("Normal SuccessfulDelete successfully deleted shovel"), diff --git a/controllers/suite_test.go b/controllers/suite_test.go index b72e475a..04cf9bef 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -14,9 +14,8 @@ import ( "crypto/x509" "fmt" "go/build" + "k8s.io/client-go/kubernetes" "path/filepath" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/metrics/server" "testing" "time" @@ -26,8 +25,6 @@ import ( "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient/rabbitmqclientfakes" topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" - topologyClient "github.com/rabbitmq/messaging-topology-operator/pkg/generated/clientset/versioned" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -44,7 +41,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" - "github.com/rabbitmq/messaging-topology-operator/controllers" ) func TestControllers(t *testing.T) { @@ -53,12 +49,10 @@ func TestControllers(t *testing.T) { } var ( - testEnv *envtest.Environment - client runtimeClient.Client - clientSet *topologyClient.Clientset - ctx context.Context - cancel context.CancelFunc - mgr ctrl.Manager + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc + //mgr ctrl.Manager fakeRabbitMQClient *rabbitmqclientfakes.FakeClient fakeRabbitMQClientError error fakeRabbitMQClientFactory = func(connectionCreds map[string]string, tlsEnabled bool, certPool *x509.CertPool) (rabbitmqclient.Client, error) { @@ -76,11 +70,25 @@ var ( arg3 *x509.CertPool } fakeRecorder *record.FakeRecorder - topologyReconcilers []*controllers.TopologyReconciler - superStreamReconciler *controllers.SuperStreamReconciler statusEventsUpdateTimeout = 20 * time.Second ) +const ( + bindingNamespace = "binding-test" + exchangeNamespace = "exchange-test" + permissionNamespace = "permission-test" + policyNamespace = "policy-test" + queueNamespace = "queue-test" + userNamespace = "user-test" + vhostNamespace = "vhost-test" + schemaReplicationNamespace = "schema-replication-test" + federationNamespace = "federation-test" + shovelNamespace = "shovel-test" + topicPermissionNamespace = "topic-permission-test" + superStreamNamespace = "super-stream-test" + topologyNamespace = "topology-reconciler-test" +) + var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter))) @@ -99,216 +107,112 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) Expect(cfg).ToNot(BeNil()) + // These lines are important to ensure that managers and controllers + // know how to convert k8s objects from JSON to Go objects Expect(scheme.AddToScheme(scheme.Scheme)).To(Succeed()) Expect(topology.AddToScheme(scheme.Scheme)).To(Succeed()) Expect(topologyv1alpha1.AddToScheme(scheme.Scheme)).To(Succeed()) Expect(rabbitmqv1beta1.AddToScheme(scheme.Scheme)).To(Succeed()) - clientSet, err = topologyClient.NewForConfig(cfg) - Expect(err).NotTo(HaveOccurred()) - - mgr, err = ctrl.NewManager(cfg, ctrl.Options{ - Scheme: scheme.Scheme, - Metrics: server.Options{ - BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite - }, - Cache: cache.Options{ - DefaultNamespaces: map[string]cache.Config{"default": {}}, - }, - }) - 
Expect(err).ToNot(HaveOccurred()) - - fakeRecorder = record.NewFakeRecorder(128) - - // The order in which these are declared matters - // Keep it sync with the order in which 'topologyObjects' are declared in 'common_test.go` - topologyReconcilers = []*controllers.TopologyReconciler{ - { - Client: mgr.GetClient(), - Type: &topology.Binding{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.BindingReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Exchange{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.ExchangeReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Permission{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.PermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Policy{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.PolicyReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Queue{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.QueueReconciler{}, - }, - { - Client: mgr.GetClient(), - Type: &topology.User{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.UserReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Vhost{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.VhostReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.SchemaReplication{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Federation{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.FederationReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.Shovel{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.ShovelReconciler{Client: mgr.GetClient()}, - }, - { - Client: mgr.GetClient(), - Type: &topology.TopicPermission{}, - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - RabbitmqClientFactory: fakeRabbitMQClientFactory, - ReconcileFunc: &controllers.TopicPermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}, - }, + namespaces := []string{ + bindingNamespace, + exchangeNamespace, + permissionNamespace, + policyNamespace, + queueNamespace, + userNamespace, + vhostNamespace, + schemaReplicationNamespace, + federationNamespace, + shovelNamespace, + topicPermissionNamespace, + superStreamNamespace, + topologyNamespace, } - for _, controller := range topologyReconcilers { - Expect(controller.SetupWithManager(mgr)).To(Succeed()) - } - - superStreamReconciler = &controllers.SuperStreamReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: fakeRecorder, - 
RabbitmqClientFactory: fakeRabbitMQClientFactory, - } + clientSet, err := kubernetes.NewForConfig(cfg) + Expect(err).ToNot(HaveOccurred()) - Expect(superStreamReconciler.SetupWithManager(mgr)).To(Succeed()) + client, err := runtimeClient.New(cfg, runtimeClient.Options{}) + Expect(err).ToNot(HaveOccurred()) - go func() { - err = mgr.Start(ctx) + for _, n := range namespaces { + _, err := clientSet.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n}}, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - }() - - client = mgr.GetClient() - Expect(client).ToNot(BeNil()) - - komega.SetClient(client) - komega.SetContext(ctx) - - rmq := rabbitmqv1beta1.RabbitmqCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "example-rabbit", - Namespace: "default", - Annotations: map[string]string{ - "rabbitmq.com/topology-allowed-namespaces": "allowed", + rmq := rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "example-rabbit", + Namespace: n, + Annotations: map[string]string{ + "rabbitmq.com/topology-allowed-namespaces": "allowed", + }, }, - }, - Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ - TLS: rabbitmqv1beta1.TLSSpec{ - SecretName: "i-do-not-exist-but-its-fine", + Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ + TLS: rabbitmqv1beta1.TLSSpec{ + SecretName: "i-do-not-exist-but-its-fine", + }, }, - }, - } - Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) - - rmq = rabbitmqv1beta1.RabbitmqCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-all-rabbit", - Namespace: "default", - Annotations: map[string]string{ - "rabbitmq.com/topology-allowed-namespaces": "*", + } + Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) + + rmq = rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "allow-all-rabbit", + Namespace: n, + Annotations: map[string]string{ + "rabbitmq.com/topology-allowed-namespaces": "*", + }, }, - }, + } + Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) } - Expect(createRabbitmqClusterResources(client, &rmq)).To(Succeed()) - endpointsSecretBody := map[string][]byte{ - "username": []byte("a-random-user"), - "password": []byte("a-random-password"), - "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), - } + fakeRecorder = record.NewFakeRecorder(128) + + //Expect(superStreamReconciler.SetupWithManager(mgr)).To(Succeed()) + + komega.SetClient(client) + komega.SetContext(ctx) // used in schema-replication-controller test endpointsSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "endpoints-secret", - Namespace: "default", + Namespace: schemaReplicationNamespace, }, Type: corev1.SecretTypeOpaque, - Data: endpointsSecretBody, + Data: map[string][]byte{ + "username": []byte("a-random-user"), + "password": []byte("a-random-password"), + "endpoints": []byte("a.endpoints.local:5672,b.endpoints.local:5672,c.endpoints.local:5672"), + }, } Expect(client.Create(ctx, &endpointsSecret)).To(Succeed()) - federationUriSecretBody := map[string][]byte{ - "uri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), - } - // used in federation-controller test federationUriSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "federation-uri", - Namespace: "default", + Namespace: federationNamespace, }, Type: corev1.SecretTypeOpaque, - Data: federationUriSecretBody, + Data: map[string][]byte{ + "uri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), + }, } Expect(client.Create(ctx, 
&federationUriSecret)).To(Succeed()) - shovelUriSecretBody := map[string][]byte{ - "srcUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), - "destUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), - } - // used in shovel-controller test shovelUriSecret := corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "shovel-uri-secret", - Namespace: "default", + Namespace: shovelNamespace, }, Type: corev1.SecretTypeOpaque, - Data: shovelUriSecretBody, + Data: map[string][]byte{ + "srcUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), + "destUri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"), + }, } Expect(client.Create(ctx, &shovelUriSecret)).To(Succeed()) }) diff --git a/controllers/super_stream_controller_test.go b/controllers/super_stream_controller_test.go index 1048d2bd..b9c2e266 100644 --- a/controllers/super_stream_controller_test.go +++ b/controllers/super_stream_controller_test.go @@ -1,8 +1,14 @@ package controllers_test import ( + "context" "fmt" + "github.com/rabbitmq/messaging-topology-operator/controllers" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "strconv" "time" @@ -10,7 +16,7 @@ import ( . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" - topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + topologyv1beta1 "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" "github.com/rabbitmq/messaging-topology-operator/internal/managedresource" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -19,11 +25,59 @@ import ( var _ = Describe("super-stream-controller", func() { - var superStream topologyv1alpha1.SuperStream - var superStreamName string - var expectedQueueNames []string + var ( + superStream topologyv1alpha1.SuperStream + superStreamName string + expectedQueueNames []string + superStreamMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) When("validating RabbitMQ Client failures", func() { + BeforeEach(func() { + var err error + superStreamMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{superStreamNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(superStreamMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = superStreamMgr.GetClient() + + Expect((&controllers.SuperStreamReconciler{ + Log: GinkgoLogr, + Client: superStreamMgr.GetClient(), + Scheme: superStreamMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + }).SetupWithManager(superStreamMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. 
Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) + JustBeforeEach(func() { fakeRabbitMQClient.DeclareExchangeReturns(&http.Response{ Status: "201 Created", @@ -52,10 +106,10 @@ var _ = Describe("super-stream-controller", func() { superStream = topologyv1alpha1.SuperStream{ ObjectMeta: metav1.ObjectMeta{ Name: superStreamName, - Namespace: "default", + Namespace: superStreamNamespace, }, Spec: topologyv1alpha1.SuperStreamSpec{ - RabbitmqClusterReference: topology.RabbitmqClusterReference{ + RabbitmqClusterReference: topologyv1beta1.RabbitmqClusterReference{ Name: "example-rabbit", }, Partitions: 3, @@ -66,16 +120,16 @@ var _ = Describe("super-stream-controller", func() { Context("creation", func() { When("an underlying resource is deleted", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &superStream)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, 10*time.Second, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -86,27 +140,27 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "delete-binding" }) It("recreates the missing object", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding expectedBindingName := fmt.Sprintf("%s-binding-2", superStreamName) - Expect(client.Get( + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, )).To(Succeed()) initialCreationTimestamp := binding.CreationTimestamp - Expect(client.Delete(ctx, &binding)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -114,9 +168,9 @@ var _ = Describe("super-stream-controller", func() { By("recreating the binding", func() { EventuallyWithOffset(1, func() bool { - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: 
expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) if err != nil { @@ -133,27 +187,27 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "delete-queue" }) It("recreates the missing object", func() { - var queue topology.Queue + var queue topologyv1beta1.Queue expectedQueueName := fmt.Sprintf("%s-partition-1", superStreamName) - Expect(client.Get( + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &queue, )).To(Succeed()) initialCreationTimestamp := queue.CreationTimestamp - Expect(client.Delete(ctx, &queue)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -161,9 +215,9 @@ var _ = Describe("super-stream-controller", func() { By("recreating the queue", func() { EventuallyWithOffset(1, func() bool { - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &queue, ) if err != nil { @@ -180,26 +234,26 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "delete-exchange" }) It("recreates the missing object", func() { - var exchange topology.Exchange - Expect(client.Get( + var exchange topologyv1beta1.Exchange + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName + "-exchange", Namespace: "default"}, + types.NamespacedName{Name: superStreamName + "-exchange", Namespace: superStreamNamespace}, &exchange, )).To(Succeed()) initialCreationTimestamp := exchange.CreationTimestamp - Expect(client.Delete(ctx, &exchange)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -207,9 +261,9 @@ var _ = Describe("super-stream-controller", func() { By("recreating the exchange", func() { EventuallyWithOffset(1, func() bool { - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName + 
"-exchange", Namespace: "default"}, + types.NamespacedName{Name: superStreamName + "-exchange", Namespace: superStreamNamespace}, &exchange, ) if err != nil { @@ -227,39 +281,39 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "scale-down-super-stream" }) It("refuses scaling down the partitions with a helpful warning", func() { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) originalPartitionCount = len(superStream.Status.Partitions) superStream.Spec.Partitions = 1 - Expect(client.Update(ctx, &superStream)).To(Succeed()) + Expect(k8sClient.Update(ctx, &superStream)).To(Succeed()) By("setting the status condition 'Ready' to 'false' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("FailedCreateOrUpdate"), "Status": Equal(corev1.ConditionFalse), }))) }) By("retaining the original stream queue partitions", func() { - var partition topology.Queue + var partition topologyv1beta1.Queue expectedQueueNames = []string{} for i := 0; i < originalPartitionCount; i++ { expectedQueueName := fmt.Sprintf("%s-partition-%d", superStreamName, i) - Expect(client.Get( + Expect(k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &partition, )).To(Succeed()) expectedQueueNames = append(expectedQueueNames, partition.Spec.Name) @@ -270,7 +324,7 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -279,9 +333,9 @@ var _ = Describe("super-stream-controller", func() { By("setting the status of the super stream to list the partition queue names", func() { ConsistentlyWithOffset(1, func() []string { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) @@ -290,13 +344,13 @@ var _ = Describe("super-stream-controller", func() { }) By("retaining the original bindings", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding for i := 0; i < originalPartitionCount; i++ { expectedBindingName := fmt.Sprintf("%s-binding-%d", superStreamName, i) EventuallyWithOffset(1, func() error { - return client.Get( + return k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) @@ -310,7 +364,7 @@ var _ = Describe("super-stream-controller", func() { "RoutingKey": Equal(strconv.Itoa(i)), 
"RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -327,17 +381,17 @@ var _ = Describe("super-stream-controller", func() { When("the super stream is scaled", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &superStream)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -347,23 +401,23 @@ var _ = Describe("super-stream-controller", func() { superStreamName = "scale-out-super-stream" }) It("allows the number of partitions to be increased", func() { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) superStream.Spec.Partitions = 5 - Expect(client.Update(ctx, &superStream)).To(Succeed()) + Expect(k8sClient.Update(ctx, &superStream)).To(Succeed()) By("creating n stream queue partitions", func() { - var partition topology.Queue + var partition topologyv1beta1.Queue expectedQueueNames = []string{} for i := 0; i < superStream.Spec.Partitions; i++ { expectedQueueName := fmt.Sprintf("%s-partition-%s", superStreamName, strconv.Itoa(i)) EventuallyWithOffset(1, func() error { - return client.Get( + return k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &partition, ) }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) @@ -375,7 +429,7 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -384,9 +438,9 @@ var _ = Describe("super-stream-controller", func() { By("setting the status of the super stream to list the partition queue names", func() { EventuallyWithOffset(1, func() []string { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) @@ -395,13 +449,13 @@ var _ = Describe("super-stream-controller", func() { }) By("creating n bindings", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding for i := 0; i < superStream.Spec.Partitions; i++ { expectedBindingName := fmt.Sprintf("%s-binding-%s", superStreamName, strconv.Itoa(i)) EventuallyWithOffset(1, func() error { - return client.Get( + return k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) }, 
10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) @@ -415,7 +469,7 @@ var _ = Describe("super-stream-controller", func() { "RoutingKey": Equal(strconv.Itoa(i)), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -423,16 +477,16 @@ var _ = Describe("super-stream-controller", func() { }) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) return superStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) @@ -449,30 +503,30 @@ var _ = Describe("super-stream-controller", func() { It("creates the SuperStream and any underlying resources", func() { superStream.Spec.RoutingKeys = []string{"abc", "bcd", "cde"} superStream.Spec.Partitions = 3 - Expect(client.Create(ctx, &superStream)).To(Succeed()) + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) By("setting the status condition 'Ready' to 'true' ", func() { - EventuallyWithOffset(1, func() []topology.Condition { + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { var fetchedSuperStream topologyv1alpha1.SuperStream - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &fetchedSuperStream, ) return fetchedSuperStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("SuccessfulCreateOrUpdate"), "Status": Equal(corev1.ConditionTrue), }))) }) By("creating an exchange", func() { - var exchange topology.Exchange - err := client.Get( + var exchange topologyv1beta1.Exchange + err := k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName + "-exchange", Namespace: "default"}, + types.NamespacedName{Name: superStreamName + "-exchange", Namespace: superStreamNamespace}, &exchange, ) Expect(err).NotTo(HaveOccurred()) @@ -482,20 +536,20 @@ var _ = Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) }) By("creating n stream queue partitions", func() { - var partition topology.Queue + var partition topologyv1beta1.Queue expectedQueueNames = []string{} for i := 0; i < superStream.Spec.Partitions; i++ { expectedQueueName := fmt.Sprintf("%s-partition-%d", superStreamName, i) - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedQueueName, Namespace: "default"}, + types.NamespacedName{Name: expectedQueueName, Namespace: superStreamNamespace}, &partition, ) Expect(err).NotTo(HaveOccurred()) @@ -508,7 +562,7 @@ var _ = 
Describe("super-stream-controller", func() { "Durable": BeTrue(), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -517,9 +571,9 @@ var _ = Describe("super-stream-controller", func() { By("setting the status of the super stream to list the partition queue names", func() { EventuallyWithOffset(1, func() []string { - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &superStream, ) @@ -528,12 +582,12 @@ var _ = Describe("super-stream-controller", func() { }) By("creating n bindings", func() { - var binding topology.Binding + var binding topologyv1beta1.Binding for i := 0; i < superStream.Spec.Partitions; i++ { expectedBindingName := fmt.Sprintf("%s-binding-%d", superStreamName, i) - err := client.Get( + err := k8sClient.Get( ctx, - types.NamespacedName{Name: expectedBindingName, Namespace: "default"}, + types.NamespacedName{Name: expectedBindingName, Namespace: superStreamNamespace}, &binding, ) Expect(err).NotTo(HaveOccurred()) @@ -547,7 +601,7 @@ var _ = Describe("super-stream-controller", func() { "RoutingKey": Equal(superStream.Spec.RoutingKeys[i]), "RabbitmqClusterReference": MatchAllFields(Fields{ "Name": Equal("example-rabbit"), - "Namespace": Equal("default"), + "Namespace": Equal(superStreamNamespace), "ConnectionSecret": BeNil(), }), })) @@ -564,18 +618,18 @@ var _ = Describe("super-stream-controller", func() { It("creates the SuperStream and any underlying resources", func() { superStream.Spec.RoutingKeys = []string{"abc", "bcd", "cde"} superStream.Spec.Partitions = 2 - Expect(client.Create(ctx, &superStream)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { + Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) + EventuallyWithOffset(1, func() []topologyv1beta1.Condition { var fetchedSuperStream topologyv1alpha1.SuperStream - _ = client.Get( + _ = k8sClient.Get( ctx, - types.NamespacedName{Name: superStreamName, Namespace: "default"}, + types.NamespacedName{Name: superStreamName, Namespace: superStreamNamespace}, &fetchedSuperStream, ) return fetchedSuperStream.Status.Conditions }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), + "Type": Equal(topologyv1beta1.ConditionType("Ready")), "Reason": Equal("FailedCreateOrUpdate"), "Message": Equal("SuperStream mismatch failed to reconcile"), "Status": Equal(corev1.ConditionFalse), diff --git a/controllers/topicpermission_controller_test.go b/controllers/topicpermission_controller_test.go index 9d4fd713..16255ed1 100644 --- a/controllers/topicpermission_controller_test.go +++ b/controllers/topicpermission_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . 
"github.com/onsi/ginkgo/v2" @@ -18,17 +24,66 @@ import ( ) var _ = Describe("topicpermission-controller", func() { - var topicperm topology.TopicPermission - var user topology.User - var name string - var userName string + var ( + topicperm topology.TopicPermission + user topology.User + name string + userName string + topicPermissionMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + topicPermissionMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{topicPermissionNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(topicPermissionMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = topicPermissionMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: topicPermissionMgr.GetClient(), + Type: &topology.TopicPermission{}, + Scheme: topicPermissionMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.TopicPermissionReconciler{Client: topicPermissionMgr.GetClient(), Scheme: topicPermissionMgr.GetScheme()}, + }).SetupWithManager(topicPermissionMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) When("validating RabbitMQ Client failures with username", func() { JustBeforeEach(func() { topicperm = topology.TopicPermission{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: "default", + Namespace: topicPermissionNamespace, }, Spec: topology.TopicPermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -51,21 +106,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -76,21 +134,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -101,20 +162,23 @@ var _ = Describe("topicpermission-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -123,16 +187,19 @@ var _ = Describe("topicpermission-controller", func() { fakeRabbitMQClient.DeleteTopicPermissionsInReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete topicpermission")) }) }) @@ -144,11 +211,14 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete topicpermission")) }) }) @@ -160,24 +230,24 @@ var _ = Describe("topicpermission-controller", func() { user = topology.User{ ObjectMeta: metav1.ObjectMeta{ Name: userName, - Namespace: "default", + Namespace: topicPermissionNamespace, }, Spec: topology.UserSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: topicPermissionNamespace, }, }, } topicperm = topology.TopicPermission{ ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: "default", + Namespace: topicPermissionNamespace, }, Spec: topology.TopicPermissionSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", - Namespace: "default", + Namespace: topicPermissionNamespace, }, UserReference: &corev1.LocalObjectReference{ Name: userName, @@ -211,21 +281,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Message": Equal("failed create Permission, missing User"), - "Status": Equal(corev1.ConditionFalse), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Message": Equal("failed create Permission, missing User"), + "Status": Equal(corev1.ConditionFalse), + }))) }) }) @@ -236,42 +309,52 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the status condition 'Ready' to 'true' ", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) }) }) Context("deletion", func() { JustBeforeEach(func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topicperm, ) return topicperm.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("Secret User is removed first", func() { @@ -281,17 +364,23 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: user.Name + "-user-credentials", - Namespace: user.Namespace, - }, - })).To(Succeed()) - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + // We used to delete a Secret right here, before this PR: + // https://github.com/rabbitmq/messaging-topology-operator/pull/710 + // + // That PR refactored tests and provided controller isolation in tests, so that + // other controllers i.e. User controller, won't interfere with resources + // created/deleted/modified as part of this suite. 
Therefore, we don't need to + // delete the Secret objects because, after PR 710, the Secret object is never + // created, which meets the point of this test: when the Secret does not exist + // and Permission is deleted + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) observedEvents := observedEvents() Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete topicpermission")) Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully deleted topicpermission")) @@ -305,16 +394,24 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) + observedEvents := observedEvents() Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete topicpermission")) Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully deleted topicpermission")) @@ -328,11 +425,15 @@ var _ = Describe("topicpermission-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &topology.TopicPermission{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
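Note on the Status().Update calls added above: with the User controller no longer registered in this isolated suite, nothing populates user.Status.Username, so the tests set it themselves. Assuming the User CRD exposes a status subresource (which the use of the status client suggests), a plain Update would not persist that change; it has to go through the status writer, as these tests do:

// restatement of the pattern used above; userName is the test's user name
user.Status.Username = userName
Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed())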
+ Should(BeTrue()) + observedEvents := observedEvents() Expect(observedEvents).NotTo(ContainElement("Warning FailedDelete failed to delete topicpermission")) Expect(observedEvents).To(ContainElement("Normal SuccessfulDelete successfully deleted topicpermission")) @@ -347,16 +448,22 @@ var _ = Describe("topicpermission-controller", func() { }) It("sets the correct deletion ownerref to the object", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - Expect(client.Create(ctx, &topicperm)).To(Succeed()) + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + user.Status.Username = userName + Expect(k8sClient.Status().Update(ctx, &user)).To(Succeed()) + + Expect(k8sClient.Create(ctx, &topicperm)).To(Succeed()) Eventually(func() []metav1.OwnerReference { var fetched topology.TopicPermission - err := client.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &fetched) + err := k8sClient.Get(ctx, types.NamespacedName{Name: topicperm.Name, Namespace: topicperm.Namespace}, &fetched) if err != nil { return []metav1.OwnerReference{} } return fetched.ObjectMeta.OwnerReferences - }, 5).Should(Not(BeEmpty())) + }). + Within(5 * time.Second). + WithPolling(time.Second). + Should(Not(BeEmpty())) }) }) }) diff --git a/controllers/topology_controller_test.go b/controllers/topology_controller_test.go index f8164d0d..4b4959b5 100644 --- a/controllers/topology_controller_test.go +++ b/controllers/topology_controller_test.go @@ -3,13 +3,11 @@ package controllers_test import ( "context" "fmt" - rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1" "github.com/rabbitmq/messaging-topology-operator/controllers" - v1 "k8s.io/api/core/v1" - k8sApiErrors "k8s.io/apimachinery/pkg/api/errors" "net/http" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" @@ -21,13 +19,13 @@ import ( var _ = Describe("TopologyReconciler", func() { const ( - namespace = "topology-reconciler-test" - name = "topology-rabbit" + name = "example-rabbit" ) + var ( commonRabbitmqClusterRef = topology.RabbitmqClusterReference{ Name: name, - Namespace: namespace, + Namespace: topologyNamespace, } commonHttpCreatedResponse = &http.Response{ Status: "201 Created", @@ -40,6 +38,7 @@ var _ = Describe("TopologyReconciler", func() { topologyMgr ctrl.Manager managerCtx context.Context managerCancel context.CancelFunc + k8sClient runtimeClient.Client ) BeforeEach(func() { @@ -49,7 +48,7 @@ var _ = Describe("TopologyReconciler", func() { BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite }, Cache: cache.Options{ - DefaultNamespaces: map[string]cache.Config{namespace: {}}, + DefaultNamespaces: map[string]cache.Config{topologyNamespace: {}}, }, Logger: GinkgoLogr, }) @@ -61,25 +60,7 @@ var _ = Describe("TopologyReconciler", func() { Expect(topologyMgr.Start(ctx)).To(Succeed()) }(managerCtx) - c := topologyMgr.GetClient() - err = c.Create(context.Background(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}) - err = client.Create(context.Background(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}) - if err != nil && !k8sApiErrors.IsAlreadyExists(err) { - Fail(err.Error()) - } - - rmq := &rabbitmqv1beta1.RabbitmqCluster{ - ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, - Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ - TLS: rabbitmqv1beta1.TLSSpec{ - SecretName: 
"i-do-not-exist-but-its-fine", - }, - }, - } - err = createRabbitmqClusterResources(c, rmq) - if err != nil && !k8sApiErrors.IsAlreadyExists(err) { - Fail(err.Error()) - } + k8sClient = topologyMgr.GetClient() }) AfterEach(func() { @@ -107,19 +88,19 @@ var _ = Describe("TopologyReconciler", func() { }).SetupWithManager(topologyMgr)).To(Succeed()) queue := &topology.Queue{ - ObjectMeta: metav1.ObjectMeta{Name: "ab-queue", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "ab-queue", Namespace: topologyNamespace}, Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef}, } fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil) fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil) - Expect(client.Create(ctx, queue)).To(Succeed()) + Expect(k8sClient.Create(ctx, queue)).To(Succeed()) Eventually(func() int { return len(fakeRabbitMQClientFactoryArgsForCall) }, 5).Should(BeNumerically(">", 0)) credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0) - expected := fmt.Sprintf("https://%s.%s.svc.some-domain.com:15671", name, namespace) + expected := fmt.Sprintf("https://%s.%s.svc.some-domain.com:15671", name, topologyNamespace) Expect(credentials).Should(HaveKeyWithValue("uri", expected)) }) }) @@ -136,19 +117,19 @@ var _ = Describe("TopologyReconciler", func() { }).SetupWithManager(topologyMgr)).To(Succeed()) queue := &topology.Queue{ - ObjectMeta: metav1.ObjectMeta{Name: "bb-queue", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "bb-queue", Namespace: topologyNamespace}, Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef}, } fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil) fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil) - Expect(client.Create(ctx, queue)).To(Succeed()) + Expect(k8sClient.Create(ctx, queue)).To(Succeed()) Eventually(func() int { return len(fakeRabbitMQClientFactoryArgsForCall) }, 5).Should(BeNumerically(">", 0)) credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0) - expected := fmt.Sprintf("https://%s.%s.svc:15671", name, namespace) + expected := fmt.Sprintf("https://%s.%s.svc:15671", name, topologyNamespace) Expect(credentials).Should(HaveKeyWithValue("uri", expected)) }) }) @@ -166,19 +147,19 @@ var _ = Describe("TopologyReconciler", func() { }).SetupWithManager(topologyMgr)).To(Succeed()) queue := &topology.Queue{ - ObjectMeta: metav1.ObjectMeta{Name: "cb-queue", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: "cb-queue", Namespace: topologyNamespace}, Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef}, } fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil) fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil) - Expect(client.Create(ctx, queue)).To(Succeed()) + Expect(k8sClient.Create(ctx, queue)).To(Succeed()) Eventually(func() int { return len(fakeRabbitMQClientFactoryArgsForCall) }, 5).Should(BeNumerically(">", 0)) credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0) - expected := fmt.Sprintf("http://%s.%s.svc:15672", name, namespace) + expected := fmt.Sprintf("http://%s.%s.svc:15672", name, topologyNamespace) Expect(credentials).Should(HaveKeyWithValue("uri", expected)) }) }) diff --git a/controllers/user_controller_test.go b/controllers/user_controller_test.go index b18ddb9a..aa44a89a 100644 --- a/controllers/user_controller_test.go +++ b/controllers/user_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + 
"context" "errors" + "github.com/rabbitmq/messaging-topology-operator/controllers" "io/ioutil" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . "github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("UserController", func() { - var user topology.User - var userName string + var ( + user topology.User + userName string + userMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + userMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{userNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(userMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = userMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: userMgr.GetClient(), + Type: &topology.User{}, + Scheme: userMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.UserReconciler{Client: userMgr.GetClient(), Scheme: userMgr.GetScheme()}, + }).SetupWithManager(userMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { user = topology.User{ ObjectMeta: metav1.ObjectMeta{ Name: userName, - Namespace: "default", + Namespace: userNamespace, }, Spec: topology.UserSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -34,6 +89,7 @@ var _ = Describe("UserController", func() { }, } }) + When("creating a user", func() { When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { @@ -45,21 +101,24 @@ var _ = Describe("UserController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &user, ) return user.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("some HTTP error"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("some HTTP error"), + }))) }) }) @@ -70,21 +129,24 @@ var _ = Describe("UserController", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &user)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &user, ) return user.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("hit a exception"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("hit a exception"), + }))) }) }) }) @@ -95,20 +157,23 @@ var _ = Describe("UserController", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &user)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &user, ) return user.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -122,11 +187,14 @@ var _ = Describe("UserController", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete user")) }) }) @@ -138,11 +206,14 @@ var _ = Describe("UserController", func() { }) It("raises an event to indicate a failure to delete", func() { - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete user")) }) }) @@ -157,17 +228,20 @@ var _ = Describe("UserController", func() { }) It("raises an event to indicate a successful deletion", func() { - Expect(client.Delete(ctx, &corev1.Secret{ + Expect(k8sClient.Delete(ctx, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: user.Name + "-user-credentials", Namespace: user.Namespace, }, })).To(Succeed()) - Expect(client.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) Eventually(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, &topology.User{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeTrue()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeTrue()) Expect(observedEvents()).To(SatisfyAll( Not(ContainElement("Warning FailedDelete failed to delete user")), diff --git a/controllers/vhost_controller_test.go b/controllers/vhost_controller_test.go index fc2564a0..5320dc0a 100644 --- a/controllers/vhost_controller_test.go +++ b/controllers/vhost_controller_test.go @@ -2,9 +2,15 @@ package controllers_test import ( "bytes" + "context" "errors" - "io/ioutil" + "github.com/rabbitmq/messaging-topology-operator/controllers" + "io" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" "time" . 
"github.com/onsi/ginkgo/v2" @@ -18,14 +24,63 @@ import ( ) var _ = Describe("vhost-controller", func() { - var vhost topology.Vhost - var vhostName string + var ( + vhost topology.Vhost + vhostName string + vhostMgr ctrl.Manager + managerCtx context.Context + managerCancel context.CancelFunc + k8sClient runtimeClient.Client + ) + + BeforeEach(func() { + var err error + vhostMgr, err = ctrl.NewManager(testEnv.Config, ctrl.Options{ + Metrics: server.Options{ + BindAddress: "0", // To avoid MacOS firewall pop-up every time you run this suite + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{vhostNamespace: {}}, + }, + Logger: GinkgoLogr, + }) + Expect(err).ToNot(HaveOccurred()) + + managerCtx, managerCancel = context.WithCancel(context.Background()) + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(vhostMgr.Start(ctx)).To(Succeed()) + }(managerCtx) + + k8sClient = vhostMgr.GetClient() + + Expect((&controllers.TopologyReconciler{ + Client: vhostMgr.GetClient(), + Type: &topology.Vhost{}, + Scheme: vhostMgr.GetScheme(), + Recorder: fakeRecorder, + RabbitmqClientFactory: fakeRabbitMQClientFactory, + ReconcileFunc: &controllers.VhostReconciler{Client: vhostMgr.GetClient()}, + }).SetupWithManager(vhostMgr)).To(Succeed()) + }) + + AfterEach(func() { + managerCancel() + // Sad workaround to avoid controllers racing for the reconciliation of other's + // test cases. Without this wait, the last run test consistently fails because + // the previous cancelled manager is just in time to reconcile the Queue of the + // new/last test, and use the wrong/unexpected arguments in the queue declare call + // + // Eventual consistency is nice when you have good means of awaiting. That's not the + // case with testenv and kubernetes controllers. + <-time.After(time.Second) + }) JustBeforeEach(func() { vhost = topology.Vhost{ ObjectMeta: metav1.ObjectMeta{ Name: vhostName, - Namespace: "default", + Namespace: vhostNamespace, }, Spec: topology.VhostSpec{ RabbitmqClusterReference: topology.RabbitmqClusterReference{ @@ -46,21 +101,24 @@ var _ = Describe("vhost-controller", func() { }) It("sets the status condition", func() { - Expect(client.Create(ctx, &vhost)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost, ) return vhost.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a failure"), + }))) }) }) @@ -71,21 +129,24 @@ var _ = Describe("vhost-controller", func() { }) It("sets the status condition to indicate a failure to reconcile", func() { - Expect(client.Create(ctx, &vhost)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost, ) return vhost.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("FailedCreateOrUpdate"), - "Status": Equal(corev1.ConditionFalse), - "Message": ContainSubstring("a go failure"), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("FailedCreateOrUpdate"), + "Status": Equal(corev1.ConditionFalse), + "Message": ContainSubstring("a go failure"), + }))) }) }) }) @@ -96,20 +157,23 @@ var _ = Describe("vhost-controller", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) - Expect(client.Create(ctx, &vhost)).To(Succeed()) - EventuallyWithOffset(1, func() []topology.Condition { - _ = client.Get( + Expect(k8sClient.Create(ctx, &vhost)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &vhost, ) return vhost.Status.Conditions - }, statusEventsUpdateTimeout, 1*time.Second).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(topology.ConditionType("Ready")), - "Reason": Equal("SuccessfulCreateOrUpdate"), - "Status": Equal(corev1.ConditionTrue), - }))) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) }) When("the RabbitMQ Client returns a HTTP error response", func() { @@ -118,16 +182,19 @@ var _ = Describe("vhost-controller", func() { fakeRabbitMQClient.DeleteVhostReturns(&http.Response{ Status: "502 Bad Gateway", StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBufferString("Hello World")), + Body: io.NopCloser(bytes.NewBufferString("Hello World")), }, nil) }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &vhost)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). 
+ Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete vhost")) }) }) @@ -139,11 +206,14 @@ var _ = Describe("vhost-controller", func() { }) It("publishes a 'warning' event", func() { - Expect(client.Delete(ctx, &vhost)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed()) Consistently(func() bool { - err := client.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) + err := k8sClient.Get(ctx, types.NamespacedName{Name: vhost.Name, Namespace: vhost.Namespace}, &topology.Vhost{}) return apierrors.IsNotFound(err) - }, statusEventsUpdateTimeout).Should(BeFalse()) + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(BeFalse()) Expect(observedEvents()).To(ContainElement("Warning FailedDelete failed to delete vhost")) }) }) diff --git a/go.sum b/go.sum index 9fc202be..2a37e48b 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,7 @@ github.com/go-errors/errors v1.5.0 h1:/EuijeGOu7ckFxzhkj4CXJ8JaenxK7bKUxpPYqeLHq github.com/go-errors/errors v1.5.0/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo= github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -162,8 +163,10 @@ github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -218,6 +221,7 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod github.com/mreiferson/go-httpclient v0.0.0-20160630210159-31f0106b4474/go.mod h1:OQA4XLvDbMgS8P0CevmM4m9Q3Jq4phKUzcocxuGJ5m8= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/internal/binding_test.go 
b/internal/binding_test.go index b2a68a23..f17836a7 100644 --- a/internal/binding_test.go +++ b/internal/binding_test.go @@ -11,6 +11,7 @@ import ( var _ = Describe("Binding", func() { var binding *topology.Binding + Context("GenerateBindingInfo", func() { BeforeEach(func() { binding = &topology.Binding{