Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 inmemory: fix watch to continue serving based on resourceVersion parameter #11695

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
8 changes: 5 additions & 3 deletions test/infrastructure/inmemory/pkg/runtime/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ type resourceGroupTracker struct {
lock sync.RWMutex
objects map[schema.GroupVersionKind]map[types.NamespacedName]client.Object
// ownedObjects tracks ownership. Key is the owner, values are the owned objects.
ownedObjects map[ownReference]map[ownReference]struct{}
ownedObjects map[ownReference]map[ownReference]struct{}
lastResourceVersion uint64
}

type ownReference struct {
Expand Down Expand Up @@ -159,8 +160,9 @@ func (c *cache) AddResourceGroup(name string) {
return
}
c.resourceGroups[name] = &resourceGroupTracker{
objects: map[schema.GroupVersionKind]map[types.NamespacedName]client.Object{},
ownedObjects: map[ownReference]map[ownReference]struct{}{},
objects: map[schema.GroupVersionKind]map[types.NamespacedName]client.Object{},
ownedObjects: map[ownReference]map[ownReference]struct{}{},
lastResourceVersion: 0,
}
}

Expand Down
9 changes: 6 additions & 3 deletions test/infrastructure/inmemory/pkg/runtime/cache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...clien
if err := meta.SetList(list, items); err != nil {
return apierrors.NewInternalError(err)
}

list.SetResourceVersion(fmt.Sprintf("%d", tracker.lastResourceVersion))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Is it correct that the list ResourceVersion is the latest resource version in the system, and not the last resource version in the list of items?
Q: Is it correct to set this value no matter whether it is a "plain" list or a list-watch?
(From a quick check with kubectl, yes to both, but I'd like a confirmation.)


return nil
}

Expand Down Expand Up @@ -212,7 +215,7 @@ func (c *cache) store(resourceGroup string, obj client.Object, replaceExisting b
return apierrors.NewConflict(unsafeGuessGroupVersionResource(objGVK).GroupResource(), objKey.String(), fmt.Errorf("object has been modified"))
}

c.beforeUpdate(resourceGroup, trackedObj, obj)
c.beforeUpdate(resourceGroup, trackedObj, obj, &tracker.lastResourceVersion)

tracker.objects[objGVK][objKey] = obj.DeepCopyObject().(client.Object)
updateTrackerOwnerReferences(tracker, trackedObj, obj, objRef)
Expand All @@ -226,7 +229,7 @@ func (c *cache) store(resourceGroup string, obj client.Object, replaceExisting b
return apierrors.NewNotFound(unsafeGuessGroupVersionResource(objGVK).GroupResource(), objKey.String())
}

c.beforeCreate(resourceGroup, obj)
c.beforeCreate(resourceGroup, obj, &tracker.lastResourceVersion)

tracker.objects[objGVK][objKey] = obj.DeepCopyObject().(client.Object)
updateTrackerOwnerReferences(tracker, nil, obj, objRef)
Expand Down Expand Up @@ -422,7 +425,7 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
oldObj := obj.DeepCopyObject().(client.Object)
now := metav1.Time{Time: time.Now().UTC()}
obj.SetDeletionTimestamp(&now)
c.beforeUpdate(resourceGroup, oldObj, obj)
c.beforeUpdate(resourceGroup, oldObj, obj, &tracker.lastResourceVersion)

