Skip to content

Commit

Permalink
Watch Infrastructure object and update AWS user tags
Browse files Browse the repository at this point in the history
- Ingress controller now monitors changes to the Infrastructure object,
ensuring that modifications to user-defined AWS ResourceTags (platform.AWS.ResourceTags) trigger updates to the load balancer service.
- Consider awsLBAdditionalResourceTags annotation as a managed annotation.

Signed-off-by: chiragkyal <[email protected]>
  • Loading branch information
chiragkyal committed Nov 1, 2024
1 parent 871b2b2 commit 19eced8
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 10 deletions.
15 changes: 15 additions & 0 deletions pkg/operator/controller/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (

const (
controllerName = "ingress_controller"
// clusterInfrastructureName is the name of the 'cluster' infrastructure object.
clusterInfrastructureName = "cluster"
)

// TODO: consider moving these to openshift/api
Expand Down Expand Up @@ -134,6 +136,12 @@ func New(mgr manager.Manager, config Config) (controller.Controller, error) {
if err := c.Watch(source.Kind[client.Object](operatorCache, &configv1.Proxy{}, handler.EnqueueRequestsFromMapFunc(reconciler.ingressConfigToIngressController))); err != nil {
return nil, err
}
// Watch for changes to infrastructure config to update user defined tags.
if err := c.Watch(source.Kind[client.Object](operatorCache, &configv1.Infrastructure{}, handler.EnqueueRequestsFromMapFunc(reconciler.ingressConfigToIngressController),
predicate.NewPredicateFuncs(hasName(clusterInfrastructureName)),
)); err != nil {
return nil, err
}
return c, nil
}

Expand Down Expand Up @@ -187,6 +195,13 @@ func enqueueRequestForOwningIngressController(namespace string) handler.EventHan
})
}

// hasName returns a predicate which checks whether an object has the given name.
func hasName(name string) func(o client.Object) bool {
return func(o client.Object) bool {
return o.GetName() == name
}
}

