Skip to content

Commit

Permalink
fix: Send message to all connection
Browse files Browse the repository at this point in the history
  • Loading branch information
123liuziming committed Mar 2, 2023
1 parent 3801028 commit 53fe43e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
19 changes: 11 additions & 8 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/opensergo/opensergo-control-plane/pkg/model"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc"
"github.com/pkg/errors"
)

type ControlPlane struct {
Expand Down Expand Up @@ -92,16 +91,20 @@ func (c *ControlPlane) Start() error {
return err
}

func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, isSecure bool) error {
func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
var connections []*transport.Connection
var exists bool
if isSecure {
connections, exists = c.secureServer.ConnectionManager().Get(namespace, app, kind)
scs, exists := c.secureServer.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
log.Printf("There is no secure connection for app %s kind %s in ns %s", app, kind, namespace)
} else {
connections, exists = c.server.ConnectionManager().Get(namespace, app, kind)
connections = append(connections, scs...)
}
cs, exists := c.server.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
return errors.New("There is no connection for this kind")
log.Printf("There is no connection for app %s kind %s in ns %s", app, kind, namespace)
} else {
connections = append(connections, cs...)
}
return c.innerSendMessage(namespace, app, kind, dataWithVersion, status, respId, connections)
}
Expand Down Expand Up @@ -157,13 +160,13 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
continue
}

if isSecure {
_ = c.secureServer.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
} else {
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
}

rules, version := crdWatcher.GetRules(model.NamespacedApp{
Namespace: request.Target.Namespace,
App: request.Target.App,
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
Details: nil,
}
dataWithVersion := &trpb.DataWithVersion{Data: rules, Version: version}
err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "", false)
errSecure := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "", true)
if errSecure != nil && err != nil {
err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "")
if err != nil {
logger.Error(err, "Failed to send rules", "kind", r.kind)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_Subscrib

type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream, bool) error

type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, isSecure bool) error
type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error

0 comments on commit 53fe43e

Please sign in to comment.