objects[objKey] = obj
c.afterUpdate(resourceGroup, oldObj, obj)
Expand Down
8 changes: 6 additions & 2 deletions test/infrastructure/inmemory/pkg/runtime/cache/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_cache_client(t *testing.T) {
r := c.resourceGroups["foo"].objects[cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)][key]
g.Expect(r.GetObjectKind().GroupVersionKind()).To(BeComparableTo(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must be set")
g.Expect(r.GetName()).To(Equal("bar"), "name must be equal to object tracker key")
g.Expect(r.GetResourceVersion()).To(Equal("v1"), "resourceVersion must be set")
g.Expect(r.GetResourceVersion()).To(Equal("1"), "resourceVersion must be set")
g.Expect(r.GetCreationTimestamp()).ToNot(BeZero(), "creation timestamp must be set")
g.Expect(r.GetAnnotations()).To(HaveKey(lastSyncTimeAnnotation), "last sync annotation must exists")

Expand Down Expand Up @@ -271,7 +271,7 @@ func Test_cache_client(t *testing.T) {
// Check all the computed fields are as expected.
g.Expect(obj.GetObjectKind().GroupVersionKind()).To(BeComparableTo(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must be set")
g.Expect(obj.GetName()).To(Equal("bar"), "name must be equal to object tracker key")
g.Expect(obj.GetResourceVersion()).To(Equal("v1"), "resourceVersion must be set")
g.Expect(obj.GetResourceVersion()).To(Equal("2"), "resourceVersion must be set")
g.Expect(obj.GetCreationTimestamp()).ToNot(BeZero(), "creation timestamp must be set")
g.Expect(obj.GetAnnotations()).To(HaveKey(lastSyncTimeAnnotation), "last sync annotation must be set")
})
Expand Down Expand Up @@ -435,6 +435,8 @@ func Test_cache_client(t *testing.T) {
g.Expect(objBefore.GetResourceVersion()).ToNot(Equal(objUpdate.GetResourceVersion()), "Object version must be changed")
objBefore.SetResourceVersion(objUpdate.GetResourceVersion())
objBefore.Labels = objUpdate.Labels
g.Expect(objUpdate.GetGeneration()).To(Equal(objBefore.GetGeneration()+1), "Object Generation must increment")
objBefore.Generation = objUpdate.GetGeneration()
g.Expect(objBefore).To(BeComparableTo(objUpdate), "everything else must be the same")

g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=baz, Updated"))
Expand Down Expand Up @@ -670,6 +672,8 @@ func Test_cache_client(t *testing.T) {
g.Expect(objBefore.GetResourceVersion()).ToNot(Equal(objAfterUpdate.GetResourceVersion()), "Object version must be changed")
objBefore.SetResourceVersion(objAfterUpdate.GetResourceVersion())
objBefore.Labels = objAfterUpdate.Labels
g.Expect(objAfterUpdate.GetGeneration()).To(Equal(objBefore.GetGeneration()+1), "Object Generation must increment")
objBefore.Generation = objAfterUpdate.GetGeneration()
g.Expect(objBefore).To(BeComparableTo(objAfterUpdate), "everything else must be the same")

g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=baz, Deleted"))
Expand Down
16 changes: 9 additions & 7 deletions test/infrastructure/inmemory/pkg/runtime/cache/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,30 @@ package cache
import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *cache) beforeCreate(_ string, obj client.Object) {
func (c *cache) beforeCreate(_ string, obj client.Object, resourceVersion *uint64) {
now := time.Now().UTC()
obj.SetCreationTimestamp(metav1.Time{Time: now})
// TODO: UID
obj.SetAnnotations(appendAnnotations(obj, lastSyncTimeAnnotation, now.Format(time.RFC3339)))
obj.SetResourceVersion(fmt.Sprintf("v%d", 1))
*resourceVersion++
obj.SetResourceVersion(fmt.Sprintf("%d", *resourceVersion))
obj.SetGeneration(1)
}

func (c *cache) afterCreate(resourceGroup string, obj client.Object) {
c.informCreate(resourceGroup, obj)
}

func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object) {
func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object, resourceVersion *uint64) {
newObj.SetCreationTimestamp(oldObj.GetCreationTimestamp())
newObj.SetResourceVersion(oldObj.GetResourceVersion())
newObj.SetGeneration(oldObj.GetGeneration())
// TODO: UID
newObj.SetAnnotations(appendAnnotations(newObj, lastSyncTimeAnnotation, oldObj.GetAnnotations()[lastSyncTimeAnnotation]))
if !oldObj.GetDeletionTimestamp().IsZero() {
Expand All @@ -51,8 +52,9 @@ func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object) {
now := time.Now().UTC()
newObj.SetAnnotations(appendAnnotations(newObj, lastSyncTimeAnnotation, now.Format(time.RFC3339)))

oldResourceVersion, _ := strconv.Atoi(strings.TrimPrefix(oldObj.GetResourceVersion(), "v"))
newObj.SetResourceVersion(fmt.Sprintf("v%d", oldResourceVersion+1))
*resourceVersion++
newObj.SetResourceVersion(fmt.Sprintf("%d", *resourceVersion))
newObj.SetGeneration(oldObj.GetGeneration() + 1)
}
}

Expand Down
40 changes: 26 additions & 14 deletions test/infrastructure/inmemory/pkg/server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

inmemoryruntime "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime"
inmemoryclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime/client"
inmemoryportforward "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/server/api/portforward"
)

Expand Down Expand Up @@ -315,6 +316,25 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
return
}

h.log.V(3).Info(fmt.Sprintf("Serving List for %v", req.Request.URL), "resourceGroup", resourceGroup)

list, err := h.apiV1list(ctx, req, *gvk, inmemoryClient)
if err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
return
}
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}

