From f8d542015abc8f1c4b3e53f61440304b5f70fe9b Mon Sep 17 00:00:00 2001 From: sevennt Date: Thu, 27 Dec 2018 18:58:51 +0800 Subject: [PATCH] fix data racing in get services and pods data && add cache of pods data --- src/api/handler/istio.go | 2 +- src/api/handler/metrics.go | 2 +- src/api/service/kube.go | 65 ++++++++++++++++++++++++++++---------- 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/src/api/handler/istio.go b/src/api/handler/istio.go index 6a316db44..65a72c0ee 100644 --- a/src/api/handler/istio.go +++ b/src/api/handler/istio.go @@ -31,7 +31,7 @@ func ListStatus(c *gin.Context) { log.Info("[API] /api/diagnose Services end", "ts", time.Now()) log.Info("[API] /api/diagnose Pods start", "ts", time.Now()) - pods := service.IstioInfo.Pods(map[string]string{}).Status() + pods := service.IstioInfo.Pods().Status() log.Info("[API] /api/diagnose Pods end", "ts", time.Now()) c.JSON(200, gin.H{ diff --git a/src/api/handler/metrics.go b/src/api/handler/metrics.go index 919e9ad80..c18e9c625 100644 --- a/src/api/handler/metrics.go +++ b/src/api/handler/metrics.go @@ -26,7 +26,7 @@ func ListMetrics(c *gin.Context) { "code": 0, "data": map[string]interface{}{ "serviceCount": len(service.ServiceInfo.Services("").Exclude("kube-system", bootstrap.Args.IstioNamespace, bootstrap.Args.Namespace)), - "podCount": len(service.ServiceInfo.Pods(map[string]string{}).Exclude("kube-system", bootstrap.Args.IstioNamespace, bootstrap.Args.Namespace)), + "podCount": len(service.ServiceInfo.Pods().Exclude("kube-system", bootstrap.Args.IstioNamespace, bootstrap.Args.Namespace)), }, }) } diff --git a/src/api/service/kube.go b/src/api/service/kube.go index 18a43c862..751d01729 100644 --- a/src/api/service/kube.go +++ b/src/api/service/kube.go @@ -46,6 +46,7 @@ type kubeInfo struct { mtx *sync.RWMutex wg *sync.WaitGroup services []service + pods []v1.Pod namespaces []v1.Namespace syncInterval time.Duration namespace string @@ -236,7 +237,7 @@ func (p services) Status() []KubeServiceStatus { return components } -func (k *kubeInfo) Pods(labels map[string]string) pods { +func (k *kubeInfo) podsFromK8S(labels map[string]string) pods { pods := make([]v1.Pod, 0) ls := "" if len(labels) != 0 { @@ -257,22 +258,28 @@ func (k *kubeInfo) Pods(labels map[string]string) pods { return p.Items } -func (k *kubeInfo) PodsByName(name string) pods { - retPods := make([]v1.Pod, 0) +func (k *kubeInfo) Pods() pods { + k.mtx.RLock() + defer k.mtx.RUnlock() - l := metav1.ListOptions{} - if name != "" { - l.FieldSelector = "metadata.name=" + name - } + return k.pods +} - p, err := client.CoreV1().Pods(k.namespace).List(l) +func (k *kubeInfo) PodsByName(name string) pods { + k.mtx.RLock() + defer k.mtx.RUnlock() - if err != nil { - log.Error("[k8s] get retPods fail", "err", err) - return retPods + if name == "" { + return k.pods } - return p.Items + retPods := make([]v1.Pod, 0) + for _, p := range k.pods { + if p.Name == name { + retPods = append(retPods, p) + } + } + return retPods } type pods []v1.Pod @@ -342,21 +349,47 @@ func (k *kubeInfo) sync() { // panic(err.Error()) log.Error("[k8s] get namespaces err", "err", err) } - services := make([]service, 0, len(svcs.Items)) + + // get services' and pods' data from Kubernetes + var serviceCh = make(chan service, 200) + var podCh = make(chan v1.Pod, 100) + k.wg.Add(len(svcs.Items)) for _, i := range svcs.Items { go func(i v1.Service) { s := service{} s.Service = i - s.Pods = k.Pods(i.Spec.Selector) - services = append(services, s) + s.Pods = k.podsFromK8S(i.Spec.Selector) + for _, p := range s.Pods { + podCh <- p + } + serviceCh <- s k.wg.Done() }(i) } - k.wg.Wait() + go func() { + k.wg.Wait() + close(serviceCh) + close(podCh) + }() + + services := make([]service, 0, len(svcs.Items)) + for s := range serviceCh { + services = append(services, s) + } + + tmpPods := make(map[string]v1.Pod) + pods := make(pods, 0) + for p := range podCh { + if _, ok := tmpPods[string(p.UID)]; !ok { + pods = append(pods, p) + } + } + k.mtx.Lock() k.services = services k.namespaces = ns.Items + k.pods = pods k.mtx.Unlock() log.Debug("[Kube] sync end", "svcs", len(k.services), "namespace", k.namespace, "time", time.Now())