// Config holds all the things necessary for the controller to run.
type Config struct {
Namespace string
Expand Down
17 changes: 10 additions & 7 deletions pkg/operator/controller/ingress/load_balancer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ var (
//
// https://cloud.ibm.com/docs/containers?topic=containers-vpc-lbaas
iksLBEnableFeaturesAnnotation,
// awsLBAdditionalResourceTags annotation is populated
// by user tags present in
// Status.PlatformStatus.AWS.ResourceTags in the
// infrastructure config.
awsLBAdditionalResourceTags,
)

// Azure and GCP support switching between internal and external
Expand Down Expand Up @@ -751,12 +756,6 @@ func IsServiceInternal(service *corev1.Service) bool {
return false
}

// loadBalancerServiceTagsModified verifies that none of the managedAnnotations have been changed and also the AWS tags annotation
func loadBalancerServiceTagsModified(current, expected *corev1.Service) (bool, *corev1.Service) {
ignoredAnnotations := managedLoadBalancerServiceAnnotations.Union(sets.NewString(awsLBAdditionalResourceTags))
return loadBalancerServiceAnnotationsChanged(current, expected, ignoredAnnotations)
}

// loadBalancerServiceIsUpgradeable returns an error value indicating if the
// load balancer service is safe to upgrade. In particular, if the current
// service matches the desired service, then the service is upgradeable, and the
Expand All @@ -773,7 +772,11 @@ func loadBalancerServiceIsUpgradeable(ic *operatorv1.IngressController, deployme
return nil
}

changed, updated := loadBalancerServiceTagsModified(current, desired)
// Verify that none of the managedAnnotations have been changed by something or someone.
// Since the status logic runs after the controller sets the annotations, it checks for
// any discrepancy (in case modified) between the desired annotation values of the controller
// and the current annotation values.
changed, updated := loadBalancerServiceAnnotationsChanged(current, desired, managedLoadBalancerServiceAnnotations)
if changed {
diff := cmp.Diff(current, updated, cmpopts.EquateEmpty())
return fmt.Errorf("load balancer service has been modified; changes must be reverted before upgrading: %s", diff)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ func Test_loadBalancerServiceChanged(t *testing.T) {
mutate: func(svc *corev1.Service) {
svc.Annotations["service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags"] = "Key3=Value3,Key4=Value4"
},
expect: false,
expect: true,
},
{
description: "if the service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout annotation changes",
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/controller/ingress/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3022,7 +3022,7 @@ func Test_computeIngressUpgradeableCondition(t *testing.T) {
expect: true,
},
{
description: "if the service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags annotation changes",
description: "if the service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags annotation changes not by ingress controller",
mutate: func(svc *corev1.Service) {
svc.Annotations[awsLBAdditionalResourceTags] = "Key2=Value2"
},
Expand Down
1 change: 1 addition & 0 deletions test/e2e/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestAll(t *testing.T) {
t.Run("TestDefaultIngressClass", TestDefaultIngressClass)
t.Run("TestOperatorRecreatesItsClusterOperator", TestOperatorRecreatesItsClusterOperator)
t.Run("TestAWSLBTypeDefaulting", TestAWSLBTypeDefaulting)
t.Run("TestAWSResourceTagsChanged", TestAWSResourceTagsChanged)
t.Run("TestHstsPolicyWorks", TestHstsPolicyWorks)
t.Run("TestIngressControllerCustomEndpoints", TestIngressControllerCustomEndpoints)
t.Run("TestIngressStatus", TestIngressStatus)
Expand Down
110 changes: 109 additions & 1 deletion test/e2e/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/service/ec2"
"io"
"io/ioutil"
"net"
Expand All @@ -32,13 +31,15 @@ import (
iov1 "github.com/openshift/api/operatoringress/v1"
routev1 "github.com/openshift/api/route/v1"

configclientset "github.com/openshift/client-go/config/clientset/versioned"
"github.com/openshift/cluster-ingress-operator/pkg/manifests"
operatorclient "github.com/openshift/cluster-ingress-operator/pkg/operator/client"
"github.com/openshift/cluster-ingress-operator/pkg/operator/controller"
operatorcontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller"
ingresscontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/ingress"

"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/service/ec2"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -116,6 +117,7 @@ var (

var (
kclient client.Client
configClient *configclientset.Clientset
dnsConfig configv1.DNS
infraConfig configv1.Infrastructure
operatorNamespace = operatorcontroller.DefaultOperatorNamespace
Expand Down Expand Up @@ -153,6 +155,12 @@ func TestMain(m *testing.M) {
}
kclient = kubeClient

configClient, err = configclientset.NewForConfig(kubeConfig)
if err != nil {
fmt.Printf("failed to create config client: %s\n", err)
os.Exit(1)
}

if err := kclient.Get(context.TODO(), types.NamespacedName{Name: "cluster"}, &dnsConfig); err != nil {
fmt.Printf("failed to get DNS config: %v\n", err)
os.Exit(1)
Expand Down Expand Up @@ -1291,6 +1299,83 @@ func TestInternalLoadBalancerGlobalAccessGCP(t *testing.T) {
}
}

// TestAWSResourceTagsChanged tests the functionality of updating AWS resource tags
// in the infrastructure configuration and validates that the expected
// awsLBAdditionalResourceTags is set correctly on the
// loadBalancer service associated with the default Ingress Controller.
//
// This test is a serial test because it modifies the cluster infrastructure config and
// therefore should not run in parallel with other tests.
func TestAWSResourceTagsChanged(t *testing.T) {
if infraConfig.Status.Platform != "AWS" {
t.Skipf("test skipped on platform %q", infraConfig.Status.Platform)
}
if err := waitForIngressControllerCondition(t, kclient, 10*time.Second, defaultName, defaultAvailableConditions...); err != nil {
t.Errorf("did not get expected conditions: %v", err)
}
defaultIC := &operatorv1.IngressController{
ObjectMeta: metav1.ObjectMeta{
Namespace: defaultName.Namespace,
Name: defaultName.Name,
},
}
awsLBAdditionalResourceTags := "service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags"

// Save a copy of the original infraConfig, to revert changes before exiting.
originalInfra := infraConfig.DeepCopy()
t.Cleanup(func() {
err := updateInfrastructureConfigStatusWithRetryOnConflict(t, 5*time.Minute, configClient, func(infra *configv1.Infrastructure) *configv1.Infrastructure {
infra.Status = originalInfra.Status
return infra
})
if err != nil {
t.Logf("Unable to remove changes from the infraConfig, possible corruption of test environment: %v", err)
}
})

initialTags := []configv1.AWSResourceTag{
{Key: "Key1", Value: "Value1"},
{Key: "Key2", Value: "Value2"},
}
t.Logf("Updating AWS ResourceTags in the cluster infrastructure config: %v", initialTags)
err := updateInfrastructureConfigStatusWithRetryOnConflict(t, 5*time.Minute, configClient, func(infra *configv1.Infrastructure) *configv1.Infrastructure {
if infra.Status.PlatformStatus == nil {
infra.Status.PlatformStatus = &configv1.PlatformStatus{}
}
if infra.Status.PlatformStatus.AWS == nil {
infra.Status.PlatformStatus.AWS = &configv1.AWSPlatformStatus{}
}
infra.Status.PlatformStatus.AWS.ResourceTags = initialTags
return infra
})
if err != nil {
t.Errorf("failed to update infrastructure status: %v", err)
}

// Check awsLBAdditionalResourceTags annotation with initial tags.
expectedTags := "Key1=Value1,Key2=Value2"
t.Logf("Validating the %s annotation for the load balancer service of the default ingresscontroller", awsLBAdditionalResourceTags)
assertLoadBalancerServiceAnnotationWithPollImmediate(t, kclient, defaultIC, awsLBAdditionalResourceTags, expectedTags)

// Update the status again, removing one tag.
updatedTags := []configv1.AWSResourceTag{
{Key: "Key1", Value: "Value1"},
}
t.Logf("Updating AWS ResourceTags in the cluster infrastructure config: %v", updatedTags)
err = updateInfrastructureConfigStatusWithRetryOnConflict(t, 5*time.Minute, configClient, func(infra *configv1.Infrastructure) *configv1.Infrastructure {
infra.Status.PlatformStatus.AWS.ResourceTags = updatedTags
return infra
})
if err != nil {
t.Errorf("failed to update infrastructure status: %v", err)
}

// Check awsLBAdditionalResourceTags annotation with updated tags.
expectedTags = "Key1=Value1"
t.Logf("Validating the %s annotation for the load balancer service of the default ingresscontroller", awsLBAdditionalResourceTags)
assertLoadBalancerServiceAnnotationWithPollImmediate(t, kclient, defaultIC, awsLBAdditionalResourceTags, expectedTags)
}

func TestAWSLBTypeChange(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4323,6 +4408,29 @@ func assertServiceAnnotation(t *testing.T, serviceName types.NamespacedName, ann
}
}

// assertLoadBalancerServiceAnnotationWithPollImmediate checks if the specified annotation on the
// LoadBalancer Service of the given IngressController matches the expected value.
func assertLoadBalancerServiceAnnotationWithPollImmediate(t *testing.T, kclient client.Client, ic *operatorv1.IngressController, annotationKey, expectedValue string) {
err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) {
service := &corev1.Service{}
if err := kclient.Get(ctx, controller.LoadBalancerServiceName(ic), service); err != nil {
t.Logf("failed to get service %s: %v, retrying...", controller.LoadBalancerServiceName(ic), err)
return false, nil
}
if actualValue, ok := service.Annotations[annotationKey]; !ok {
t.Logf("load balancer has no %q annotation yet: %v, retrying...", annotationKey, service.Annotations)
return false, nil
} else if actualValue != expectedValue {
t.Logf("expected %s, found %s", expectedValue, actualValue)
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("timed out waiting for the %s annotation to be updated: %v", annotationKey, err)
}
}

// assertServiceNotDeleted asserts that a provide service wasn't deleted.
func assertServiceNotDeleted(t *testing.T, serviceName types.NamespacedName, oldUid types.UID) {
t.Helper()
Expand Down
27 changes: 27 additions & 0 deletions test/e2e/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
configv1 "github.com/openshift/api/config/v1"
operatorv1 "github.com/openshift/api/operator/v1"
routev1 "github.com/openshift/api/route/v1"
configclientset "github.com/openshift/client-go/config/clientset/versioned"
"github.com/openshift/cluster-ingress-operator/pkg/operator/controller"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -595,6 +596,32 @@ func updateInfrastructureConfigSpecWithRetryOnConflict(t *testing.T, name types.
})
}

// updateInfrastructureStatus updates the Infrastructure status by applying
// the given update function to the current Infrastructure object.
// If there is a conflict error on update then the complete operation
// is retried until timeout is reached.
func updateInfrastructureConfigStatusWithRetryOnConflict(t *testing.T, timeout time.Duration, configClient *configclientset.Clientset, updateFunc func(*configv1.Infrastructure) *configv1.Infrastructure) error {
return wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
infra, err := configClient.ConfigV1().Infrastructures().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
t.Logf("error getting 'cluster' infrastructure config: %v, retrying...", err)
return false, nil
}

// Apply the update function to the Infrastructure object.
updatedInfra := updateFunc(infra.DeepCopy())

if _, err := configClient.ConfigV1().Infrastructures().UpdateStatus(ctx, updatedInfra, metav1.UpdateOptions{}); err != nil {
if errors.IsConflict(err) {
t.Logf("conflict when updating 'cluster' infrastructure config: %v, retrying...", err)
return false, nil
}
return false, err
}
return true, nil
})
}

// verifyExternalIngressController verifies connectivity between the router
// and a test workload by making a http call using the hostname passed to it.
// This hostname must be the domain associated with the ingresscontroller under test.
Expand Down

0 comments on commit 19eced8

Please sign in to comment.