Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CFE-1134: Watch infrastructure and update AWS tags #1148

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/operator/controller/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (

const (
controllerName = "ingress_controller"
// clusterInfrastructureName is the name of the 'cluster' infrastructure object.
clusterInfrastructureName = "cluster"
)

// TODO: consider moving these to openshift/api
Expand Down Expand Up @@ -134,6 +136,12 @@ func New(mgr manager.Manager, config Config) (controller.Controller, error) {
if err := c.Watch(source.Kind[client.Object](operatorCache, &configv1.Proxy{}, handler.EnqueueRequestsFromMapFunc(reconciler.ingressConfigToIngressController))); err != nil {
return nil, err
}
// Watch for changes to infrastructure config to update user defined tags.
if err := c.Watch(source.Kind[client.Object](operatorCache, &configv1.Infrastructure{}, handler.EnqueueRequestsFromMapFunc(reconciler.ingressConfigToIngressController),
predicate.NewPredicateFuncs(hasName(clusterInfrastructureName)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other watches technically should have this predicate too, and ingressConfigToIngressController should be renamed. However, adding the predicate to the other watches and renaming the map function should be addressed in a follow-up.

)); err != nil {
return nil, err
}
return c, nil
}

Expand Down Expand Up @@ -187,6 +195,13 @@ func enqueueRequestForOwningIngressController(namespace string) handler.EventHan
})
}

// hasName returns a predicate which checks whether an object has the given name.
func hasName(name string) func(o client.Object) bool {
return func(o client.Object) bool {
return o.GetName() == name
}
}

