Skip to content

Commit

Permalink
BUG/MINOR: fix k8s resync for all objects except endpoints, endpoints…
Browse files Browse the repository at this point in the history
…lices.

Endpoint and endpointslices fix is done in a previous commit.
With informers, resync are sent as Update, not Create. We were doing nothing on an Update on a resync.
If an event was skipped for any reason, we were never resyncing.
  • Loading branch information
hdurand0710 committed Nov 3, 2023
1 parent 5862249 commit f7db3eb
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 60 deletions.
4 changes: 2 additions & 2 deletions pkg/annotations/service/maxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (a *Maxconn) Process(k store.K8s, annotations ...map[string]string) error {
return err
}
// adjust backend maxconn when using multiple HAProxy Instances
if k.NbrHAProxyInst != 0 {
v /= k.NbrHAProxyInst
if len(k.HaProxyPods) != 0 {
v /= int64(len(k.HaProxyPods))
}
if a.backend.DefaultServer == nil {
a.backend.DefaultServer = &models.DefaultServer{}
Expand Down
68 changes: 18 additions & 50 deletions pkg/k8s/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (k k8s) getNamespaceInfomer(eventChan chan SyncDataEvent, factory informers
eventChan <- SyncDataEvent{SyncType: NAMESPACE, Namespace: item.Name, Data: item}
},
UpdateFunc: func(oldObj, newObj interface{}) {
data1, ok := oldObj.(*corev1.Namespace)
_, ok := oldObj.(*corev1.Namespace)
if !ok {
logger.Errorf("%s: Invalid data from k8s api, %s", NAMESPACE, oldObj)
return
Expand All @@ -89,17 +89,11 @@ func (k k8s) getNamespaceInfomer(eventChan chan SyncDataEvent, factory informers
return
}
status := store.MODIFIED
item1 := &store.Namespace{
Name: data1.GetName(),
Status: status,
}

item2 := &store.Namespace{
Name: data2.GetName(),
Status: status,
}
if item1.Name == item2.Name {
return
}
logger.Tracef("[RUNTIME] [K8s] %s %s: %s", NAMESPACE, item2.Status, item2.Name)
eventChan <- SyncDataEvent{SyncType: NAMESPACE, Namespace: item2.Name, Data: item2}
},
Expand Down Expand Up @@ -201,23 +195,6 @@ func (k k8s) getServiceInformer(eventChan chan SyncDataEvent, factory informers.
}

status := store.MODIFIED
item1 := &store.Service{
Namespace: data1.GetNamespace(),
Name: data1.GetName(),
Annotations: store.CopyAnnotations(data1.ObjectMeta.Annotations),
Ports: []store.ServicePort{},
Status: status,
}
if data1.Spec.Type == corev1.ServiceTypeExternalName {
item1.DNS = data1.Spec.ExternalName
}
for _, sp := range data1.Spec.Ports {
item1.Ports = append(item1.Ports, store.ServicePort{
Name: sp.Name,
Protocol: string(sp.Protocol),
Port: int64(sp.Port),
})
}

item2 := &store.Service{
Namespace: data2.GetNamespace(),
Expand All @@ -236,9 +213,7 @@ func (k k8s) getServiceInformer(eventChan chan SyncDataEvent, factory informers.
Port: int64(sp.Port),
})
}
if item2.Equal(item1) {
return
}

logger.Tracef("[RUNTIME] [K8s] %s %s: %s", SERVICE, item2.Status, item2.Name)
eventChan <- SyncDataEvent{SyncType: SERVICE, Namespace: item2.Namespace, Data: item2}

Expand Down Expand Up @@ -293,7 +268,7 @@ func (k k8s) getSecretInformer(eventChan chan SyncDataEvent, factory informers.S
eventChan <- SyncDataEvent{SyncType: SECRET, Namespace: item.Namespace, Data: item}
},
UpdateFunc: func(oldObj, newObj interface{}) {
data1, ok := oldObj.(*corev1.Secret)
_, ok := oldObj.(*corev1.Secret)
if !ok {
logger.Errorf("%s: Invalid data from k8s api, %s", SECRET, oldObj)
return
Expand All @@ -304,21 +279,14 @@ func (k k8s) getSecretInformer(eventChan chan SyncDataEvent, factory informers.S
return
}
status := store.MODIFIED
item1 := &store.Secret{
Namespace: data1.GetNamespace(),
Name: data1.GetName(),
Data: data1.Data,
Status: status,
}

item2 := &store.Secret{
Namespace: data2.GetNamespace(),
Name: data2.GetName(),
Data: data2.Data,
Status: status,
}
if item2.Equal(item1) {
return
}

logger.Tracef("[RUNTIME] [K8s] %s %s: %s", SECRET, item2.Status, item2.Name)
eventChan <- SyncDataEvent{SyncType: SECRET, Namespace: item2.Namespace, Data: item2}
},
Expand Down Expand Up @@ -368,7 +336,7 @@ func (k k8s) getConfigMapInformer(eventChan chan SyncDataEvent, factory informer
eventChan <- SyncDataEvent{SyncType: CONFIGMAP, Namespace: item.Namespace, Data: item}
},
UpdateFunc: func(oldObj, newObj interface{}) {
data1, ok := oldObj.(*corev1.ConfigMap)
_, ok := oldObj.(*corev1.ConfigMap)
if !ok {
logger.Errorf("%s: Invalid data from k8s api, %s", CONFIGMAP, oldObj)
return
Expand All @@ -379,21 +347,13 @@ func (k k8s) getConfigMapInformer(eventChan chan SyncDataEvent, factory informer
return
}
status := store.MODIFIED
item1 := &store.ConfigMap{
Namespace: data1.GetNamespace(),
Name: data1.GetName(),
Annotations: store.CopyAnnotations(data1.Data),
Status: status,
}
item2 := &store.ConfigMap{
Namespace: data2.GetNamespace(),
Name: data2.GetName(),
Annotations: store.CopyAnnotations(data2.Data),
Status: status,
}
if item2.Equal(item1) {
return
}

logger.Tracef("[RUNTIME] [K8s] %s %s: %s", CONFIGMAP, item2.Status, item2.Name)
eventChan <- SyncDataEvent{SyncType: CONFIGMAP, Namespace: item2.Namespace, Data: item2}
},
Expand Down Expand Up @@ -513,15 +473,23 @@ func (k *k8s) getPodInformer(namespace, podPrefix string, resyncPeriod time.Dura
if prefix != podPrefix {
return
}
eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Created: true}}
eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Status: store.ADDED, Name: meta.Name}}
},
DeleteFunc: func(obj interface{}) {
meta := obj.(*corev1.Pod).ObjectMeta //nolint:forcetypeassert
prefix, _ = utils.GetPodPrefix(meta.Name)
if prefix != podPrefix {
return
}
eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{}}
eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Status: store.DELETED, Name: meta.Name}}
},
UpdateFunc: func(oldObj, newObj interface{}) {
meta := newObj.(*corev1.Pod).ObjectMeta //nolint:forcetypeassert
prefix, _ = utils.GetPodPrefix(meta.Name)
if prefix != podPrefix {
return
}
eventChan <- SyncDataEvent{SyncType: POD, Namespace: meta.Namespace, Data: store.PodEvent{Status: store.MODIFIED, Name: meta.Name}}
},
},
)
Expand Down
21 changes: 15 additions & 6 deletions pkg/store/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import (
)

