Skip to content

Commit

Permalink
chore use lister instead of client full list (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey authored Aug 15, 2024
1 parent 7b4a686 commit ea9bfe5
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 60 deletions.
8 changes: 5 additions & 3 deletions pkg/plugins/kprobe/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func NewController() Controller {
}

func (c *Controller) Start(ch chan *metric.Metric) {
go c.watchKprobe(ch)
if err := c.watchKprobe(ch); err != nil {
log.Panic(err)
}
}

func (c *Controller) watchKprobe(ch chan *metric.Metric) error {
Expand All @@ -62,10 +64,10 @@ func (c *Controller) GetSysctlStat(pid uint32) (kprobesysctl.SysctlStat, error)
return c.sysctlController.GetSysctlStatByPID(pid)
}

func (c *Controller) GetPodByUID(podUID string) (corev1.Pod, error) {
func (c *Controller) GetPodByUID(podUID string) (*corev1.Pod, error) {
return c.sysctlController.GetPodByUID(podUID)
}

func (c *Controller) GetService(ip string) (corev1.Service, error) {
func (c *Controller) GetService(ip string) (*corev1.Service, error) {
return c.sysctlController.GetService(ip)
}
8 changes: 4 additions & 4 deletions pkg/plugins/kprobe/kprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

type Interface interface {
GetSysctlStat(pid uint32) (kprobesysctl.SysctlStat, error)
GetPodByUID(podUID string) (corev1.Pod, error)
GetService(ip string) (corev1.Service, error)
GetPodByUID(podUID string) (*corev1.Pod, error)
GetService(ip string) (*corev1.Service, error)
RegisterNetLinkListener() <-chan NeighLinkEvent
GetVethes() ([]NeighLink, error)
}
Expand Down Expand Up @@ -85,11 +85,11 @@ func (p *provider) GetSysctlStat(pid uint32) (kprobesysctl.SysctlStat, error) {
return p.kprobeController.GetSysctlStat(pid)
}

func (p *provider) GetPodByUID(podUID string) (corev1.Pod, error) {
func (p *provider) GetPodByUID(podUID string) (*corev1.Pod, error) {
return p.kprobeController.GetPodByUID(podUID)
}

func (p *provider) GetService(ip string) (corev1.Service, error) {
func (p *provider) GetService(ip string) (*corev1.Service, error) {
return p.kprobeController.GetService(ip)
}

Expand Down
28 changes: 11 additions & 17 deletions pkg/plugins/kprobe/kprobesysctl/criruntime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kprobesysctl

import (
"bufio"
"context"
"fmt"
"os"
"path"
Expand All @@ -13,7 +12,7 @@ import (

"github.com/prometheus/procfs"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog"
)

Expand Down Expand Up @@ -156,51 +155,46 @@ func (k *KprobeSysctlController) refreshProcCgroupInfo() error {
}

func (k *KprobeSysctlController) refreshPodInfo() error {
pods, err := k.clientSet.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
pods, err := k.podLister.List(labels.Everything())
if err != nil {
return err
}

for i := range pods.Items {
for i := range pods {
// ignore evicted pods
if pods.Items[i].Status.Reason == "Evicted" {
if pods[i].Status.Reason == "Evicted" {
continue
}
k.podCache.Set(string(pods.Items[i].UID), pods.Items[i], 30*time.Minute)
k.podCache.Set(pods.Items[i].Status.PodIP, pods.Items[i], 30*time.Minute)
k.podCache.Set(string(pods[i].UID), pods[i], 30*time.Minute)
k.podCache.Set(pods[i].Status.PodIP, pods[i], 30*time.Minute)
}
return nil
}

func (k *KprobeSysctlController) refreshServiceInfo(s *corev1.Service) error {
refreshFunc := func(svc corev1.Service) {
refreshFunc := func(svc *corev1.Service) {
// ignore not ClusterIP and headless services
if (svc.Spec.Type != corev1.ServiceTypeClusterIP) || (svc.Spec.ClusterIP == corev1.ClusterIPNone) {
return
}

if svc.Spec.ClusterIP == "10.17.48.182" {
fmt.Println(fmt.Sprintf("0000000000xxxxxxxxxxxxx, ip: %s, svc: %s/%s", svc.Spec.ClusterIP,
svc.Namespace, svc.Name))
}
k.serviceCache.Set(svc.Spec.ClusterIP, svc, time.Hour)
}

// load all namespace.
if s == nil {
services, err := k.clientSet.CoreV1().Services(metav1.NamespaceAll).
List(context.Background(), metav1.ListOptions{})
services, err := k.serviceLister.List(labels.Everything())
if err != nil {
return err
}

for _, i := range services.Items {
refreshFunc(i)
for i := range services {
refreshFunc(services[i])
}
return nil
}

// load specific service.
refreshFunc(*s)
refreshFunc(s)
return nil
}
69 changes: 38 additions & 31 deletions pkg/plugins/kprobe/kprobesysctl/ebpf_kprobe_sysctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ import (
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/rlimit"
"github.com/erda-project/ebpf-agent/metric"
"github.com/erda-project/ebpf-agent/pkg/btfs"
"github.com/erda-project/ebpf-agent/pkg/envconf"
"github.com/erda-project/ebpf-agent/pkg/exporter/collector"
"github.com/patrickmn/go-cache"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/listers/core/v1"
clientgoCache "k8s.io/client-go/tools/cache"
"k8s.io/klog"

"github.com/erda-project/ebpf-agent/metric"
"github.com/erda-project/ebpf-agent/pkg/btfs"
"github.com/erda-project/ebpf-agent/pkg/envconf"
"github.com/erda-project/ebpf-agent/pkg/exporter/collector"
)

var (
Expand All @@ -32,13 +34,15 @@ var (
)

type KprobeSysctlController struct {
hostIP string
sysCtlCache *cache.Cache
podCache *cache.Cache
serviceCache *cache.Cache
clientSet *kubernetes.Clientset
reportClient *collector.ReportClient
objs bpfObjects
hostIP string
sysCtlCache *cache.Cache
podCache *cache.Cache
serviceCache *cache.Cache
clientSet *kubernetes.Clientset
podLister v1.PodLister
serviceLister v1.ServiceLister
reportClient *collector.ReportClient
objs bpfObjects
}

func New(clientSet *kubernetes.Clientset) *KprobeSysctlController {
Expand Down Expand Up @@ -73,43 +77,37 @@ func (k *KprobeSysctlController) GetSysctlStatByPID(pid uint32) (SysctlStat, err
return SysctlStat{}, fmt.Errorf("failed to find sysctl stat for pid: %d", pid)
}

func (k *KprobeSysctlController) GetPodByUID(uid string) (corev1.Pod, error) {
func (k *KprobeSysctlController) GetPodByUID(uid string) (*corev1.Pod, error) {
if pod, ok := k.podCache.Get(uid); ok {
return pod.(corev1.Pod), nil
return pod.(*corev1.Pod), nil
}
return corev1.Pod{}, fmt.Errorf("failed to find pod for uid: %s", uid)
return &corev1.Pod{}, fmt.Errorf("failed to find pod for uid: %s", uid)
}

func (k *KprobeSysctlController) GetService(ip string) (corev1.Service, error) {
func (k *KprobeSysctlController) GetService(ip string) (*corev1.Service, error) {
if svc, ok := k.serviceCache.Get(ip); ok {
return svc.(corev1.Service), nil
return svc.(*corev1.Service), nil
}
return corev1.Service{}, fmt.Errorf("failed to get service from cache, ip: %s", ip)
return &corev1.Service{}, fmt.Errorf("failed to get service from cache, ip: %s", ip)
}

func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error {
if err := k.refreshProcCgroupInfo(); err != nil {
return err
}
if err := k.refreshPodInfo(); err != nil {
return err
}
if err := k.refreshServiceInfo(nil); err != nil {
return err
}

factory := informers.NewSharedInformerFactory(k.clientSet, 0)
// pod informer
podInformerStopper := make(chan struct{})
stopper := make(chan struct{})
podInformer := factory.Core().V1().Pods().Informer()
podInformer.AddEventHandler(clientgoCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
newPod := obj.(*corev1.Pod)
if newPod.Status.Reason == "Evicted" {
return
}
k.podCache.Set(string(newPod.UID), *newPod, 30*time.Minute)
k.podCache.Set(newPod.Status.PodIP, *newPod, 30*time.Minute)
k.podCache.Set(string(newPod.UID), newPod, 30*time.Minute)
k.podCache.Set(newPod.Status.PodIP, newPod, 30*time.Minute)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
Expand All @@ -119,10 +117,8 @@ func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error {
// UpdateFunc: func(oldObj interface{}, newObj interface{}) {
// },
})
go podInformer.Run(podInformerStopper)

// service informer
serviceInformerStopper := make(chan struct{})
serviceInformer := factory.Core().V1().Services().Informer()
serviceInformer.AddEventHandler(clientgoCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -149,7 +145,18 @@ func (k *KprobeSysctlController) Start(ch chan<- *SysctlStat) error {
},
})

go serviceInformer.Run(serviceInformerStopper)
factory.Start(stopper)
factory.WaitForCacheSync(stopper)

k.podLister = factory.Core().V1().Pods().Lister()
k.serviceLister = factory.Core().V1().Services().Lister()

if err := k.refreshPodInfo(); err != nil {
return err
}
if err := k.refreshServiceInfo(nil); err != nil {
return err
}

// todo: add recover and context control
go func() {
Expand Down Expand Up @@ -270,7 +277,7 @@ func EncodeStat(stat *SysctlStat) []byte {
return w.Bytes()
}

func makeServiceNodeMetric(pod corev1.Pod) *metric.Metric {
func makeServiceNodeMetric(pod *corev1.Pod) *metric.Metric {
now := time.Now().Unix()
return &metric.Metric{
Measurement: "application_service_node",
Expand Down Expand Up @@ -314,7 +321,7 @@ func (k *KprobeSysctlController) updateServiceNode() error {
podItems := k.podCache.Items()
serviceNodes := make([]*metric.Metric, 0)
for _, item := range podItems {
pod, ok := item.Object.(corev1.Pod)
pod, ok := item.Object.(*corev1.Pod)
if !ok {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/memory/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *Controller) watchForOoms(ch chan *metric.Metric) error {
return nil
}

func (c *Controller) convertOomEvent2Metric(event *oomprocesser2.OOMEvent, pod v1.Pod, stat kprobesysctl.SysctlStat) metric.Metric {
func (c *Controller) convertOomEvent2Metric(event *oomprocesser2.OOMEvent, pod *v1.Pod, stat kprobesysctl.SysctlStat) metric.Metric {
var metric metric.Metric
metric.Measurement = "docker_container_summary"
metric.Name = "docker_container_summary"
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/netfilter/netfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (p *provider) Gather(c chan *metric.Metric) {
continue
}
srcIP, dstIP := net.IP(event.OriSrc[:4]), net.IP(event.OriDst[:4])
replySrcIP, replyDstIP := net.IP(event.Dst[:4]), net.IP(event.Src[:4])
klog.Infof("srcIP: %s, srcPort: %d, dstIP: %s, dstPort: %d, reply srcIP :%s, reply dstIP: %s", srcIP, event.OriSport, dstIP, event.OriDport, replySrcIP, replyDstIP)
_, replyDstIP := net.IP(event.Dst[:4]), net.IP(event.Src[:4])
//klog.Infof("srcIP: %s, srcPort: %d, dstIP: %s, dstPort: %d, reply srcIP :%s, reply dstIP: %s", srcIP, event.OriSport, dstIP, event.OriDport, replySrcIP, replyDstIP)
natInfo := NatInfo{
OriDstIP: dstIP.String(),
OriDstPort: event.OriDport,
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/protocols/http/meta/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (p *provider) Convert(m *ebpf.Metric) *metric.Metric {

// in cluster
switch t := target.(type) {
case corev1.Pod:
case *corev1.Pod:
p.l.Infof("source(pod): %s/%d, target(pod): %s/%s", m.SourceIP, m.SourcePort, t.Namespace, t.Name)
output.Tags["cluster_name"] = t.Labels["DICE_CLUSTER_NAME"]
output.Tags["db_host"] = fmt.Sprintf("%s:%d", m.DestIP, m.DestPort)
Expand All @@ -151,7 +151,7 @@ func (p *provider) Convert(m *ebpf.Metric) *metric.Metric {
output.Tags["target_service_name"] = t.Annotations["msp.erda.cloud/service_name"]
output.Tags["target_terminus_key"] = t.Annotations["msp.erda.cloud/terminus_key"]
output.Tags["target_workspace"] = t.Annotations["msp.erda.cloud/workspace"]
case corev1.Service:
case *corev1.Service:
// TODO: service resource
p.l.Infof("source(pod): %s/%d, target(service): %s/%s", m.SourceIP, m.SourcePort, t.Namespace, t.Name)
default:
Expand Down

0 comments on commit ea9bfe5

Please sign in to comment.