From 4d4623e9e2f336289ba94ffc84e6d4aa6cd9b09c Mon Sep 17 00:00:00 2001 From: Eneko Fernandez Date: Fri, 1 Sep 2023 17:24:33 +0100 Subject: [PATCH 1/6] investigating memory leak --- core/clustersmngr/cluster/cluster.go | 7 +++++++ core/clustersmngr/cluster/single.go | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/core/clustersmngr/cluster/cluster.go b/core/clustersmngr/cluster/cluster.go index cab6c79a34..7bae71033e 100644 --- a/core/clustersmngr/cluster/cluster.go +++ b/core/clustersmngr/cluster/cluster.go @@ -3,10 +3,12 @@ package cluster import ( "context" "fmt" + "net/http" "os" "time" "github.com/weaveworks/weave-gitops/pkg/server/auth" + machnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sigs.k8s.io/cli-utils/pkg/flowcontrol" @@ -77,6 +79,11 @@ func WithFlowControl(config *rest.Config) (*rest.Config, error) { config.QPS = ClientQPS config.Burst = ClientBurst + // From https://github.com/weaveworks/weave-gitops-enterprise/issues/3189 + // Suggested in https://github.com/kubernetes/kubernetes/issues/118703#issuecomment-1595072383 + // TODO: Revert or adapt when upstream fix is available + config.Proxy = machnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) + return config, nil } diff --git a/core/clustersmngr/cluster/single.go b/core/clustersmngr/cluster/single.go index 16721cfbad..3e587a011e 100644 --- a/core/clustersmngr/cluster/single.go +++ b/core/clustersmngr/cluster/single.go @@ -3,10 +3,12 @@ package cluster import ( "fmt" "net" + "net/http" "github.com/weaveworks/weave-gitops/pkg/kube" "github.com/weaveworks/weave-gitops/pkg/server/auth" apiruntime "k8s.io/apimachinery/pkg/runtime" + machnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" @@ -60,6 +62,11 @@ func getClientFromConfig(config *rest.Config, scheme *apiruntime.Scheme) (client return nil, fmt.Errorf("could not create RESTMapper from config: %w", 
err) } + // From https://github.com/weaveworks/weave-gitops-enterprise/issues/3189 + // Suggested in https://github.com/kubernetes/kubernetes/issues/118703#issuecomment-1595072383 + // TODO: Revert or adapt when upstream fix is available + config.Proxy = machnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) + client, err := client.New(config, client.Options{ Scheme: scheme, Mapper: mapper, From d18dc14ffa827ff1e860989fd88a8e366448f70c Mon Sep 17 00:00:00 2001 From: Eneko Fernandez Date: Tue, 12 Sep 2023 10:15:56 +0200 Subject: [PATCH 2/6] reduce watchCRDs frequency to see whether it reduces the allocations and therefore the memory footprint --- pkg/services/crd/fetcher.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/services/crd/fetcher.go b/pkg/services/crd/fetcher.go index f0438e9004..7293b6c0c7 100644 --- a/pkg/services/crd/fetcher.go +++ b/pkg/services/crd/fetcher.go @@ -11,7 +11,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -const watchCRDsFrequency = 30 * time.Second +const watchCRDsFrequency = 10 * time.Minute type Fetcher interface { IsAvailable(clusterName, name string) bool @@ -67,7 +67,6 @@ func (s *defaultFetcher) UpdateCRDList() { for clusterName, client := range client.ClientsPool().Clients() { crdList := &v1.CustomResourceDefinitionList{} - s.crds[clusterName] = []v1.CustomResourceDefinition{} err := client.List(ctx, crdList) From ef8b25ccce27a631fb984ceccb44f7280f5839e2 Mon Sep 17 00:00:00 2001 From: Eneko Fernandez Date: Tue, 19 Sep 2023 09:10:13 +0200 Subject: [PATCH 3/6] do not have flow control --- core/clustersmngr/cluster/cluster.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/clustersmngr/cluster/cluster.go b/core/clustersmngr/cluster/cluster.go index 7bae71033e..4c767c1b19 100644 --- a/core/clustersmngr/cluster/cluster.go +++ b/core/clustersmngr/cluster/cluster.go @@ -30,8 +30,9 @@ const ( ) var ( - kubeClientTimeout = getEnvDuration("WEAVE_GITOPS_KUBE_CLIENT_TIMEOUT",
30*time.Second) - DefaultKubeConfigOptions = []KubeConfigOption{WithFlowControl} + kubeClientTimeout = getEnvDuration("WEAVE_GITOPS_KUBE_CLIENT_TIMEOUT", 30*time.Second) + // DefaultKubeConfigOptions = []KubeConfigOption{WithFlowControl} + DefaultKubeConfigOptions = []KubeConfigOption{} ) type KubeConfigOption func(*rest.Config) (*rest.Config, error) From d97c5015819289787cfcd6120a24e10e9e96ff8c Mon Sep 17 00:00:00 2001 From: Eneko Fernandez Date: Wed, 20 Sep 2023 10:59:44 +0200 Subject: [PATCH 4/6] adding logs for debugging connection caching --- core/clustersmngr/factory.go | 11 +++++++++-- core/clustersmngr/factory_caches.go | 14 ++++++++++++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/core/clustersmngr/factory.go b/core/clustersmngr/factory.go index 36ab9beae5..1a0bef5c6d 100644 --- a/core/clustersmngr/factory.go +++ b/core/clustersmngr/factory.go @@ -34,7 +34,7 @@ const ( ) var ( - usersClientsTTL = getEnvDuration("WEAVE_GITOPS_USERS_CLIENTS_TTL", 30*time.Minute) + usersClientsTTL = getEnvDuration("WEAVE_GITOPS_USERS_CLIENTS_TTL", 24*time.Hour) ) func getEnvDuration(key string, defaultDuration time.Duration) time.Duration { @@ -228,7 +228,7 @@ func NewClustersManager(fetchers []ClusterFetcher, nsChecker nsaccess.Checker, l clusters: &Clusters{}, clustersNamespaces: &ClustersNamespaces{}, usersNamespaces: &UsersNamespaces{Cache: ttlcache.New(userNamespaceResolution)}, - usersClients: &UsersClients{Cache: ttlcache.New(usersClientResolution)}, + usersClients: &UsersClients{Cache: ttlcache.New(usersClientResolution), log: logger}, log: logger, initialClustersLoad: make(chan bool), watchers: []*ClustersWatcher{}, @@ -609,11 +609,15 @@ func (cf *clustersManager) getOrCreateClient(user *auth.UserPrincipal, cluster c } isServer = true } + cf.log.Info("getOrCreateClient", "user", user.ID, "cluster", cluster.GetName()) if client, found := cf.usersClients.Get(user, cluster.GetName()); found { + cf.log.Info("found cached client") return client, nil } 
+ cf.log.Info("client not found in cache so creating ", "user", user.ID, "cluster", cluster.GetName()) + var ( client client.Client err error @@ -622,9 +626,11 @@ func (cf *clustersManager) getOrCreateClient(user *auth.UserPrincipal, cluster c if isServer { opsCreateServerClient.WithLabelValues(cluster.GetName()).Inc() client, err = cluster.GetServerClient() + cf.log.Info("created server client", "user", user.ID, "cluster", cluster.GetName()) } else { opsCreateUserClient.WithLabelValues(cluster.GetName()).Inc() client, err = cluster.GetUserClient(user) + cf.log.Info("created user client", "user", user.ID, "cluster", cluster.GetName()) } if err != nil { @@ -632,6 +638,7 @@ func (cf *clustersManager) getOrCreateClient(user *auth.UserPrincipal, cluster c } cf.usersClients.Set(user, cluster.GetName(), client) + cf.log.Info("set client", "user", user.ID, "cluster", cluster.GetName()) return client, nil } diff --git a/core/clustersmngr/factory_caches.go b/core/clustersmngr/factory_caches.go index 79331b8918..da449834c3 100644 --- a/core/clustersmngr/factory_caches.go +++ b/core/clustersmngr/factory_caches.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/cheshir/ttlcache" + "github.com/go-logr/logr" "github.com/weaveworks/weave-gitops/core/clustersmngr/cluster" "github.com/weaveworks/weave-gitops/pkg/server/auth" v1 "k8s.io/api/core/v1" @@ -155,6 +156,7 @@ func (un UsersNamespaces) cacheKey(user *auth.UserPrincipal, cluster string) uin type UsersClients struct { Cache *ttlcache.Cache + log logr.Logger } func (uc *UsersClients) cacheKey(user *auth.UserPrincipal, clusterName string) uint64 { @@ -162,14 +164,22 @@ func (uc *UsersClients) cacheKey(user *auth.UserPrincipal, clusterName string) u } func (uc *UsersClients) Set(user *auth.UserPrincipal, clusterName string, client client.Client) { - uc.Cache.Set(uc.cacheKey(user, clusterName), client, usersClientsTTL) + cacheKey := uc.cacheKey(user, clusterName) + uc.log.Info("set cached connection", "user", user, "cluster", 
clusterName, "cacheKey", cacheKey) + + uc.Cache.Set(cacheKey, client, usersClientsTTL) } func (uc *UsersClients) Get(user *auth.UserPrincipal, clusterName string) (client.Client, bool) { - if val, found := uc.Cache.Get(uc.cacheKey(user, clusterName)); found { + cacheKey := uc.cacheKey(user, clusterName) + uc.log.Info("get cached connection", "user", user, "cluster", clusterName, "cacheKey", cacheKey) + + if val, found := uc.Cache.Get(cacheKey); found { + uc.log.Info("client found") return val.(client.Client), true } + uc.log.Info("client not found") return nil, false } From 41323808ed57b3f5a91e8bafa525f4c126938b50 Mon Sep 17 00:00:00 2001 From: Eneko Fernandez Date: Wed, 20 Sep 2023 12:16:48 +0200 Subject: [PATCH 5/6] adding logs for debugging connection caching --- core/clustersmngr/factory_caches.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/clustersmngr/factory_caches.go b/core/clustersmngr/factory_caches.go index da449834c3..dd0a9f8d5c 100644 --- a/core/clustersmngr/factory_caches.go +++ b/core/clustersmngr/factory_caches.go @@ -165,9 +165,16 @@ func (uc *UsersClients) cacheKey(user *auth.UserPrincipal, clusterName string) u func (uc *UsersClients) Set(user *auth.UserPrincipal, clusterName string, client client.Client) { cacheKey := uc.cacheKey(user, clusterName) - uc.log.Info("set cached connection", "user", user, "cluster", clusterName, "cacheKey", cacheKey) + uc.log.Info("set cached connection", "user", user, "cluster", clusterName, "cacheKey", cacheKey, "ttl", usersClientsTTL) uc.Cache.Set(cacheKey, client, usersClientsTTL) + + if _, found := uc.Cache.Get(cacheKey); found { + uc.log.Info("found after set") + } else { + uc.log.Info("not found after set") + } + } func (uc *UsersClients) Get(user *auth.UserPrincipal, clusterName string) (client.Client, bool) { From c6cc497d3c09bcadf019236ec2be8cb08b7e7d02 Mon Sep 17 00:00:00 2001 From: Eneko Fernandez Date: Wed, 20 Sep 2023 12:38:43 +0200 Subject: [PATCH 6/6] setting ttl --- 
core/clustersmngr/factory.go | 19 +------------------ core/clustersmngr/factory_caches.go | 1 - 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/core/clustersmngr/factory.go b/core/clustersmngr/factory.go index 1a0bef5c6d..da6102020e 100644 --- a/core/clustersmngr/factory.go +++ b/core/clustersmngr/factory.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "os" "sync" "time" @@ -34,25 +33,9 @@ const ( ) var ( - usersClientsTTL = getEnvDuration("WEAVE_GITOPS_USERS_CLIENTS_TTL", 24*time.Hour) + usersClientsTTL = 24 * time.Hour ) -func getEnvDuration(key string, defaultDuration time.Duration) time.Duration { - val := os.Getenv(key) - if val == "" { - return defaultDuration - } - - d, err := time.ParseDuration(val) - - // on error return the default duration - if err != nil { - return defaultDuration - } - - return d -} - var ( opsUpdateClusters = prometheus.NewCounter( prometheus.CounterOpts{ diff --git a/core/clustersmngr/factory_caches.go b/core/clustersmngr/factory_caches.go index dd0a9f8d5c..bd2b4c8148 100644 --- a/core/clustersmngr/factory_caches.go +++ b/core/clustersmngr/factory_caches.go @@ -174,7 +174,6 @@ func (uc *UsersClients) Set(user *auth.UserPrincipal, clusterName string, client } else { uc.log.Info("not found after set") } - } func (uc *UsersClients) Get(user *auth.UserPrincipal, clusterName string) (client.Client, bool) {