if err := resp.WriteEntity(list); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
}

func (h *apiServerHandler) apiV1list(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be

Suggested change
func (h *apiServerHandler) apiV1list(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {
func (h *apiServerHandler) v1List(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {

(easier to distinguish from apiV1List with upper L)

// Reads and returns the requested data.
list := &unstructured.UnstructuredList{}
list.SetAPIVersion(gvk.GroupVersion().String())
Expand All @@ -328,33 +348,23 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
// TODO: The only field Selector which works is for `spec.nodeName` on pods.
fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
return nil, err
}
if fieldSelector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
}

labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
return nil, err
}
if labelSelector != nil {
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
}
if err := inmemoryClient.List(ctx, list, listOpts...); err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
return
}
_ = resp.WriteHeaderAndEntity(http.StatusInternalServerError, err.Error())
return
}
if err := resp.WriteEntity(list); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
return nil, err
}
return list, nil
}

func (h *apiServerHandler) apiV1Watch(req *restful.Request, resp *restful.Response) {
Expand All @@ -372,6 +382,8 @@ func (h *apiServerHandler) apiV1Watch(req *restful.Request, resp *restful.Respon
return
}

h.log.V(3).Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL), "resourceGroup", resourceGroup)

// If the request is a Watch handle it using watchForResource.
err = h.watchForResource(req, resp, resourceGroup, *gvk)
if err != nil {
Expand Down
97 changes: 91 additions & 6 deletions test/infrastructure/inmemory/pkg/server/api/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"time"

"github.com/emicklei/go-restful/v3"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Event records a lifecycle event for a Kubernetes object.
type Event struct {
Type watch.EventType `json:"type,omitempty"`
Object runtime.Object `json:"object,omitempty"`
Object client.Object `json:"object,omitempty"`
}

// WatchEventDispatcher dispatches events for a single resourceGroup.
Expand Down Expand Up @@ -88,13 +89,15 @@ func (m *WatchEventDispatcher) OnGeneric(resourceGroup string, o client.Object)

func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.Response, resourceGroup string, gvk schema.GroupVersionKind) (reterr error) {
ctx := req.Request.Context()
log := h.log.WithValues("resourceGroup", resourceGroup, "gvk", gvk.String())
ctx = ctrl.LoggerInto(ctx, log)
queryTimeout := req.QueryParameter("timeoutSeconds")
resourceVersion := req.QueryParameter("resourceVersion")
c := h.manager.GetCache()
i, err := c.GetInformerForKind(ctx, gvk)
if err != nil {
return err
}
h.log.Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL))
// With an unbuffered event channel RemoveEventHandler could be blocked because it requires a lock on the informer.
// When Run stops reading from the channel the informer could be blocked with an unbuffered chanel and then RemoveEventHandler never goes through.
// 1000 is used to avoid deadlocks in clusters with a higher number of Machines/Nodes.
Expand All @@ -115,7 +118,12 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
L:
for {
select {
case <-events:
case event, ok := <-events:
if !ok {
// End of results.
break L
}
log.V(4).Info("Missed event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
default:
break L
}
Expand All @@ -124,11 +132,49 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
// Note: After we removed the handler, no new events will be written to the events channel.
}()

return watcher.Run(ctx, queryTimeout, resp)
// Get at client to the resource group and list all relevant objects.
inmemoryClient := h.manager.GetResourceGroup(resourceGroup).GetClient()
list, err := h.apiV1list(ctx, req, gvk, inmemoryClient)
if err != nil {
return err
}

// If resourceVersion was set parse to uint64 which is the representation in the simulated apiserver.
var parsedResourceVersion uint64
if resourceVersion != "" {
parsedResourceVersion, err = strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return err
}
}

