Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable partial cluster resyncs (#520) #519

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 86 additions & 17 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ type clusterCache struct {
lock sync.RWMutex
resources map[kube.ResourceKey]*Resource
nsIndex map[string]map[kube.ResourceKey]*Resource
// syncErrorTimes records when the last error occurred attempting to sync a certain GK. Storing this info allows us
// to resync only the GKs with errors and only after a retry timeout.
syncErrorTimes map[schema.GroupKind]*time.Time

kubectl kube.Kubectl
log logr.Logger
Expand Down Expand Up @@ -425,20 +428,23 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
}

// clusterCacheSync's lock should be held before calling this method
func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) syncType {
syncTime := syncStatus.syncTime

if syncTime == nil {
return false
return syncTypeFull
}
if syncStatus.syncError != nil {
return time.Now().Before(syncTime.Add(clusterRetryTimeout))
if time.Now().After(syncTime.Add(syncStatus.resyncTimeout)) {
if syncStatus.resyncTimeout == 0 {
// cluster resync timeout has been disabled
return syncTypeNone
}
return syncTypeFull
}
if syncStatus.resyncTimeout == 0 {
// cluster resync timeout has been disabled
return true
if syncStatus.syncError != nil && time.Now().Before(syncTime.Add(clusterRetryTimeout)) {
return syncTypePartial
}
return time.Now().Before(syncTime.Add(syncStatus.resyncTimeout))
return syncTypeNone
}

func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
Expand Down Expand Up @@ -702,15 +708,53 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource
return nil
}

func (c *clusterCache) sync() error {
// syncType describes how much of the cluster cache a sync pass has to refresh.
type syncType string

// The constants are explicitly typed as syncType so that they cannot be mixed
// up with arbitrary strings and keep their type when stored in an interface.
const (
	// syncTypeNone means no group/kind needs to be synced.
	syncTypeNone syncType = "none"
	// syncTypeFull means every group/kind is synced.
	syncTypeFull syncType = "full"
	// syncTypePartial means only the group/kinds selected by their recorded
	// sync error times are synced.
	syncTypePartial syncType = "partial"
)

// shouldSyncGK reports whether the given group/kind has to be fetched during a
// sync pass of type st.
//
//   - syncTypeFull: always true.
//   - syncTypePartial: decided from c.syncErrorTimes[gk]:
//     no entry                -> true  (the GK was not attempted on the last sync),
//     nil entry               -> false (the last attempt recorded no error),
//     non-nil entry           -> true only once c.clusterSyncRetryTimeout has
//     elapsed since the recorded error time.
//   - any other value: false.
func (c *clusterCache) shouldSyncGK(st syncType, gk schema.GroupKind) bool {
	switch st {
	case syncTypeFull:
		return true
	case syncTypePartial:
		lastErrorAt, attempted := c.syncErrorTimes[gk]
		if !attempted {
			// Nothing recorded for this GK, so fetch it now.
			return true
		}
		if lastErrorAt == nil {
			// The last attempted sync had no error; nothing to retry.
			return false
		}
		// The last attempt failed: retry only after the retry timeout has passed.
		return time.Now().After(lastErrorAt.Add(c.clusterSyncRetryTimeout))
	default:
		return false
	}
}

func (c *clusterCache) sync(st syncType) error {
c.log.Info("Start syncing cluster")

for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
if st == syncTypeFull {
for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
}
c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)
c.syncErrorTimes = make(map[schema.GroupKind]*time.Time)
c.namespacedResources = make(map[schema.GroupKind]bool)
}
c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)
c.namespacedResources = make(map[schema.GroupKind]bool)
config := c.config
version, err := c.kubectl.GetServerVersion(config)

Expand Down Expand Up @@ -749,6 +793,7 @@ func (c *clusterCache) sync() error {
return err
}
lock := sync.Mutex{}
now := time.Now()
err = kube.RunAllAsync(len(apis), func(i int) error {
api := apis[i]

Expand All @@ -757,6 +802,10 @@ func (c *clusterCache) sync() error {
info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
c.apisMeta[api.GroupKind] = info
c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
if !c.shouldSyncGK(st, api.GroupKind) {
lock.Unlock()
return nil
}
lock.Unlock()

return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
Expand All @@ -773,9 +822,27 @@ func (c *clusterCache) sync() error {
})
})
if err != nil {
lock.Lock()
c.syncErrorTimes[api.GroupKind] = &now
lock.Unlock()
return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err)
}

lock.Lock()
// Record that the gk was fetched, but that there was no error.
c.syncErrorTimes[api.GroupKind] = nil
lock.Unlock()

if st == syncTypePartial {
lock.Lock()
if _, ok := c.syncErrorTimes[api.GroupKind]; ok {
// We managed to successfully sync the resource. Clear the error time so we don't retry until the
// next full sync.
delete(c.syncErrorTimes, api.GroupKind)
}
lock.Unlock()
}

go c.watchEvents(ctx, api, resClient, ns, resourceVersion)

return nil
Expand All @@ -796,7 +863,8 @@ func (c *clusterCache) EnsureSynced() error {

// first check if cluster is synced *without acquiring the full clusterCache lock*
syncStatus.lock.Lock()
if syncStatus.synced(c.clusterSyncRetryTimeout) {
st := syncStatus.synced(c.clusterSyncRetryTimeout)
if st == syncTypeNone {
syncError := syncStatus.syncError
syncStatus.lock.Unlock()
return syncError
Expand All @@ -810,10 +878,11 @@ func (c *clusterCache) EnsureSynced() error {

// before doing any work, check once again now that we have the lock, to see if it got
// synced between the first check and now
if syncStatus.synced(c.clusterSyncRetryTimeout) {
st = syncStatus.synced(c.clusterSyncRetryTimeout)
if st == syncTypeNone {
return syncStatus.syncError
}
err := c.sync()
err := c.sync(st)
syncTime := time.Now()
syncStatus.syncTime = &syncTime
syncStatus.syncError = err
Expand Down