From e69e7c41ee15d8264807f91e52c086bc63b34910 Mon Sep 17 00:00:00 2001 From: David Festal Date: Wed, 29 Mar 2023 20:34:31 +0200 Subject: [PATCH 1/3] Replicating SyncTarget-related rbac objects Signed-off-by: David Festal --- .../replicateclusterrole_controller.go | 65 +++++++++++++++++++ .../replicateclusterrolebinding_controller.go | 50 ++++++++++++++ tmc/pkg/server/controllers.go | 56 ++++++++++++++++ tmc/pkg/server/server.go | 8 +++ 4 files changed, 179 insertions(+) create mode 100644 pkg/reconciler/workload/replicateclusterrole/replicateclusterrole_controller.go create mode 100644 pkg/reconciler/workload/replicateclusterrolebinding/replicateclusterrolebinding_controller.go diff --git a/pkg/reconciler/workload/replicateclusterrole/replicateclusterrole_controller.go b/pkg/reconciler/workload/replicateclusterrole/replicateclusterrole_controller.go new file mode 100644 index 00000000000..0ca83caa013 --- /dev/null +++ b/pkg/reconciler/workload/replicateclusterrole/replicateclusterrole_controller.go @@ -0,0 +1,65 @@ +/* +Copyright 2023 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicateclusterrole + +import ( + kcprbacinformers "github.com/kcp-dev/client-go/informers/rbac/v1" + kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" + "github.com/kcp-dev/logicalcluster/v3" + + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/kube-openapi/pkg/util/sets" + + "github.com/kcp-dev/kcp/pkg/reconciler/cache/labelclusterroles" + "github.com/kcp-dev/kcp/sdk/apis/workload" +) + +const ( + ControllerName = "kcp-workloads-replicate-clusterrole" +) + +// NewController returns a new controller for labelling ClusterRole that should be replicated. +func NewController( + kubeClusterClient kcpkubernetesclientset.ClusterInterface, + clusterRoleInformer kcprbacinformers.ClusterRoleClusterInformer, + clusterRoleBindingInformer kcprbacinformers.ClusterRoleBindingClusterInformer, +) labelclusterroles.Controller { + return labelclusterroles.NewController( + ControllerName, + workload.GroupName, + HasSyncRule, + func(clusterName logicalcluster.Name, crb *rbacv1.ClusterRoleBinding) bool { return false }, + kubeClusterClient, + clusterRoleInformer, + clusterRoleBindingInformer, + ) +} + +func HasSyncRule(clusterName logicalcluster.Name, cr *rbacv1.ClusterRole) bool { + for _, rule := range cr.Rules { + apiGroups := sets.NewString(rule.APIGroups...) + if !apiGroups.Has(workload.GroupName) && !apiGroups.Has("*") { + continue + } + resources := sets.NewString(rule.Resources...) + verbs := sets.NewString(rule.Verbs...) + if (resources.Has("synctargets") || resources.Has("*")) && (verbs.Has("sync") || verbs.Has("*")) { + return true + } + } + return false +} diff --git a/pkg/reconciler/workload/replicateclusterrolebinding/replicateclusterrolebinding_controller.go b/pkg/reconciler/workload/replicateclusterrolebinding/replicateclusterrolebinding_controller.go new file mode 100644 index 00000000000..4ce3942f1a9 --- /dev/null +++ b/pkg/reconciler/workload/replicateclusterrolebinding/replicateclusterrolebinding_controller.go @@ -0,0 +1,50 @@ +/* +Copyright 2023 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicateclusterrolebinding + +import ( + kcprbacinformers "github.com/kcp-dev/client-go/informers/rbac/v1" + kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" + "github.com/kcp-dev/logicalcluster/v3" + + rbacv1 "k8s.io/api/rbac/v1" + + "github.com/kcp-dev/kcp/pkg/reconciler/cache/labelclusterrolebindings" + "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrole" + "github.com/kcp-dev/kcp/sdk/apis/workload" +) + +const ( + ControllerName = "kcp-workloads-replicate-clusterrolebinding" +) + +// NewController returns a new controller for labelling ClusterRoleBinding that should be replicated. +func NewController( + kubeClusterClient kcpkubernetesclientset.ClusterInterface, + clusterRoleBindingInformer kcprbacinformers.ClusterRoleBindingClusterInformer, + clusterRoleInformer kcprbacinformers.ClusterRoleClusterInformer, +) labelclusterrolebindings.Controller { + return labelclusterrolebindings.NewController( + ControllerName, + workload.GroupName, + replicateclusterrole.HasSyncRule, + func(clusterName logicalcluster.Name, crb *rbacv1.ClusterRoleBinding) bool { return false }, + kubeClusterClient, + clusterRoleBindingInformer, + clusterRoleInformer, + ) +} diff --git a/tmc/pkg/server/controllers.go b/tmc/pkg/server/controllers.go index 1fa9965b197..3070e7ad230 100644 --- a/tmc/pkg/server/controllers.go +++ b/tmc/pkg/server/controllers.go @@ -37,6 +37,8 @@ import ( "github.com/kcp-dev/kcp/pkg/reconciler/workload/heartbeat" workloadnamespace "github.com/kcp-dev/kcp/pkg/reconciler/workload/namespace" workloadplacement "github.com/kcp-dev/kcp/pkg/reconciler/workload/placement" + workloadreplicateclusterrole "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrole" + workloadreplicateclusterrolebinding "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrolebinding" workloadresource "github.com/kcp-dev/kcp/pkg/reconciler/workload/resource" synctargetcontroller "github.com/kcp-dev/kcp/pkg/reconciler/workload/synctarget" "github.com/kcp-dev/kcp/pkg/reconciler/workload/synctargetexports" @@ -403,3 +405,57 @@ func (s *Server) installSyncTargetController(ctx context.Context, config *rest.C return nil }) } + +func (s *Server) installWorkloadReplicateClusterRoleControllers(ctx context.Context, config *rest.Config) error { + config = rest.CopyConfig(config) + config = rest.AddUserAgent(config, workloadreplicateclusterrole.ControllerName) + kubeClusterClient, err := kcpkubernetesclientset.NewForConfig(config) + if err != nil { + return err + } + + c := workloadreplicateclusterrole.NewController( + kubeClusterClient, + s.Core.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + s.Core.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + ) + + return s.Core.AddPostStartHook(postStartHookName(workloadreplicateclusterrole.ControllerName), func(hookContext genericapiserver.PostStartHookContext) error { + logger := klog.FromContext(ctx).WithValues("postStartHook", postStartHookName(workloadreplicateclusterrole.ControllerName)) + if err := s.Core.WaitForSync(hookContext.StopCh); err != nil { + logger.Error(err, "failed to finish post-start-hook") + return nil // don't klog.Fatal. This only happens when context is cancelled. + } + + go c.Start(goContext(hookContext), 2) + + return nil + }) +} + +func (s *Server) installWorkloadReplicateClusterRoleBindingControllers(ctx context.Context, config *rest.Config) error { + config = rest.CopyConfig(config) + config = rest.AddUserAgent(config, workloadreplicateclusterrolebinding.ControllerName) + kubeClusterClient, err := kcpkubernetesclientset.NewForConfig(config) + if err != nil { + return err + } + + c := workloadreplicateclusterrolebinding.NewController( + kubeClusterClient, + s.Core.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), + s.Core.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), + ) + + return s.Core.AddPostStartHook(postStartHookName(workloadreplicateclusterrolebinding.ControllerName), func(hookContext genericapiserver.PostStartHookContext) error { + logger := klog.FromContext(ctx).WithValues("postStartHook", postStartHookName(workloadreplicateclusterrolebinding.ControllerName)) + if err := s.Core.WaitForSync(hookContext.StopCh); err != nil { + logger.Error(err, "failed to finish post-start-hook") + return nil // don't klog.Fatal. This only happens when context is cancelled. + } + + go c.Start(goContext(hookContext), 2) + + return nil + }) +} diff --git a/tmc/pkg/server/server.go b/tmc/pkg/server/server.go index 6e8ed16c682..3a21c4fbc84 100644 --- a/tmc/pkg/server/server.go +++ b/tmc/pkg/server/server.go @@ -102,6 +102,14 @@ func (s *Server) Run(ctx context.Context) error { if err := s.installWorkloadsSyncTargetExportController(ctx, controllerConfig); err != nil { return err } + + if err := s.installWorkloadReplicateClusterRoleControllers(ctx, controllerConfig); err != nil { + return err + } + + if err := s.installWorkloadReplicateClusterRoleBindingControllers(ctx, controllerConfig); err != nil { + return err + } } if s.Options.Core.Controllers.EnableAll || enabled.Has("resource-scheduler") { From 5e15fe7633c4798747ffaaf2c930d56110a6c1b5 Mon Sep 17 00:00:00 2001 From: David Festal Date: Wed, 29 Mar 2023 20:34:59 +0200 Subject: [PATCH 2/3] Replication: Add e2e tests for workloads-related rbac objects Signed-off-by: David Festal --- test/e2e/reconciler/cache/replication_test.go | 194 +++++++++++++++++- 1 file changed, 193 insertions(+), 1 deletion(-) diff --git a/test/e2e/reconciler/cache/replication_test.go b/test/e2e/reconciler/cache/replication_test.go index 98722f284d1..046ef871259 100644 --- a/test/e2e/reconciler/cache/replication_test.go +++ b/test/e2e/reconciler/cache/replication_test.go @@ -30,6 +30,7 @@ import ( "github.com/kcp-dev/logicalcluster/v3" "github.com/stretchr/testify/require" + rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -64,6 +65,10 @@ var scenarios = []testScenario{ {"TestReplicateAPIResourceSchemaNegative", replicateAPIResourceSchemaNegativeScenario}, {"TestReplicateWorkspaceType", replicateWorkspaceTypeScenario}, {"TestReplicateWorkspaceTypeNegative", replicateWorkspaceTypeNegativeScenario}, + {"TestReplicateWorkloadsClusterRole", replicateWorkloadsClusterRoleScenario}, + {"TestReplicateWorkloadsClusterRoleNegative", replicateWorkloadsClusterRoleNegativeScenario}, + {"TestReplicateWorkloadsClusterRoleBinding", replicateWorkloadsClusterRoleBindingScenario}, + {"TestReplicateWorkloadsClusterRoleBindingNegative", replicateWorkloadsClusterRoleBindingNegativeScenario}, } // disruptiveScenarios contains a list of scenarios that will be run in a private environment @@ -678,6 +683,15 @@ func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context. } unstructured.RemoveNestedField(originalResource.Object, "metadata", "resourceVersion") unstructured.RemoveNestedField(cachedResource.Object, "metadata", "resourceVersion") + + // TODO(davidfestal): find out why the generation is not equal, specially for rbacv1. + // Is it a characteristic of all built-in KCP resources (which are not backed by CRDs) ? + // Issue opened: https://github.com/kcp-dev/kcp/issues/2935 + if b.gvr.Group == rbacv1.SchemeGroupVersion.Group { + unstructured.RemoveNestedField(originalResource.Object, "metadata", "generation") + unstructured.RemoveNestedField(cachedResource.Object, "metadata", "generation") + } + unstructured.RemoveNestedField(cachedResource.Object, "metadata", "annotations", genericapirequest.AnnotationKey) if cachedStatus, ok := cachedResource.Object["status"]; ok && cachedStatus == nil || (cachedStatus != nil && len(cachedStatus.(map[string]interface{})) == 0) { // TODO: worth investigating: @@ -685,7 +699,7 @@ func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context. unstructured.RemoveNestedField(cachedResource.Object, "status") } if diff := cmp.Diff(cachedResource.Object, originalResource.Object); len(diff) > 0 { - return false, fmt.Sprintf("replicated %s root|%s/%s is different from the original", b.gvr, cluster, cachedResourceMeta.GetName()) + return false, fmt.Sprintf("replicated %s root|%s/%s is different from the original: %s", b.gvr, cluster, cachedResourceMeta.GetName(), diff) } return true, "" }, wait.ForeverTestTimeout, 100*time.Millisecond) @@ -732,3 +746,181 @@ func createCacheClientConfigForEnvironment(ctx context.Context, t *testing.T, kc require.NoError(t, err) return cacheServerRestConfig } + +// replicateWorkloadsClusterRoleScenario tests if a ClusterRole related to workloads API is propagated to the cache server. +// The test exercises creation, modification and removal of the ClusterRole object. +func replicateWorkloadsClusterRoleScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterDynamicClient kcpdynamic.ClusterInterface, cacheKcpClusterDynamicClient kcpdynamic.ClusterInterface) { + t.Helper() + replicateResource(ctx, + t, + server, + kcpShardClusterDynamicClient, + cacheKcpClusterDynamicClient, + "", + "ClusterRole", + rbacv1.SchemeGroupVersion.WithResource("clusterroles"), + &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: withPseudoRandomSuffix("syncer"), + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"sync"}, + APIGroups: []string{"workload.kcp.io"}, + Resources: []string{"synctargets"}, + ResourceNames: []string{"asynctarget"}, + }, + }, + }, + nil, + ) +} + +// replicateWorkloadsClusterRoleNegativeScenario checks if modified or even deleted cached ClusterRole (related to workloads API) will be reconciled to match the original object. +func replicateWorkloadsClusterRoleNegativeScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterDynamicClient kcpdynamic.ClusterInterface, cacheKcpClusterDynamicClient kcpdynamic.ClusterInterface) { + t.Helper() + replicateResourceNegative( + ctx, + t, + server, + kcpShardClusterDynamicClient, + cacheKcpClusterDynamicClient, + "", + "ClusterRole", + rbacv1.SchemeGroupVersion.WithResource("clusterroles"), + &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: withPseudoRandomSuffix("syncer"), + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"sync"}, + APIGroups: []string{"workload.kcp.io"}, + Resources: []string{"synctargets"}, + ResourceNames: []string{"asynctarget"}, + }, + }, + }, + nil, + ) +} + +// replicateWorkloadsClusterRoleBindingScenario tests if a ClusterRoleBinding related to workloads API is propagated to the cache server. +// The test exercises creation, modification and removal of the ClusterRoleBinding object. +func replicateWorkloadsClusterRoleBindingScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterDynamicClient kcpdynamic.ClusterInterface, cacheKcpClusterDynamicClient kcpdynamic.ClusterInterface) { + t.Helper() + + clusterRole := &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: withPseudoRandomSuffix("syncer"), + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"sync"}, + APIGroups: []string{"workload.kcp.io"}, + Resources: []string{"synctargets"}, + ResourceNames: []string{"asynctarget"}, + }, + }, + } + + orgPath, _ := framework.NewOrganizationFixture(t, server) + _, ws := framework.NewWorkspaceFixture(t, server, orgPath, framework.WithRootShard()) + clusterName := logicalcluster.Name(ws.Spec.Cluster) + + t.Logf("Create additional ClusterRole %s on the root shard for replication", clusterRole.Name) + clusterRoleGVR := rbacv1.SchemeGroupVersion.WithResource("clusterroles") + clusterRoleUnstr, err := toUnstructured(clusterRole, "ClusterRole", clusterRoleGVR) + require.NoError(t, err) + _, err = kcpShardClusterDynamicClient.Resource(clusterRoleGVR).Cluster(clusterName.Path()).Create(ctx, clusterRoleUnstr, metav1.CreateOptions{}) + require.NoError(t, err) + + replicateResource(ctx, + t, + server, + kcpShardClusterDynamicClient, + cacheKcpClusterDynamicClient, + clusterName, + "ClusterRoleBinding", + rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), + &rbacv1.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: withPseudoRandomSuffix("syncer"), + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.SchemeGroupVersion.Group, + Kind: "ClusterRole", + Name: clusterRole.Name, + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + APIGroup: "", + Name: "kcp-syncer-0000", + Namespace: "kcp-syncer-namespace", + }, + }, + }, + nil, + ) +} + +// replicateWorkloadsClusterRoleNegativeScenario checks if modified or even deleted cached ClusterRole (related to workloads API) will be reconciled to match the original object. +func replicateWorkloadsClusterRoleBindingNegativeScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterDynamicClient kcpdynamic.ClusterInterface, cacheKcpClusterDynamicClient kcpdynamic.ClusterInterface) { + t.Helper() + + clusterRole := &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: withPseudoRandomSuffix("syncer"), + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"sync"}, + APIGroups: []string{"workload.kcp.io"}, + Resources: []string{"synctargets"}, + ResourceNames: []string{"asynctarget"}, + }, + }, + } + + orgPath, _ := framework.NewOrganizationFixture(t, server) + _, ws := framework.NewWorkspaceFixture(t, server, orgPath, framework.WithRootShard()) + clusterName := logicalcluster.Name(ws.Spec.Cluster) + + t.Logf("Create additional ClusterRole %s on the root shard for replication", clusterRole.Name) + clusterRoleGVR := rbacv1.SchemeGroupVersion.WithResource("clusterroles") + clusterRoleUnstr, err := toUnstructured(clusterRole, "ClusterRole", clusterRoleGVR) + require.NoError(t, err) + _, err = kcpShardClusterDynamicClient.Resource(clusterRoleGVR).Cluster(clusterName.Path()).Create(ctx, clusterRoleUnstr, metav1.CreateOptions{}) + require.NoError(t, err) + + replicateResourceNegative( + ctx, + t, + server, + kcpShardClusterDynamicClient, + cacheKcpClusterDynamicClient, + clusterName, + "ClusterRoleBinding", + rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), + &rbacv1.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: withPseudoRandomSuffix("syncer"), + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.SchemeGroupVersion.Group, + Kind: "ClusterRole", + Name: clusterRole.Name, + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + APIGroup: "", + Name: "kcp-syncer-0000", + Namespace: "kcp-syncer-namespace", + }, + }, + }, + nil, + ) +} From 63e6eb5271c4a5a3f2592b3e71b7fb58a5a7c9e7 Mon Sep 17 00:00:00 2001 From: David Festal Date: Mon, 3 Apr 2023 18:22:16 +0200 Subject: [PATCH 3/3] Replicate logicalclusters containing synctargets Signed-off-by: David Festal --- .../replicatelogicalcluster_controller.go | 105 ++++++++++++++++++ tmc/pkg/server/controllers.go | 28 +++++ tmc/pkg/server/server.go | 4 + 3 files changed, 137 insertions(+) create mode 100644 pkg/reconciler/workload/replicatelogicalcluster/replicatelogicalcluster_controller.go diff --git a/pkg/reconciler/workload/replicatelogicalcluster/replicatelogicalcluster_controller.go b/pkg/reconciler/workload/replicatelogicalcluster/replicatelogicalcluster_controller.go new file mode 100644 index 00000000000..afce8e886b9 --- /dev/null +++ b/pkg/reconciler/workload/replicatelogicalcluster/replicatelogicalcluster_controller.go @@ -0,0 +1,105 @@ +/* +Copyright 2023 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicatelogicalcluster + +import ( + "fmt" + + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + + "github.com/kcp-dev/kcp/pkg/reconciler/cache/labellogicalcluster" + "github.com/kcp-dev/kcp/pkg/reconciler/cache/replication" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/workload" + workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" + corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" + workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1" +) + +const ( + ControllerName = "kcp-workload-replicate-logicalcluster" +) + +// NewController returns a new controller for labelling LogicalClusters that should be replicated. + +func NewController( + kcpClusterClient kcpclientset.ClusterInterface, + logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, + syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, +) labellogicalcluster.Controller { + logicalClusterLister := logicalClusterInformer.Lister() + syncTargetIndexer := syncTargetInformer.Informer().GetIndexer() + + c := labellogicalcluster.NewController( + ControllerName, + workload.GroupName, + func(cluster *corev1alpha1.LogicalCluster) bool { + // If there are any SyncTargets for this logical cluster, then the LogicalCluster object should be replicated. + keys, err := syncTargetIndexer.IndexKeys(kcpcache.ClusterIndexName, kcpcache.ClusterIndexKey(logicalcluster.From(cluster))) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to list SyncTargets: %v", err)) + return false + } + return len(keys) > 0 + }, + kcpClusterClient, + logicalClusterInformer, + ) + + // enqueue the logical cluster every time the APIExport changes + enqueueSyncTarget := func(obj interface{}) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + + syncTarget, ok := obj.(*workloadv1alpha1.SyncTarget) + if !ok { + runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + + cluster, err := logicalClusterLister.Cluster(logicalcluster.From(syncTarget)).Get(corev1alpha1.LogicalClusterName) + if err != nil && !apierrors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("failed to get logical cluster: %v", err)) + return + } else if apierrors.IsNotFound(err) { + return + } + + c.EnqueueLogicalCluster(cluster, "reason", "SyncTarget changed", "synctarget", syncTarget.Name) + } + + syncTargetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: replication.IsNoSystemClusterName, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + enqueueSyncTarget(obj) + }, + DeleteFunc: func(obj interface{}) { + enqueueSyncTarget(obj) + }, + }, + }) + + return c +} diff --git a/tmc/pkg/server/controllers.go b/tmc/pkg/server/controllers.go index 3070e7ad230..0bc94583cee 100644 --- a/tmc/pkg/server/controllers.go +++ b/tmc/pkg/server/controllers.go @@ -39,6 +39,7 @@ import ( workloadplacement "github.com/kcp-dev/kcp/pkg/reconciler/workload/placement" workloadreplicateclusterrole "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrole" workloadreplicateclusterrolebinding "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicateclusterrolebinding" + workloadreplicatelogicalcluster "github.com/kcp-dev/kcp/pkg/reconciler/workload/replicatelogicalcluster" workloadresource "github.com/kcp-dev/kcp/pkg/reconciler/workload/resource" synctargetcontroller "github.com/kcp-dev/kcp/pkg/reconciler/workload/synctarget" "github.com/kcp-dev/kcp/pkg/reconciler/workload/synctargetexports" @@ -459,3 +460,30 @@ func (s *Server) installWorkloadReplicateClusterRoleBindingControllers(ctx conte return nil }) } + +func (s *Server) installWorkloadReplicateLogicalClusterControllers(ctx context.Context, config *rest.Config) error { + config = rest.CopyConfig(config) + config = rest.AddUserAgent(config, workloadreplicatelogicalcluster.ControllerName) + kcpClusterClient, err := kcpclientset.NewForConfig(config) + if err != nil { + return err + } + + c := workloadreplicatelogicalcluster.NewController( + kcpClusterClient, + s.Core.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), + ) + + return s.Core.AddPostStartHook(postStartHookName(workloadreplicatelogicalcluster.ControllerName), func(hookContext genericapiserver.PostStartHookContext) error { + logger := klog.FromContext(ctx).WithValues("postStartHook", postStartHookName(workloadreplicatelogicalcluster.ControllerName)) + if err := s.Core.WaitForSync(hookContext.StopCh); err != nil { + logger.Error(err, "failed to finish post-start-hook") + return nil // don't klog.Fatal. This only happens when context is cancelled. + } + + go c.Start(goContext(hookContext), 2) + + return nil + }) +} diff --git a/tmc/pkg/server/server.go b/tmc/pkg/server/server.go index 3a21c4fbc84..6b7b249eefd 100644 --- a/tmc/pkg/server/server.go +++ b/tmc/pkg/server/server.go @@ -110,6 +110,10 @@ func (s *Server) Run(ctx context.Context) error { if err := s.installWorkloadReplicateClusterRoleBindingControllers(ctx, controllerConfig); err != nil { return err } + + if err := s.installWorkloadReplicateLogicalClusterControllers(ctx, controllerConfig); err != nil { + return err + } } if s.Options.Core.Controllers.EnableAll || enabled.Has("resource-scheduler") {