Skip to content

Commit

Permalink
fix data racing in get services and pods data && add cache of pods data
Browse files Browse the repository at this point in the history
  • Loading branch information
sevennt committed Dec 27, 2018
1 parent f9c0e6c commit f8d5420
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/api/handler/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ListStatus(c *gin.Context) {
log.Info("[API] /api/diagnose Services end", "ts", time.Now())

log.Info("[API] /api/diagnose Pods start", "ts", time.Now())
pods := service.IstioInfo.Pods(map[string]string{}).Status()
pods := service.IstioInfo.Pods().Status()
log.Info("[API] /api/diagnose Pods end", "ts", time.Now())

c.JSON(200, gin.H{
Expand Down
2 changes: 1 addition & 1 deletion src/api/handler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func ListMetrics(c *gin.Context) {
"code": 0,
"data": map[string]interface{}{
"serviceCount": len(service.ServiceInfo.Services("").Exclude("kube-system", bootstrap.Args.IstioNamespace, bootstrap.Args.Namespace)),
"podCount": len(service.ServiceInfo.Pods(map[string]string{}).Exclude("kube-system", bootstrap.Args.IstioNamespace, bootstrap.Args.Namespace)),
"podCount": len(service.ServiceInfo.Pods().Exclude("kube-system", bootstrap.Args.IstioNamespace, bootstrap.Args.Namespace)),
},
})
}
65 changes: 49 additions & 16 deletions src/api/service/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type kubeInfo struct {
mtx *sync.RWMutex
wg *sync.WaitGroup
services []service
pods []v1.Pod
namespaces []v1.Namespace
syncInterval time.Duration
namespace string
Expand Down Expand Up @@ -236,7 +237,7 @@ func (p services) Status() []KubeServiceStatus {
return components
}

func (k *kubeInfo) Pods(labels map[string]string) pods {
func (k *kubeInfo) podsFromK8S(labels map[string]string) pods {
pods := make([]v1.Pod, 0)
ls := ""
if len(labels) != 0 {
Expand All @@ -257,22 +258,28 @@ func (k *kubeInfo) Pods(labels map[string]string) pods {
return p.Items
}

func (k *kubeInfo) PodsByName(name string) pods {
retPods := make([]v1.Pod, 0)
func (k *kubeInfo) Pods() pods {
k.mtx.RLock()
defer k.mtx.RUnlock()

l := metav1.ListOptions{}
if name != "" {
l.FieldSelector = "metadata.name=" + name
}
return k.pods
}

p, err := client.CoreV1().Pods(k.namespace).List(l)
func (k *kubeInfo) PodsByName(name string) pods {
k.mtx.RLock()
defer k.mtx.RUnlock()

if err != nil {
log.Error("[k8s] get retPods fail", "err", err)
return retPods
if name == "" {
return k.pods
}

return p.Items
retPods := make([]v1.Pod, 0)
for _, p := range k.pods {
if p.Name == name {
retPods = append(retPods, p)
}
}
return retPods
}

type pods []v1.Pod
Expand Down Expand Up @@ -342,21 +349,47 @@ func (k *kubeInfo) sync() {
// panic(err.Error())
log.Error("[k8s] get namespaces err", "err", err)
}
services := make([]service, 0, len(svcs.Items))

// get services' and pods' data from Kubernetes
var serviceCh = make(chan service, 200)
var podCh = make(chan v1.Pod, 100)

k.wg.Add(len(svcs.Items))
for _, i := range svcs.Items {
go func(i v1.Service) {
s := service{}
s.Service = i
s.Pods = k.Pods(i.Spec.Selector)
services = append(services, s)
s.Pods = k.podsFromK8S(i.Spec.Selector)
for _, p := range s.Pods {
podCh <- p
}
serviceCh <- s
k.wg.Done()
}(i)
}
k.wg.Wait()
go func() {
k.wg.Wait()
close(serviceCh)
close(podCh)
}()

services := make([]service, 0, len(svcs.Items))
for s := range serviceCh {
services = append(services, s)
}

tmpPods := make(map[string]v1.Pod)
pods := make(pods, 0)
for p := range podCh {
if _, ok := tmpPods[string(p.UID)]; !ok {
pods = append(pods, p)
}
}

k.mtx.Lock()
k.services = services
k.namespaces = ns.Items
k.pods = pods
k.mtx.Unlock()

log.Debug("[Kube] sync end", "svcs", len(k.services), "namespace", k.namespace, "time", time.Now())
Expand Down

0 comments on commit f8d5420

Please sign in to comment.