func (k *K8s) EventNamespace(ns *Namespace, data *Namespace) (updateRequired bool) {
updateRequired = false
switch data.Status {
case ADDED:
case ADDED, MODIFIED:
_ = k.GetNamespace(data.Name)
case DELETED:
_, ok := k.Namespaces[data.Name]
Expand Down Expand Up @@ -193,6 +192,9 @@ func (k *K8s) EventConfigMap(ns *Namespace, data *ConfigMap) (updateRequired boo
updateRequired = true
logger.Debugf("configmap '%s/%s' processed", cm.Namespace, cm.Name)
case MODIFIED:
if cm.Equal(data) {
return false
}
*cm = *data
updateRequired = true
logger.Infof("configmap '%s/%s' updated", cm.Namespace, cm.Name)
Expand Down Expand Up @@ -247,10 +249,17 @@ func (k *K8s) EventSecret(ns *Namespace, data *Secret) (updateRequired bool) {
}

func (k *K8s) EventPod(podEvent PodEvent) (updateRequired bool) {
if podEvent.Created {
k.NbrHAProxyInst++
} else {
k.NbrHAProxyInst--
switch podEvent.Status {
case ADDED, MODIFIED:
if _, ok := k.HaProxyPods[podEvent.Name]; ok {
return false
}
k.HaProxyPods[podEvent.Name] = struct{}{}
case DELETED:
if _, ok := k.HaProxyPods[podEvent.Name]; !ok {
return false
}
delete(k.HaProxyPods, podEvent.Name)
}

return true
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const DefaultLocalBackend = "default-local-service"

type K8s struct {
NbrHAProxyInst int64
HaProxyPods map[string]struct{}
Namespaces map[string]*Namespace
IngressClasses map[string]*IngressClass
NamespacesAccess NamespacesWatch
Expand Down Expand Up @@ -72,6 +72,7 @@ func NewK8sStore(args utils.OSArgs) K8s {
},
SecretsProcessed: map[string]struct{}{},
BackendProcessed: map[string]struct{}{},
HaProxyPods: map[string]struct{}{},
}
for _, namespace := range args.NamespaceWhitelist {
store.NamespacesAccess.Whitelist[namespace] = struct{}{}
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Endpoints struct {

// PodEvent carries creation/deletion pod event.
type PodEvent struct {
Created bool
Status Status
Name string
}

// Service is useful data from k8s structures about service
Expand Down

0 comments on commit f7db3eb

Please sign in to comment.