Skip to content

Commit

Permalink
add retry mechanism for pushing config (opensergo#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
jk-tonycui committed Nov 15, 2022
1 parent 1df7486 commit 7226883
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 17 deletions.
50 changes: 35 additions & 15 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package opensergo

import (
"os"
"sync"

"github.com/avast/retry-go/v4"
"github.com/opensergo/opensergo-control-plane/pkg/controller"
"github.com/opensergo/opensergo-control-plane/pkg/model"
"github.com/opensergo/opensergo-control-plane/pkg/options"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"os"
"sync"
)

type ControlPlane struct {
Expand All @@ -31,11 +34,14 @@ type ControlPlane struct {

protoDesc *trpb.ControlPlaneDesc

opts *options.Options

mux sync.RWMutex
}

func NewControlPlane() (*ControlPlane, error) {
func NewControlPlane(opts *options.Options) (*ControlPlane, error) {
cp := &ControlPlane{}
cp.opts = opts

operator, err := controller.NewKubernetesOperator(cp.sendMessage)
if err != nil {
Expand Down Expand Up @@ -89,20 +95,34 @@ func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion
return nil
}

func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, trpbStatus *trpb.Status, respId string) error {
if stream == nil {
return nil
}
return stream.SendMsg(&trpb.SubscribeResponse{
Status: status,
Ack: "",
Namespace: namespace,
App: app,
Kind: kind,
DataWithVersion: dataWithVersion,
ControlPlane: c.protoDesc,
ResponseId: respId,
})

return retry.Do(
func() error {
error := stream.SendMsg(&trpb.SubscribeResponse{
Status: trpbStatus,
Ack: "",
Namespace: namespace,
App: app,
Kind: kind,
DataWithVersion: dataWithVersion,
ControlPlane: c.protoDesc,
ResponseId: respId,
})

return error
},
retry.Attempts(uint(c.opts.ConfigPushMaxAttempt)),
retry.RetryIf(func(err error) bool {
s, _ := status.FromError(err)
if s.Code() == codes.DeadlineExceeded {
return true
}
return false
}))
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ go 1.14

require (
github.com/alibaba/sentinel-golang v1.0.3
github.com/avast/retry-go/v4 v4.3.0
github.com/envoyproxy/protoc-gen-validate v0.1.0
github.com/go-logr/logr v0.4.0
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/stretchr/testify v1.7.1 // indirect
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.7.0
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/avast/retry-go/v4 v4.3.0 h1:cqI48aXx0BExKoM7XPklDpoHAg7/srPPLAfWG5z62jo=
github.com/avast/retry-go/v4 v4.3.0/go.mod h1:bqOlT4nxk4phk9buiQFaghzjpqdchOSwPgjdfdQBtdg=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
Expand Down Expand Up @@ -476,6 +478,7 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -484,6 +487,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
Expand Down Expand Up @@ -853,6 +857,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
7 changes: 6 additions & 1 deletion pkg/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package main

import (
"github.com/opensergo/opensergo-control-plane/pkg/options"
"log"

"github.com/opensergo/opensergo-control-plane"
)

func main() {
cp, err := opensergo.NewControlPlane()
opts, err := options.NewOption()
if err != nil {
log.Fatal(err)
}
cp, err := opensergo.NewControlPlane(opts)
if err != nil {
log.Fatal(err)
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package options

import (
"github.com/spf13/pflag"
"log"
"os"
)

type Options struct {
ConfigPushMaxAttempt int
flags *pflag.FlagSet
}

func (o *Options) AddFlag() {
o.flags.IntVar(&o.ConfigPushMaxAttempt, "ConfigPushMaxAttempt", 3, "max times for pushing config after timeout error")
}

func (o *Options) Parse() error {
err := o.flags.Parse(os.Args)
return err
}

func NewOption() (*Options, error) {
o := &Options{
flags: pflag.NewFlagSet("sergo-flag", pflag.ExitOnError),
}
o.AddFlag()
err := o.Parse()
if err != nil {
log.Fatalf("Parse flag failure: %s", err)
}
return o, err
}

0 comments on commit 7226883

Please sign in to comment.