Skip to content

Commit

Permalink
fix: service group not working well when callback takes long time
Browse files Browse the repository at this point in the history
Signed-off-by: kevin <[email protected]>
  • Loading branch information
kevwan committed Dec 30, 2024
1 parent 5c3679f commit 1c44dbe
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 2 deletions.
3 changes: 3 additions & 0 deletions core/proc/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (lm *listenerManager) addListener(fn func()) (waitForCalled func()) {
})
lm.lock.Unlock()

// we can return lm.waitGroup.Wait directly,
// but we want to make the returned func more readable.
// creating an extra closure would be negligible in practice.
return func() {
lm.waitGroup.Wait()
}
Expand Down
37 changes: 37 additions & 0 deletions core/proc/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package proc

import (
"sync/atomic"
"testing"
"time"

Expand All @@ -29,6 +30,42 @@ func TestShutdown(t *testing.T) {
assert.Equal(t, 3, val)
}

func TestShutdownWithMultipleServices(t *testing.T) {
SetTimeToForceQuit(time.Hour)
assert.Equal(t, time.Hour, delayTimeBeforeForceQuit)

var val int32
called1 := AddShutdownListener(func() {
atomic.AddInt32(&val, 1)
})
called2 := AddShutdownListener(func() {
atomic.AddInt32(&val, 2)
})
Shutdown()
called1()
called2()

assert.Equal(t, int32(3), atomic.LoadInt32(&val))
}

func TestWrapUpWithMultipleServices(t *testing.T) {
SetTimeToForceQuit(time.Hour)
assert.Equal(t, time.Hour, delayTimeBeforeForceQuit)

var val int32
called1 := AddWrapUpListener(func() {
atomic.AddInt32(&val, 1)
})
called2 := AddWrapUpListener(func() {
atomic.AddInt32(&val, 2)
})
WrapUp()
called1()
called2()

assert.Equal(t, int32(3), atomic.LoadInt32(&val))
}

func TestNotifyMoreThanOnce(t *testing.T) {
ch := make(chan struct{}, 1)

Expand Down
7 changes: 6 additions & 1 deletion core/service/servicegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ func (sg *ServiceGroup) doStart() {
}

func (sg *ServiceGroup) doStop() {
group := threading.NewRoutineGroup()
for _, service := range sg.services {
service.Stop()
// new variable to avoid closure problems, can be removed after go 1.22
// see https://golang.org/doc/faq#closures_and_goroutines
service := service
group.Run(service.Stop)
}
group.Wait()
}

// WithStart wraps a start func as a Service.
Expand Down
18 changes: 18 additions & 0 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jhump/protoreflect/grpcreflect"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mr"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/gateway/internal"
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/rest/httpx"
Expand All @@ -23,6 +24,7 @@ type (
Server struct {
*rest.Server
upstreams []Upstream
conns []zrpc.Client
processHeader func(http.Header) []string
dialer func(conf zrpc.RpcClientConf) zrpc.Client
}
Expand Down Expand Up @@ -52,7 +54,22 @@ func (s *Server) Start() {

// Stop stops the gateway server.
func (s *Server) Stop() {
// stop the HTTP server first, then close gRPC connections.
// in case of the gRPC server is stopped first,
// the HTTP server may still be running to accept requests.
s.Server.Stop()

group := threading.NewRoutineGroup()
for _, conn := range s.conns {
// new variable to avoid closure problems, can be removed after go 1.22
// see https://golang.org/doc/faq#closures_and_goroutines
conn := conn
group.Run(func() {
// ignore the error when closing the connection
_ = conn.Conn().Close()
})
}
group.Wait()
}

func (s *Server) build() error {
Expand All @@ -71,6 +88,7 @@ func (s *Server) build() error {
} else {
cli = zrpc.MustNewClient(up.Grpc)
}
s.conns = append(s.conns, cli)

source, err := s.createDescriptorSource(cli, up)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion gateway/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func dialer() func(context.Context, string) (net.Conn, error) {
func TestMustNewServer(t *testing.T) {
var c GatewayConf
assert.NoError(t, conf.FillDefault(&c))
// avoid popup alert on macos for asking permissions
// avoid popup alert on MacOS for asking permissions
c.DevServer.Host = "localhost"
c.Host = "localhost"
c.Port = 18881
Expand Down

0 comments on commit 1c44dbe

Please sign in to comment.