diff --git a/control_plane.go b/control_plane.go index 1cedfec..c0febf0 100644 --- a/control_plane.go +++ b/control_plane.go @@ -15,6 +15,7 @@ package opensergo import ( + "log" "os" "sync" @@ -130,6 +131,7 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId) if err != nil { // TODO: log here + log.Printf("sendMessageToStream failed, err=%s\n", err.Error()) } continue } @@ -152,6 +154,7 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, dataWithVersion, status, request.RequestId) if err != nil { // TODO: log here + log.Printf("sendMessageToStream failed, err=%s\n", err.Error()) } } } diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index 7017420..6316ffb 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -148,10 +148,10 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } app := "" - hasAppLabel := true if crd != nil { // TODO: bugs here: we need to check for namespace-app group, not only for app. // And we may also need to check for namespace change of a CRD. + var hasAppLabel bool app, hasAppLabel = crd.GetLabels()["app"] appSubscribed := r.HasAnySubscribedOfApp(app) if !hasAppLabel || !appSubscribed { diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index 8e1f76c..fc43adc 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -16,14 +16,15 @@ package grpc import ( "fmt" + "io" + "log" + "net" + "github.com/opensergo/opensergo-control-plane/pkg/model" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "github.com/opensergo/opensergo-control-plane/pkg/util" "go.uber.org/atomic" "google.golang.org/grpc" - "io" - "log" - "net" ) const ( @@ -117,8 +118,10 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor // This indicates the received data is a response of push-failure. if recvData.Status.Code == CheckFormatError { // TODO: handle here (cannot retry) + log.Println("Client response CheckFormatError") } else { // TODO: record error here and do something + log.Printf("Client response NACK, code=%d\n", recvData.Status.Code) } } else { // This indicates the received data is a SubscribeRequest. @@ -150,7 +153,6 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor } } - return nil } func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler) *TransportServer { diff --git a/pkg/util/format_util.go b/pkg/util/format_util.go index 55a3dc3..a8d2b4f 100644 --- a/pkg/util/format_util.go +++ b/pkg/util/format_util.go @@ -15,9 +15,10 @@ package util import ( - pb "github.com/opensergo/opensergo-control-plane/pkg/proto/fault_tolerance/v1" "strconv" "strings" + + pb "github.com/opensergo/opensergo-control-plane/pkg/proto/fault_tolerance/v1" ) func Str2MillSeconds(timeStr string) (int64, error) {