initialEvents := []Event{}

// Loop over all items and fill the list of events with objects which have a newer resourceVersion.
for _, obj := range list.Items {
if resourceVersion != "" {
objResourceVersion, err := strconv.ParseUint(obj.GetResourceVersion(), 10, 64)
if err != nil {
return err
}
if objResourceVersion <= parsedResourceVersion {
chrischdi marked this conversation as resolved.
Show resolved Hide resolved
continue
}
}
eventType := watch.Modified
// kube-apiserver emits all events as ADDED when no resourceVersion is given.
if obj.GetGeneration() == 1 || resourceVersion == "" {
eventType = watch.Added
}
initialEvents = append(initialEvents, Event{Type: eventType, Object: &obj})
}

return watcher.Run(ctx, queryTimeout, initialEvents, resp)
}

// Run serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.ResponseWriter) error {
func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, initialEvents []Event, w http.ResponseWriter) error {
log := ctrl.LoggerFrom(ctx)
flusher, ok := w.(http.Flusher)
if !ok {
return errors.New("can't start Watch: can't get http.Flusher")
Expand All @@ -139,6 +185,16 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay that we don't flush after this like before?


// Write all initial events.
for _, event := range initialEvents {
if err := resp.WriteEntity(event); err != nil {
log.Error(err, "Writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Error(err, "Writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
log.Error(err, "Error writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about using "initial event" instead of "old event"?

_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
} else {
log.V(4).Info("Wrote old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
}
}
flusher.Flush()

timeoutTimer, seconds, err := setTimer(timeout)
Expand All @@ -149,6 +205,18 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
ctx, cancel := context.WithTimeout(ctx, seconds)
defer cancel()
defer timeoutTimer.Stop()

// Determine the highest written resourceVersion so we can filter out duplicated events from the channel.
minResourceVersion := uint64(0)
if len(initialEvents) > 0 {
minResourceVersion, err = strconv.ParseUint(initialEvents[len(initialEvents)-1].Object.GetResourceVersion(), 10, 64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of assuming an ordering of events by resource version (which I'm not sure we are enforcing anywhere), what about computing minResourceVersion when we go through initialEvents in the for loop above?

if err != nil {
return err
}
minResourceVersion++
}

var objResourceVersion uint64
for {
select {
case <-ctx.Done():
Expand All @@ -160,8 +228,25 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
// End of results.
return nil
}

// Parse and check if the object has a higher resource version than we allow.
objResourceVersion, err = strconv.ParseUint(event.Object.GetResourceVersion(), 10, 64)
if err != nil {
log.Error(err, "Parsing object resource version", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
continue
}

// Skip objects which were already written.
if objResourceVersion < minResourceVersion {
continue
}

if err := resp.WriteEntity(event); err != nil {
log.Error(err, "Writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Error(err, "Writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
log.Error(err, "Error writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())

_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
} else {
log.V(4).Info("Wrote event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
}
if len(m.events) == 0 {
flusher.Flush()
Expand Down
Loading