Skip to content

Commit

Permalink
Fix bugs of control_plane.go and KubernetesOperator
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Zhao <[email protected]>
  • Loading branch information
sczyh30 committed Sep 25, 2022
1 parent 43b760d commit a5cb3ff
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
32 changes: 20 additions & 12 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,31 @@ func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion
// TODO: log.Debug
continue
}
err := connection.Stream().SendMsg(&trpb.SubscribeResponse{
Status: status,
Ack: "",
Namespace: namespace,
App: app,
Kind: kind,
DataWithVersion: dataWithVersion,
ControlPlane: c.protoDesc,
ResponseId: respId,
})
err := c.sendMessageToStream(connection.Stream(), namespace, app, kind, dataWithVersion, status, respId)
if err != nil {
// TODO: should not short-break here. Handle partial failure here.
return err
}
}
return nil
}

func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
if stream == nil {
return nil
}
return stream.SendMsg(&trpb.SubscribeResponse{
Status: status,
Ack: "",
Namespace: namespace,
App: app,
Kind: kind,
DataWithVersion: dataWithVersion,
ControlPlane: c.protoDesc,
ResponseId: respId,
})
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
//var labels []model.LabelKV
//if request.Target.Labels != nil {
Expand All @@ -119,7 +127,7 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
Message: "Register watcher error",
Details: nil,
}
err = c.sendMessage(request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
if err != nil {
// TODO: log here
}
Expand All @@ -141,7 +149,7 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
Data: rules,
Version: version,
}
err = c.sendMessage(request.Target.Namespace, request.Target.App, kind, dataWithVersion, status, request.RequestId)
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, dataWithVersion, status, request.RequestId)
if err != nil {
// TODO: log here
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,23 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
var err error

existingWatcher, exists := k.controllers[target.Kind]
if exists && !existingWatcher.HasSubscribed(target) {
// TODO: think more about here
err = existingWatcher.AddSubscribeTarget(target)
if err != nil {
return existingWatcher, err
if exists {
if existingWatcher.HasSubscribed(target) {
// Target has been subscribed
return existingWatcher, nil
} else {
// Add subscribe to existing watcher
err = existingWatcher.AddSubscribeTarget(target)
if err != nil {
return nil, err
}
}
} else {
crdMetadata, crdSupports := GetCrdMetadata(target.Kind)
if !crdSupports {
return nil, errors.New("CRD not supported: " + target.Kind)
}
// This kind of CRD has never been watched.
crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler)
err = crdWatcher.AddSubscribeTarget(target)
if err != nil {
Expand All @@ -146,7 +152,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
}
k.controllers[target.Kind] = crdWatcher
}
setupLog.Info("OpenSergo CRD watcher has been registered successfully")
setupLog.Info("OpenSergo CRD watcher has been registered successfully", "kind", target.Kind, "namespace", target.Namespace, "app", target.AppName)
return k.controllers[target.Kind], nil
}

Expand Down

0 comments on commit a5cb3ff

Please sign in to comment.