From f7db3eb49269656c2afda2119fa3b71ea22ce231 Mon Sep 17 00:00:00 2001 From: Helene Durand Date: Thu, 5 Oct 2023 08:49:26 +0200 Subject: [PATCH] BUG/MINOR: fix k8s resync for all objects except endpoints, endpointslices. Endpoint and endpointslices fix is done in a previous commit. With informers, resync are sent as Update, not Create. We were doing nothing on an Update on a resync. If an event was skipped for any reason, we were never resyncing. --- pkg/annotations/service/maxconn.go | 4 +- pkg/k8s/informers.go | 68 ++++++++---------------------- pkg/store/events.go | 21 ++++++--- pkg/store/store.go | 3 +- pkg/store/types.go | 3 +- 5 files changed, 39 insertions(+), 60 deletions(-) diff --git a/pkg/annotations/service/maxconn.go b/pkg/annotations/service/maxconn.go index f84991f4..917dbdea 100644 --- a/pkg/annotations/service/maxconn.go +++ b/pkg/annotations/service/maxconn.go @@ -35,8 +35,8 @@ func (a *Maxconn) Process(k store.K8s, annotations ...map[string]string) error { return err } // adjust backend maxconn when using multiple HAProxy Instances - if k.NbrHAProxyInst != 0 { - v /= k.NbrHAProxyInst + if len(k.HaProxyPods) != 0 { + v /= int64(len(k.HaProxyPods)) } if a.backend.DefaultServer == nil { a.backend.DefaultServer = &models.DefaultServer{} diff --git a/pkg/k8s/informers.go b/pkg/k8s/informers.go index 61dfdd6a..b87bc570 100644 --- a/pkg/k8s/informers.go +++ b/pkg/k8s/informers.go @@ -78,7 +78,7 @@ func (k k8s) getNamespaceInfomer(eventChan chan SyncDataEvent, factory informers eventChan <- SyncDataEvent{SyncType: NAMESPACE, Namespace: item.Name, Data: item} }, UpdateFunc: func(oldObj, newObj interface{}) { - data1, ok := oldObj.(*corev1.Namespace) + _, ok := oldObj.(*corev1.Namespace) if !ok { logger.Errorf("%s: Invalid data from k8s api, %s", NAMESPACE, oldObj) return @@ -89,17 +89,11 @@ func (k k8s) getNamespaceInfomer(eventChan chan SyncDataEvent, factory informers return } status := store.MODIFIED - item1 := &store.Namespace{ - Name: data1.GetName(), - Status: status, - } + item2 := &store.Namespace{ Name: data2.GetName(), Status: status, } - if item1.Name == item2.Name { - return - } logger.Tracef("[RUNTIME] [K8s] %s %s: %s", NAMESPACE, item2.Status, item2.Name) eventChan <- SyncDataEvent{SyncType: NAMESPACE, Namespace: item2.Name, Data: item2} }, @@ -201,23 +195,6 @@ func (k k8s) getServiceInformer(eventChan chan SyncDataEvent, factory informers. } status := store.MODIFIED - item1 := &store.Service{ - Namespace: data1.GetNamespace(), - Name: data1.GetName(), - Annotations: store.CopyAnnotations(data1.ObjectMeta.Annotations), - Ports: []store.ServicePort{}, - Status: status, - } - if data1.Spec.Type == corev1.ServiceTypeExternalName { - item1.DNS = data1.Spec.ExternalName - } - for _, sp := range data1.Spec.Ports { - item1.Ports = append(item1.Ports, store.ServicePort{ - Name: sp.Name, - Protocol: string(sp.Protocol), - Port: int64(sp.Port), - }) - } item2 := &store.Service{ Namespace: data2.GetNamespace(), @@ -236,9 +213,7 @@ func (k k8s) getServiceInformer(eventChan chan SyncDataEvent, factory informers. Port: int64(sp.Port), }) } - if item2.Equal(item1) { - return - } + logger.Tracef("[RUNTIME] [K8s] %s %s: %s", SERVICE, item2.Status, item2.Name) eventChan <- SyncDataEvent{SyncType: SERVICE, Namespace: item2.Namespace, Data: item2} @@ -293,7 +268,7 @@ func (k k8s) getSecretInformer(eventChan chan SyncDataEvent, factory informers.S eventChan <- SyncDataEvent{SyncType: SECRET, Namespace: item.Namespace, Data: item} }, UpdateFunc: func(oldObj, newObj interface{}) { - data1, ok := oldObj.(*corev1.Secret) + _, ok := oldObj.(*corev1.Secret) if !ok { logger.Errorf("%s: Invalid data from k8s api, %s", SECRET, oldObj) return @@ -304,21 +279,14 @@ func (k k8s) getSecretInformer(eventChan chan SyncDataEvent, factory informers.S return } status := store.MODIFIED - item1 := &store.Secret{ - Namespace: data1.GetNamespace(), - Name: data1.GetName(), - Data: data1.Data, - Status: status, - } + item2 := &store.Secret{ Namespace: data2.GetNamespace(), Name: data2.GetName(), Data: data2.Data, Status: status, } - if item2.Equal(item1) { - return - } + logger.Tracef("[RUNTIME] [K8s] %s %s: %s", SECRET, item2.Status, item2.Name) eventChan <- SyncDataEvent{SyncType: SECRET, Namespace: item2.Namespace, Data: item2} }, @@ -368,7 +336,7 @@ func (k k8s) getConfigMapInformer(eventChan chan SyncDataEvent, factory informer eventChan <- SyncDataEvent{SyncType: CONFIGMAP, Namespace: item.Namespace, Data: item} }, UpdateFunc: func(oldObj, newObj interface{}) { - data1, ok := oldObj.(*corev1.ConfigMap) + _, ok := oldObj.(*corev1.ConfigMap) if !ok { logger.Errorf("%s: Invalid data from k8s api, %s", CONFIGMAP, oldObj) return @@ -379,21 +347,13 @@ func (k k8s) getConfigMapInformer(eventChan chan SyncDataEvent, factory informer return } status := store.MODIFIED - item1 := &store.ConfigMap{ - Namespace: data1.GetNamespace(), - Name: data1.GetName(), - Annotations: store.CopyAnnotations(data1.Data), - Status: status, - } item2 := &store.ConfigMap{ Namespace: data2.GetNamespace(), Name: data2.GetName(), Annotations: store.CopyAnnotations(data2.Data), Status: status, } - if item2.Equal(item1) { - return - } + logger.Tracef("[RUNTIME] [K8s] %s %s: %s", CONFIGMAP, item2.Status, item2.Name) eventChan <- SyncDataEvent{SyncType: CONFIGMAP, Namespace: item2.Namespace, Data: item2} }, @@ -513,7 +473,7 @@ func (k *k8s) getPodInformer(namespace, podPrefix string, resyncPeriod time.Dura if prefix != podPrefix { return } - eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Created: true}} + eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Status: store.ADDED, Name: meta.Name}} }, DeleteFunc: func(obj interface{}) { meta := obj.(*corev1.Pod).ObjectMeta //nolint:forcetypeassert @@ -521,7 +481,15 @@ func (k *k8s) getPodInformer(namespace, podPrefix string, resyncPeriod time.Dura if prefix != podPrefix { return } - eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{}} + eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Status: store.DELETED, Name: meta.Name}} + }, + UpdateFunc: func(oldObj, newObj interface{}) { + meta := newObj.(*corev1.Pod).ObjectMeta //nolint:forcetypeassert + prefix, _ = utils.GetPodPrefix(meta.Name) + if prefix != podPrefix { + return + } + eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Status: store.MODIFIED, Name: meta.Name}} }, }, ) diff --git a/pkg/store/events.go b/pkg/store/events.go index 32bf32a0..4e3ba442 100644 --- a/pkg/store/events.go +++ b/pkg/store/events.go @@ -22,9 +22,8 @@ import ( ) func (k *K8s) EventNamespace(ns *Namespace, data *Namespace) (updateRequired bool) { - updateRequired = false switch data.Status { - case ADDED: + case ADDED, MODIFIED: _ = k.GetNamespace(data.Name) case DELETED: _, ok := k.Namespaces[data.Name] @@ -193,6 +192,9 @@ func (k *K8s) EventConfigMap(ns *Namespace, data *ConfigMap) (updateRequired boo updateRequired = true logger.Debugf("configmap '%s/%s' processed", cm.Namespace, cm.Name) case MODIFIED: + if cm.Equal(data) { + return false + } *cm = *data updateRequired = true logger.Infof("configmap '%s/%s' updated", cm.Namespace, cm.Name) @@ -247,10 +249,17 @@ func (k *K8s) EventSecret(ns *Namespace, data *Secret) (updateRequired bool) { } func (k *K8s) EventPod(podEvent PodEvent) (updateRequired bool) { - if podEvent.Created { - k.NbrHAProxyInst++ - } else { - k.NbrHAProxyInst-- + switch podEvent.Status { + case ADDED, MODIFIED: + if _, ok := k.HaProxyPods[podEvent.Name]; ok { + return false + } + k.HaProxyPods[podEvent.Name] = struct{}{} + case DELETED: + if _, ok := k.HaProxyPods[podEvent.Name]; !ok { + return false + } + delete(k.HaProxyPods, podEvent.Name) } return true diff --git a/pkg/store/store.go b/pkg/store/store.go index 07968a53..cae9471c 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -24,7 +24,7 @@ import ( const DefaultLocalBackend = "default-local-service" type K8s struct { - NbrHAProxyInst int64 + HaProxyPods map[string]struct{} Namespaces map[string]*Namespace IngressClasses map[string]*IngressClass NamespacesAccess NamespacesWatch @@ -72,6 +72,7 @@ func NewK8sStore(args utils.OSArgs) K8s { }, SecretsProcessed: map[string]struct{}{}, BackendProcessed: map[string]struct{}{}, + HaProxyPods: map[string]struct{}{}, } for _, namespace := range args.NamespaceWhitelist { store.NamespacesAccess.Whitelist[namespace] = struct{}{} diff --git a/pkg/store/types.go b/pkg/store/types.go index 3c99f2cd..a6444b10 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -57,7 +57,8 @@ type Endpoints struct { // PodEvent carries creation/deletion pod event. type PodEvent struct { - Created bool + Status Status + Name string } // Service is useful data from k8s structures about service