// Config holds all the things necessary for the controller to run.
type Config struct {
Namespace string
Expand Down
17 changes: 10 additions & 7 deletions pkg/operator/controller/ingress/load_balancer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ var (
//
// https://cloud.ibm.com/docs/containers?topic=containers-vpc-lbaas
iksLBEnableFeaturesAnnotation,
// awsLBAdditionalResourceTags annotation is populated
// by user tags present in
// Status.PlatformStatus.AWS.ResourceTags in the
// infrastructure config.
awsLBAdditionalResourceTags,
)

// Azure and GCP support switching between internal and external
Expand Down Expand Up @@ -751,12 +756,6 @@ func IsServiceInternal(service *corev1.Service) bool {
return false
}

// loadBalancerServiceTagsModified verifies that none of the managedAnnotations have been changed and also the AWS tags annotation
func loadBalancerServiceTagsModified(current, expected *corev1.Service) (bool, *corev1.Service) {
ignoredAnnotations := managedLoadBalancerServiceAnnotations.Union(sets.NewString(awsLBAdditionalResourceTags))
return loadBalancerServiceAnnotationsChanged(current, expected, ignoredAnnotations)
}

// loadBalancerServiceIsUpgradeable returns an error value indicating if the
// load balancer service is safe to upgrade. In particular, if the current
// service matches the desired service, then the service is upgradeable, and the
Expand All @@ -773,7 +772,11 @@ func loadBalancerServiceIsUpgradeable(ic *operatorv1.IngressController, deployme
return nil
}

changed, updated := loadBalancerServiceTagsModified(current, desired)
// Verify that none of the managedAnnotations have been changed by something or someone.
// Since the status logic runs after the controller sets the annotations, it checks for
// any discrepancy (in case modified) between the desired annotation values of the controller
// and the current annotation values.
changed, updated := loadBalancerServiceAnnotationsChanged(current, desired, managedLoadBalancerServiceAnnotations)
if changed {
diff := cmp.Diff(current, updated, cmpopts.EquateEmpty())
return fmt.Errorf("load balancer service has been modified; changes must be reverted before upgrading: %s", diff)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ func Test_loadBalancerServiceChanged(t *testing.T) {
mutate: func(svc *corev1.Service) {
svc.Annotations["service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags"] = "Key3=Value3,Key4=Value4"
},
expect: false,
expect: true,
},
{
description: "if the service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout annotation changes",
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/controller/ingress/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3022,7 +3022,7 @@ func Test_computeIngressUpgradeableCondition(t *testing.T) {
expect: true,
},
{
description: "if the service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags annotation changes",
description: "if the service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags annotation changes not by ingress controller",
mutate: func(svc *corev1.Service) {
svc.Annotations[awsLBAdditionalResourceTags] = "Key2=Value2"
},
Expand Down
1 change: 1 addition & 0 deletions test/e2e/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestAll(t *testing.T) {
t.Run("TestDefaultIngressClass", TestDefaultIngressClass)
t.Run("TestOperatorRecreatesItsClusterOperator", TestOperatorRecreatesItsClusterOperator)
t.Run("TestAWSLBTypeDefaulting", TestAWSLBTypeDefaulting)
t.Run("TestAWSResourceTagsChanged", TestAWSResourceTagsChanged)
t.Run("TestHstsPolicyWorks", TestHstsPolicyWorks)
t.Run("TestIngressControllerCustomEndpoints", TestIngressControllerCustomEndpoints)
t.Run("TestIngressStatus", TestIngressStatus)
Expand Down
110 changes: 109 additions & 1 deletion test/e2e/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/service/ec2"
"io"
"io/ioutil"
"net"
Expand All @@ -32,13 +31,15 @@ import (
iov1 "github.com/openshift/api/operatoringress/v1"
routev1 "github.com/openshift/api/route/v1"

configclientset "github.com/openshift/client-go/config/clientset/versioned"
"github.com/openshift/cluster-ingress-operator/pkg/manifests"
chiragkyal marked this conversation as resolved.
Show resolved Hide resolved
operatorclient "github.com/openshift/cluster-ingress-operator/pkg/operator/client"
"github.com/openshift/cluster-ingress-operator/pkg/operator/controller"
operatorcontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller"
ingresscontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/ingress"

"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/service/ec2"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -116,6 +117,7 @@ var (

var (
kclient client.Client
configClient *configclientset.Clientset
dnsConfig configv1.DNS
infraConfig configv1.Infrastructure
operatorNamespace = operatorcontroller.DefaultOperatorNamespace
Expand Down Expand Up @@ -153,6 +155,12 @@ func TestMain(m *testing.M) {
}
kclient = kubeClient

configClient, err = configclientset.NewForConfig(kubeConfig)
if err != nil {
fmt.Printf("failed to create config client: %s\n", err)
os.Exit(1)
}

if err := kclient.Get(context.TODO(), types.NamespacedName{Name: "cluster"}, &dnsConfig); err != nil {
fmt.Printf("failed to get DNS config: %v\n", err)
os.Exit(1)
Expand Down Expand Up @@ -1291,6 +1299,83 @@ func TestInternalLoadBalancerGlobalAccessGCP(t *testing.T) {
}
}

// TestAWSResourceTagsChanged tests the functionality of updating AWS resource tags
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the acceptance criteria is:
"any modifications to user-defined tags (platform.AWS.ResourceTags) trigger an update of the load balancer service" - shouldn't we have a couple more tests, like deleting a user-defined tag and adding a user-defined tag?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like deleting a user-defined tag and adding a user-defined tag?

The test already covers adding a user-defined tag.

However, updating the infra status again to remove certain tag is possible, which will update the annotation as well, but the tag won't be removed from the AWS resource itself, and this is an expected behaviour for cloud-provider-aws. See #1148 (comment) for more details.

I've extended the test to cover this scenario of tag removal and annotation update in the latest changes. Hope it covers the acceptance criteria.

// in the infrastructure configuration and validates that the expected
// awsLBAdditionalResourceTags is set correctly on the
// loadBalancer service associated with the default Ingress Controller.
//
// This test is a serial test because it modifies the cluster infrastructure config and
// therefore should not run in parallel with other tests.
func TestAWSResourceTagsChanged(t *testing.T) {
if infraConfig.Status.Platform != "AWS" {
t.Skipf("test skipped on platform %q", infraConfig.Status.Platform)
}
if err := waitForIngressControllerCondition(t, kclient, 10*time.Second, defaultName, defaultAvailableConditions...); err != nil {
t.Errorf("did not get expected conditions: %v", err)
}
defaultIC := &operatorv1.IngressController{
ObjectMeta: metav1.ObjectMeta{
Namespace: defaultName.Namespace,
Name: defaultName.Name,
},
}
awsLBAdditionalResourceTags := "service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags"

// Save a copy of the original infraConfig, to revert changes before exiting.
originalInfra := infraConfig.DeepCopy()
t.Cleanup(func() {
err := updateInfrastructureConfigStatusWithRetryOnConflict(t, 5*time.Minute, configClient, func(infra *configv1.Infrastructure) *configv1.Infrastructure {
infra.Status = originalInfra.Status
return infra
})
if err != nil {
t.Logf("Unable to remove changes from the infraConfig, possible corruption of test environment: %v", err)
}
})
Copy link
Contributor

@candita candita Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this:

Suggested change
})
}
if err != nil {
t.Logf("Unable to remove changes to the infraConfig, possible corruption of test environment: %v", err)
}
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, updated.


initialTags := []configv1.AWSResourceTag{
{Key: "Key1", Value: "Value1"},
{Key: "Key2", Value: "Value2"},
}
t.Logf("Updating AWS ResourceTags in the cluster infrastructure config: %v", initialTags)
err := updateInfrastructureConfigStatusWithRetryOnConflict(t, 5*time.Minute, configClient, func(infra *configv1.Infrastructure) *configv1.Infrastructure {
if infra.Status.PlatformStatus == nil {
infra.Status.PlatformStatus = &configv1.PlatformStatus{}
}
if infra.Status.PlatformStatus.AWS == nil {
infra.Status.PlatformStatus.AWS = &configv1.AWSPlatformStatus{}
}
infra.Status.PlatformStatus.AWS.ResourceTags = initialTags
return infra
})
if err != nil {
t.Errorf("failed to update infrastructure status: %v", err)
}

chiragkyal marked this conversation as resolved.
Show resolved Hide resolved
// Check awsLBAdditionalResourceTags annotation with initial tags.
expectedTags := "Key1=Value1,Key2=Value2"
t.Logf("Validating the %s annotation for the load balancer service of the default ingresscontroller", awsLBAdditionalResourceTags)
assertLoadBalancerServiceAnnotationWithPollImmediate(t, kclient, defaultIC, awsLBAdditionalResourceTags, expectedTags)

// Update the status again, removing one tag.
updatedTags := []configv1.AWSResourceTag{
{Key: "Key1", Value: "Value1"},
}
t.Logf("Updating AWS ResourceTags in the cluster infrastructure config: %v", updatedTags)
err = updateInfrastructureConfigStatusWithRetryOnConflict(t, 5*time.Minute, configClient, func(infra *configv1.Infrastructure) *configv1.Infrastructure {
infra.Status.PlatformStatus.AWS.ResourceTags = updatedTags
return infra
})
if err != nil {
t.Errorf("failed to update infrastructure status: %v", err)
}

// Check awsLBAdditionalResourceTags annotation with updated tags.
expectedTags = "Key1=Value1"
t.Logf("Validating the %s annotation for the load balancer service of the default ingresscontroller", awsLBAdditionalResourceTags)
assertLoadBalancerServiceAnnotationWithPollImmediate(t, kclient, defaultIC, awsLBAdditionalResourceTags, expectedTags)
}

func TestAWSLBTypeChange(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4323,6 +4408,29 @@ func assertServiceAnnotation(t *testing.T, serviceName types.NamespacedName, ann
}
}

// assertLoadBalancerServiceAnnotationWithPollImmediate checks if the specified annotation on the
// LoadBalancer Service of the given IngressController matches the expected value.
func assertLoadBalancerServiceAnnotationWithPollImmediate(t *testing.T, kclient client.Client, ic *operatorv1.IngressController, annotationKey, expectedValue string) {
err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) {
service := &corev1.Service{}
if err := kclient.Get(ctx, controller.LoadBalancerServiceName(ic), service); err != nil {
t.Logf("failed to get service %s: %v, retrying...", controller.LoadBalancerServiceName(ic), err)
return false, nil
}
if actualValue, ok := service.Annotations[annotationKey]; !ok {
t.Logf("load balancer has no %q annotation yet: %v, retrying...", annotationKey, service.Annotations)
return false, nil
} else if actualValue != expectedValue {
t.Logf("expected %s, found %s", expectedValue, actualValue)
return false, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect that we don't want to keep trying, after we found an unexpected value. Or would we expect it change after this?

Suggested change
return false, nil
return false, fmt.Errorf("expected %s, found %s", expectedValue, actualValue)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep trying here because the annotation value might not get updated immediately after the infra status is updated.

}
return true, nil
})
if err != nil {
t.Fatalf("timed out waiting for the %s annotation to be updated: %v", annotationKey, err)
}
}

// assertServiceNotDeleted asserts that a provide service wasn't deleted.
func assertServiceNotDeleted(t *testing.T, serviceName types.NamespacedName, oldUid types.UID) {
t.Helper()
Expand Down
27 changes: 27 additions & 0 deletions test/e2e/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
configv1 "github.com/openshift/api/config/v1"
operatorv1 "github.com/openshift/api/operator/v1"
routev1 "github.com/openshift/api/route/v1"
configclientset "github.com/openshift/client-go/config/clientset/versioned"
"github.com/openshift/cluster-ingress-operator/pkg/operator/controller"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -595,6 +596,32 @@ func updateInfrastructureConfigSpecWithRetryOnConflict(t *testing.T, name types.
})
}

// updateInfrastructureStatus updates the Infrastructure status by applying
// the given update function to the current Infrastructure object.
// If there is a conflict error on update then the complete operation
// is retried until timeout is reached.
func updateInfrastructureConfigStatusWithRetryOnConflict(t *testing.T, timeout time.Duration, configClient *configclientset.Clientset, updateFunc func(*configv1.Infrastructure) *configv1.Infrastructure) error {
return wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
infra, err := configClient.ConfigV1().Infrastructures().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
t.Logf("error getting 'cluster' infrastructure config: %v, retrying...", err)
return false, nil
}

// Apply the update function to the Infrastructure object.
updatedInfra := updateFunc(infra.DeepCopy())

if _, err := configClient.ConfigV1().Infrastructures().UpdateStatus(ctx, updatedInfra, metav1.UpdateOptions{}); err != nil {
if errors.IsConflict(err) {
t.Logf("conflict when updating 'cluster' infrastructure config: %v, retrying...", err)
return false, nil
}
return false, err
}
return true, nil
})
}

// verifyExternalIngressController verifies connectivity between the router
// and a test workload by making a http call using the hostname passed to it.
// This hostname must be the domain associated with the ingresscontroller under test.
Expand Down