diff --git a/control_plane.go b/control_plane.go index 890a7aa..c04f1fa 100644 --- a/control_plane.go +++ b/control_plane.go @@ -182,6 +182,10 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent return nil } +// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK. +// +// 1.remove cache of SubscribeTarget in Connection +// 2.remove watcher if there is no SubscribeTarget for the same kind in Connection cache. func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType { @@ -193,6 +197,7 @@ func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIde Namespace: request.Target.Namespace, App: request.Target.App, } + // remove the relation of Connection and SubscribeTarget from local cache err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier) if err != nil { @@ -209,21 +214,27 @@ func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIde continue } - targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind) - if len(targetConnections) < 1 { + // handle the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher. only push into a chan named delSubscribeConnChan waiting for delete. + // 1. if there is no kind cached in current Connection, push subscribeConnInfo with an empty NamespaceApp into delSubscribeConnChan, + // then, will delete the watcher for CRD. + // 2. if the number of relation between Connection and SubscribeTarget < 1, then push subscribeConnInfo with current NamespaceApp into delSubscribeConnChan, + // then 1st, will delete the SubscribeTarget which is cached in current Connection + // 2nd, will delte the watcher for CRD if there's no kind cached in current Connection + existConnection := c.server.ConnectionManager().ExistConnection(kind) + if !existConnection { delSubscribeConnChan <- delSubscribeConnInfo{ stream: stream, request: request, - namespaceApp: namespacedApp, + namespaceApp: model.NamespacedApp{}, kind: kind, } } else { - existConnection := c.server.ConnectionManager().ExistConnection(kind) - if !existConnection { + targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind) + if len(targetConnections) < 1 { delSubscribeConnChan <- delSubscribeConnInfo{ stream: stream, request: request, - namespaceApp: model.NamespacedApp{}, + namespaceApp: namespacedApp, kind: kind, } } @@ -233,6 +244,7 @@ func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIde return nil } +// delSubscribeConnChan a chan for delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher to stop watching. var delSubscribeConnChan chan delSubscribeConnInfo type delSubscribeConnInfo struct { @@ -242,24 +254,36 @@ type delSubscribeConnInfo struct { kind string } +// delConn a goroutine contains the logic of delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher. +// +// 1. at the beginning of current goroutine, should wait for the status of server is started. +// +// 2. when receive a delSubscribeConnInfo from delSubscribeConnChan, waiting a silence time to prevent inaccurate data statistics caused by network jitter. +// +// after the silence time, is the actually logic of deleting local cache and removing watcher. func (c *ControlPlane) delConn() { go func() { + // waiting for server is started. for !c.server.IsStarted() { time.Sleep(time.Duration(1) * time.Second) } + // receive from delSubscribeConnChan currDelConnInfo := <-delSubscribeConnChan namespaceApp := currDelConnInfo.namespaceApp kind := currDelConnInfo.kind request := currDelConnInfo.request stream := currDelConnInfo.stream + // wait a silence for network jitter // TODO make time of sleep is configurable time.Sleep(time.Duration(5) * time.Second) var err error // RemoveSubscribeTarget from CRDWatcher + // if namespaceApp is not an empty struct, means that need to delete SubscribeTarget cache in CRDWatcher if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) { + // re-count the number of SubscribeTarget targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind) if len(targetConnections) < 1 { if crdWatcher, existed := c.operator.GetWatcher(kind); existed { @@ -272,7 +296,7 @@ func (c *ControlPlane) delConn() { } } - // delete Connection and CRDWatch + // remove the CRDWatch from KubernetesOperator, to stop watching the kind. existConnection := c.server.ConnectionManager().ExistConnection(kind) if !existConnection { c.operator.RemoveWatcher(model.SubscribeTarget{ @@ -282,7 +306,7 @@ func (c *ControlPlane) delConn() { }) } - // send ackMessage + // send ackMessage for UnSubscribeConfig request. status := &trpb.Status{ Code: transport.Success, Message: "unSubscribe success", diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index 73a5e05..8ffdcc4 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -105,10 +105,12 @@ func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error { r.updateMux.Lock() defer r.updateMux.Unlock() + // remove the subscribe-cache in this CRDWatcher delete(r.subscribedList, target) delete(r.subscribedNamespaces, target.Namespace) delete(r.subscribedApps, target.AppName) + // delete the matched crdCache which comes from k8s // TODO the 2nd param need fix to correct. r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{ Namespace: target.Namespace,