From 147519affb09865758a7b6baa5124d9042df0c76 Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Mon, 11 Dec 2023 11:28:48 +0200 Subject: [PATCH 1/8] Add timeout support on send Add internal/errorgrp package to support cancellable error groups Add tests for push/pull timeout --- internal/errorgrp/errorgrp.go | 61 ++++++++++++++++++++++++++++++ internal/errorgrp/errorgrp_test.go | 61 ++++++++++++++++++++++++++++++ msgio.go | 3 +- options.go | 7 ++++ router.go | 4 +- socket.go | 5 ++- zmq4_timeout_test.go | 45 ++++++++++++++++++++++ 7 files changed, 181 insertions(+), 5 deletions(-) create mode 100644 internal/errorgrp/errorgrp.go create mode 100644 internal/errorgrp/errorgrp_test.go create mode 100644 zmq4_timeout_test.go diff --git a/internal/errorgrp/errorgrp.go b/internal/errorgrp/errorgrp.go new file mode 100644 index 0000000..57dc7f4 --- /dev/null +++ b/internal/errorgrp/errorgrp.go @@ -0,0 +1,61 @@ +// Package errorgrp is bit more advanced than errgroup +// Major difference is that when error group is created with WithContext2 +// the parent context would implicitly cancel all functions called by Go method. +// +// The name is selected so you can mix regular errgroup and errorgrp in same file. +package errorgrp + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +// The Group2 is superior errgroup.Group which aborts whole group +// execution when parent context is cancelled +type Group2 struct { + grp *errgroup.Group + ctx context.Context +} + +// WithContext2 creates Group2 and store inside parent context +// so the Go method would respect parent context cancellation +func WithContext2(ctx context.Context) (*Group2, context.Context) { + grp, child_ctx := errgroup.WithContext(ctx) + return &Group2{grp: grp, ctx: ctx}, child_ctx +} + +// Go function would wait for parent context to be cancelled, +// or func f to be complete complete +func (g *Group2) Go(f func() error) { + g.grp.Go(func() error { + // If parent context is canceled, + // just return its error and do not call func f + select { + case <-g.ctx.Done(): + return g.ctx.Err() + default: + } + + // Create return channel + // and call func f + ch := make(chan error, 1) + go func() { + ch <- f() + }() + + // Wait func f complete or + // parent context to be cancelled, + select { + case err := <-ch: + return err + case <-g.ctx.Done(): + return g.ctx.Err() + } + }) +} + +// Wait is direct call to errgroup.Wait +func (g *Group2) Wait() error { + return g.grp.Wait() +} diff --git a/internal/errorgrp/errorgrp_test.go b/internal/errorgrp/errorgrp_test.go new file mode 100644 index 0000000..86b9463 --- /dev/null +++ b/internal/errorgrp/errorgrp_test.go @@ -0,0 +1,61 @@ +package errorgrp + +import ( + "context" + "fmt" + "testing" + + "golang.org/x/sync/errgroup" +) + +// TestErrGroupDoesNotRespectParentContext check regulare errgroup behavior +// where errgroup.WithContext does not respects the parent context +func TestErrGroupDoesNotRespectParentContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + eg, _ := errgroup.WithContext(ctx) + + er := fmt.Errorf("func generated error") + s := make(chan struct{}, 1) + eg.Go(func() error { + <-s + return er + }) + + // Abort context + cancel() + // Signal the func in regular errgroup to fail + s <- struct{}{} + // Wait regular errgroup complete and read error + err := eg.Wait() + + // The error shall be one returned by the function + // as regular errgroup.WithContext does not respect parent context + if err != er { + t.Fail() + } +} + +func TestErrorGrpWithContext2DoesRespectsParentContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + eg, _ := WithContext2(ctx) + + er := fmt.Errorf("func generated error") + s := make(chan struct{}, 1) + eg.Go(func() error { + <-s + return er + }) + + // Abort context + cancel() + // Signal the func in regular errgroup to fail + s <- struct{}{} + // Wait regular errgroup complete and read error + err := eg.Wait() + + // The error shall be one returned by the function + // as regular errgroup.WithContext does not respect parent context + if err != context.Canceled { + t.Fail() + } +} diff --git a/msgio.go b/msgio.go index 3f5e492..63c1f42 100644 --- a/msgio.go +++ b/msgio.go @@ -9,6 +9,7 @@ import ( "io" "sync" + "github.com/go-zeromq/zmq4/internal/errorgrp" "golang.org/x/sync/errgroup" ) @@ -167,7 +168,7 @@ func (mw *mwriter) rmConn(w *Conn) { func (w *mwriter) write(ctx context.Context, msg Msg) error { w.sem.lock(ctx) - grp, _ := errgroup.WithContext(ctx) + grp, _ := errorgrp.WithContext2(ctx) w.mu.Lock() for i := range w.ws { ww := w.ws[i] diff --git a/options.go b/options.go index d85b7c5..9edc738 100644 --- a/options.go +++ b/options.go @@ -44,6 +44,13 @@ func WithDialerTimeout(timeout time.Duration) Option { } } +// WithTimeout sets the timeout value for socket operations +func WithTimeout(timeout time.Duration) Option { + return func(s *socket) { + s.Timeout = timeout + } +} + // WithLogger sets a dedicated log.Logger for the socket. func WithLogger(msg *log.Logger) Option { return func(s *socket) { diff --git a/router.go b/router.go index d58de9a..fd95f6d 100644 --- a/router.go +++ b/router.go @@ -10,7 +10,7 @@ import ( "net" "sync" - "golang.org/x/sync/errgroup" + "github.com/go-zeromq/zmq4/internal/errorgrp" ) // NewRouter returns a new ROUTER ZeroMQ socket. @@ -225,7 +225,7 @@ func (mw *routerMWriter) rmConn(w *Conn) { func (w *routerMWriter) write(ctx context.Context, msg Msg) error { w.sem.lock(ctx) - grp, _ := errgroup.WithContext(ctx) + grp, _ := errorgrp.WithContext2(ctx) w.mu.Lock() id := msg.Frames[0] dmsg := NewMsgFrom(msg.Frames[1:]...) diff --git a/socket.go b/socket.go index d322e9f..ca7c95a 100644 --- a/socket.go +++ b/socket.go @@ -40,6 +40,7 @@ type socket struct { log *log.Logger subTopics func() []string autoReconnect bool + Timeout time.Duration mu sync.RWMutex conns []*Conn // ZMTP connections @@ -67,6 +68,7 @@ func newDefaultSocket(ctx context.Context, sockType SocketType) *socket { typ: sockType, retry: defaultRetry, maxRetries: defaultMaxRetries, + Timeout: defaultTimeout, sec: nullSecurity{}, conns: nil, r: newQReader(ctx), @@ -366,8 +368,7 @@ func (sck *socket) SetOption(name string, value interface{}) error { } func (sck *socket) timeout() time.Duration { - // FIXME(sbinet): extract from options - return defaultTimeout + return sck.Timeout } func (sck *socket) connReaper() { diff --git a/zmq4_timeout_test.go b/zmq4_timeout_test.go new file mode 100644 index 0000000..813e524 --- /dev/null +++ b/zmq4_timeout_test.go @@ -0,0 +1,45 @@ +package zmq4 + +import ( + "context" + "testing" + "time" +) + +func TestPushTimeout(t *testing.T) { + ep := "ipc://@push_timeout_test" + push := NewPush(context.Background(), WithTimeout(1*time.Second)) + defer push.Close() + if err := push.Listen(ep); err != nil { + t.FailNow() + } + + pull := NewPull(context.Background()) + defer pull.Close() + if err := pull.Dial(ep); err != nil { + t.FailNow() + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + // The ctx limits overall time of execution + // If it gets canceled, that meains tests failed + // as write to socket did not genereate timeout error + t.FailNow() + default: + } + + err := push.Send(NewMsgString("test string")) + if err == nil { + continue + } + if err != context.DeadlineExceeded { + t.FailNow() + } + break + } + +} From 601fff46f0f44bd3e55b48464018b0cd8756c162 Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Tue, 12 Dec 2023 13:34:28 +0200 Subject: [PATCH 2/8] The internal/errgroup is fully compatible with x/sync/errgroup. Increase test coverage. Minor correction according to PR review. --- internal/errgroup/errgroup.go | 111 ++++++++++++++++++++++++++ internal/errgroup/errgroup_test.go | 124 +++++++++++++++++++++++++++++ internal/errorgrp/errorgrp.go | 61 -------------- internal/errorgrp/errorgrp_test.go | 61 -------------- msgio.go | 4 +- options.go | 2 +- pub.go | 4 +- rep.go | 4 +- req.go | 4 +- router.go | 6 +- socket.go | 12 +-- zmq4_timeout_test.go | 9 ++- 12 files changed, 259 insertions(+), 143 deletions(-) create mode 100644 internal/errgroup/errgroup.go create mode 100644 internal/errgroup/errgroup_test.go delete mode 100644 internal/errorgrp/errorgrp.go delete mode 100644 internal/errorgrp/errorgrp_test.go diff --git a/internal/errgroup/errgroup.go b/internal/errgroup/errgroup.go new file mode 100644 index 0000000..b69b90c --- /dev/null +++ b/internal/errgroup/errgroup.go @@ -0,0 +1,111 @@ +// Copyright 2023 The go-zeromq Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup is bit more advanced than golang.org/x/sync/errgroup. +// Major difference is that when error group is created with WithContext +// the parent context would implicitly cancel all functions called by Go method. +package errgroup + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +// The Group is superior errgroup.Group which aborts whole group +// execution when parent context is cancelled +type Group struct { + grp *errgroup.Group + ctx context.Context +} + +// WithContext creates Group and store inside parent context +// so the Go method would respect parent context cancellation +func WithContext(ctx context.Context) (*Group, context.Context) { + grp, child_ctx := errgroup.WithContext(ctx) + return &Group{grp: grp, ctx: ctx}, child_ctx +} + +// Go runs the provided f function in a dedicated goroutine and waits for its +// completion or for the parent context cancellation. +func (g *Group) Go(f func() error) { + g.init() + + if g.ctx == nil { + g.grp.Go(f) + return + } + + g.grp.Go(g.wrap(f)) +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +// If the error group was created via WithContext then the Wait returns error +// of cancelled parent context prior any functions calls complete. +func (g *Group) Wait() error { + g.init() + return g.grp.Wait() +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + g.init() + g.grp.SetLimit(n) +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + g.init() + if g.ctx == nil { + return g.grp.TryGo(f) + } + + return g.grp.TryGo(g.wrap(f)) +} + +// The init method provides backward compatibility to x/sync/errgroup.Group +// when developers can create error group by i.e. ```eg := &errgroup.Group{}``` +func (g *Group) init() { + if g.grp == nil { + g.grp = &errgroup.Group{} + } +} + +func (g *Group) wrap(f func() error) func() error { + return func() error { + // If parent context is canceled, + // just return its error and do not call func f + select { + case <-g.ctx.Done(): + return g.ctx.Err() + default: + } + + // Create return channel + // and call func f + ch := make(chan error, 1) + go func() { + ch <- f() + }() + + // Wait func f complete or + // parent context to be cancelled, + select { + case err := <-ch: + return err + case <-g.ctx.Done(): + return g.ctx.Err() + } + } +} diff --git a/internal/errgroup/errgroup_test.go b/internal/errgroup/errgroup_test.go new file mode 100644 index 0000000..7f6b810 --- /dev/null +++ b/internal/errgroup/errgroup_test.go @@ -0,0 +1,124 @@ +// Copyright 2023 The go-zeromq Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package errgroup + +import ( + "context" + "fmt" + "testing" + + "golang.org/x/sync/errgroup" +) + +// TestRegularErrGroupDoesNotRespectParentContext checks regular errgroup behavior +// where errgroup.WithContext does not respect the parent context +func TestRegularErrGroupDoesNotRespectParentContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + eg, _ := errgroup.WithContext(ctx) + + what := fmt.Errorf("func generated error") + ch := make(chan error) + eg.Go(func() error { return <-ch }) + + cancel() // abort parent context + ch <- what // signal the func in regular errgroup to fail + err := eg.Wait() + + // The error shall be one returned by the function + // as regular errgroup.WithContext does not respect parent context + if err != what { + t.Errorf("invalid error. got=%+v, want=%+v", err, what) + } +} + +// TestErrGroupWithContextCanCallFunctions checks the errgroup operations +// are fine working and errgroup called function can return error +func TestErrGroupWithContextCanCallFunctions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + eg, _ := WithContext(ctx) + + what := fmt.Errorf("func generated error") + ch := make(chan error) + eg.Go(func() error { return <-ch }) + + ch <- what // signal the func in errgroup to fail + err := eg.Wait() // wait errgroup complete and read error + + // The error shall be one returned by the function + if err != what { + t.Errorf("invalid error. got=%+v, want=%+v", err, what) + } +} + +// TestErrGroupWithContextDoesRespectParentContext checks the errgroup operations +// are cancellable by parent context +func TestErrGroupWithContextDoesRespectParentContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + eg, _ := WithContext(ctx) + + s1 := make(chan struct{}) + s2 := make(chan struct{}) + eg.Go(func() error { + s1 <- struct{}{} + <-s2 + return fmt.Errorf("func generated error") + }) + + // We have no set limit to errgroup so + // shall be able to start function via TryGo + if ok := eg.TryGo(func() error { return nil }); !ok { + t.Errorf("Expected TryGo to be able start function!!!") + } + + <-s1 // wait for function to start + cancel() // abort parent context + + eg.Go(func() error { + t.Errorf("The parent context was already cancelled and this function shall not be called!!!") + return nil + }) + + s2 <- struct{}{} // signal the func in regular errgroup to fail + err := eg.Wait() // wait errgroup complete and read error + + // The error shall be one returned by the function + // as regular errgroup.WithContext does not respect parent context + if err != context.Canceled { + t.Errorf("expected a context.Canceled error, got=%+v", err) + } +} + +// TestErrGroupFallback tests fallback logic to be compatible with x/sync/errgroup +func TestErrGroupFallback(t *testing.T) { + eg := Group{} + eg.SetLimit(2) + + ch1 := make(chan error) + eg.Go(func() error { return <-ch1 }) + + ch2 := make(chan error) + ok := eg.TryGo(func() error { return <-ch2 }) + if !ok { + t.Errorf("Expected errgroup.TryGo to success!!!") + } + + // The limit set to 2, so 3rd function shall not be possible to call + ok = eg.TryGo(func() error { + t.Errorf("This function is unexpected to be called!!!") + return nil + }) + if ok { + t.Errorf("Expected errgroup.TryGo to fail!!!") + } + + ch1 <- nil + ch2 <- nil + err := eg.Wait() + + if err != nil { + t.Errorf("expected a nil error, got=%+v", err) + } +} diff --git a/internal/errorgrp/errorgrp.go b/internal/errorgrp/errorgrp.go deleted file mode 100644 index 57dc7f4..0000000 --- a/internal/errorgrp/errorgrp.go +++ /dev/null @@ -1,61 +0,0 @@ -// Package errorgrp is bit more advanced than errgroup -// Major difference is that when error group is created with WithContext2 -// the parent context would implicitly cancel all functions called by Go method. -// -// The name is selected so you can mix regular errgroup and errorgrp in same file. -package errorgrp - -import ( - "context" - - "golang.org/x/sync/errgroup" -) - -// The Group2 is superior errgroup.Group which aborts whole group -// execution when parent context is cancelled -type Group2 struct { - grp *errgroup.Group - ctx context.Context -} - -// WithContext2 creates Group2 and store inside parent context -// so the Go method would respect parent context cancellation -func WithContext2(ctx context.Context) (*Group2, context.Context) { - grp, child_ctx := errgroup.WithContext(ctx) - return &Group2{grp: grp, ctx: ctx}, child_ctx -} - -// Go function would wait for parent context to be cancelled, -// or func f to be complete complete -func (g *Group2) Go(f func() error) { - g.grp.Go(func() error { - // If parent context is canceled, - // just return its error and do not call func f - select { - case <-g.ctx.Done(): - return g.ctx.Err() - default: - } - - // Create return channel - // and call func f - ch := make(chan error, 1) - go func() { - ch <- f() - }() - - // Wait func f complete or - // parent context to be cancelled, - select { - case err := <-ch: - return err - case <-g.ctx.Done(): - return g.ctx.Err() - } - }) -} - -// Wait is direct call to errgroup.Wait -func (g *Group2) Wait() error { - return g.grp.Wait() -} diff --git a/internal/errorgrp/errorgrp_test.go b/internal/errorgrp/errorgrp_test.go deleted file mode 100644 index 86b9463..0000000 --- a/internal/errorgrp/errorgrp_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package errorgrp - -import ( - "context" - "fmt" - "testing" - - "golang.org/x/sync/errgroup" -) - -// TestErrGroupDoesNotRespectParentContext check regulare errgroup behavior -// where errgroup.WithContext does not respects the parent context -func TestErrGroupDoesNotRespectParentContext(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - eg, _ := errgroup.WithContext(ctx) - - er := fmt.Errorf("func generated error") - s := make(chan struct{}, 1) - eg.Go(func() error { - <-s - return er - }) - - // Abort context - cancel() - // Signal the func in regular errgroup to fail - s <- struct{}{} - // Wait regular errgroup complete and read error - err := eg.Wait() - - // The error shall be one returned by the function - // as regular errgroup.WithContext does not respect parent context - if err != er { - t.Fail() - } -} - -func TestErrorGrpWithContext2DoesRespectsParentContext(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - eg, _ := WithContext2(ctx) - - er := fmt.Errorf("func generated error") - s := make(chan struct{}, 1) - eg.Go(func() error { - <-s - return er - }) - - // Abort context - cancel() - // Signal the func in regular errgroup to fail - s <- struct{}{} - // Wait regular errgroup complete and read error - err := eg.Wait() - - // The error shall be one returned by the function - // as regular errgroup.WithContext does not respect parent context - if err != context.Canceled { - t.Fail() - } -} diff --git a/msgio.go b/msgio.go index 63c1f42..378da21 100644 --- a/msgio.go +++ b/msgio.go @@ -9,7 +9,7 @@ import ( "io" "sync" - "github.com/go-zeromq/zmq4/internal/errorgrp" + errgrp "github.com/go-zeromq/zmq4/internal/errgroup" "golang.org/x/sync/errgroup" ) @@ -168,7 +168,7 @@ func (mw *mwriter) rmConn(w *Conn) { func (w *mwriter) write(ctx context.Context, msg Msg) error { w.sem.lock(ctx) - grp, _ := errorgrp.WithContext2(ctx) + grp, _ := errgrp.WithContext(ctx) w.mu.Lock() for i := range w.ws { ww := w.ws[i] diff --git a/options.go b/options.go index 9edc738..416006a 100644 --- a/options.go +++ b/options.go @@ -47,7 +47,7 @@ func WithDialerTimeout(timeout time.Duration) Option { // WithTimeout sets the timeout value for socket operations func WithTimeout(timeout time.Duration) Option { return func(s *socket) { - s.Timeout = timeout + s.timeout = timeout } } diff --git a/pub.go b/pub.go index 8e6947e..365c6d4 100644 --- a/pub.go +++ b/pub.go @@ -39,7 +39,7 @@ func (pub *pubSocket) Close() error { // Send puts the message on the outbound send queue. // Send blocks until the message can be queued or the send deadline expires. func (pub *pubSocket) Send(msg Msg) error { - ctx, cancel := context.WithTimeout(pub.sck.ctx, pub.sck.timeout()) + ctx, cancel := context.WithTimeout(pub.sck.ctx, pub.sck.Timeout()) defer cancel() return pub.sck.w.write(ctx, msg) } @@ -49,7 +49,7 @@ func (pub *pubSocket) Send(msg Msg) error { // The message will be sent as a multipart message. func (pub *pubSocket) SendMulti(msg Msg) error { msg.multipart = true - ctx, cancel := context.WithTimeout(pub.sck.ctx, pub.sck.timeout()) + ctx, cancel := context.WithTimeout(pub.sck.ctx, pub.sck.Timeout()) defer cancel() return pub.sck.w.write(ctx, msg) } diff --git a/rep.go b/rep.go index b578bad..d9a1a78 100644 --- a/rep.go +++ b/rep.go @@ -34,7 +34,7 @@ func (rep *repSocket) Close() error { // Send puts the message on the outbound send queue. // Send blocks until the message can be queued or the send deadline expires. func (rep *repSocket) Send(msg Msg) error { - ctx, cancel := context.WithTimeout(rep.sck.ctx, rep.sck.timeout()) + ctx, cancel := context.WithTimeout(rep.sck.ctx, rep.sck.Timeout()) defer cancel() return rep.sck.w.write(ctx, msg) } @@ -44,7 +44,7 @@ func (rep *repSocket) Send(msg Msg) error { // The message will be sent as a multipart message. func (rep *repSocket) SendMulti(msg Msg) error { msg.multipart = true - ctx, cancel := context.WithTimeout(rep.sck.ctx, rep.sck.timeout()) + ctx, cancel := context.WithTimeout(rep.sck.ctx, rep.sck.Timeout()) defer cancel() return rep.sck.w.write(ctx, msg) } diff --git a/req.go b/req.go index d0de956..2b99aed 100644 --- a/req.go +++ b/req.go @@ -35,7 +35,7 @@ func (req *reqSocket) Close() error { // Send puts the message on the outbound send queue. // Send blocks until the message can be queued or the send deadline expires. func (req *reqSocket) Send(msg Msg) error { - ctx, cancel := context.WithTimeout(req.sck.ctx, req.sck.timeout()) + ctx, cancel := context.WithTimeout(req.sck.ctx, req.sck.Timeout()) defer cancel() return req.sck.w.write(ctx, msg) } @@ -45,7 +45,7 @@ func (req *reqSocket) Send(msg Msg) error { // The message will be sent as a multipart message. func (req *reqSocket) SendMulti(msg Msg) error { msg.multipart = true - ctx, cancel := context.WithTimeout(req.sck.ctx, req.sck.timeout()) + ctx, cancel := context.WithTimeout(req.sck.ctx, req.sck.Timeout()) defer cancel() return req.sck.w.write(ctx, msg) } diff --git a/router.go b/router.go index fd95f6d..69df364 100644 --- a/router.go +++ b/router.go @@ -10,7 +10,7 @@ import ( "net" "sync" - "github.com/go-zeromq/zmq4/internal/errorgrp" + "golang.org/x/sync/errgroup" ) // NewRouter returns a new ROUTER ZeroMQ socket. @@ -35,7 +35,7 @@ func (router *routerSocket) Close() error { // Send puts the message on the outbound send queue. // Send blocks until the message can be queued or the send deadline expires. func (router *routerSocket) Send(msg Msg) error { - ctx, cancel := context.WithTimeout(router.sck.ctx, router.sck.timeout()) + ctx, cancel := context.WithTimeout(router.sck.ctx, router.sck.Timeout()) defer cancel() return router.sck.w.write(ctx, msg) } @@ -225,7 +225,7 @@ func (mw *routerMWriter) rmConn(w *Conn) { func (w *routerMWriter) write(ctx context.Context, msg Msg) error { w.sem.lock(ctx) - grp, _ := errorgrp.WithContext2(ctx) + grp, _ := errgroup.WithContext(ctx) w.mu.Lock() id := msg.Frames[0] dmsg := NewMsgFrom(msg.Frames[1:]...) diff --git a/socket.go b/socket.go index ca7c95a..ca198a6 100644 --- a/socket.go +++ b/socket.go @@ -40,7 +40,7 @@ type socket struct { log *log.Logger subTopics func() []string autoReconnect bool - Timeout time.Duration + timeout time.Duration mu sync.RWMutex conns []*Conn // ZMTP connections @@ -68,7 +68,7 @@ func newDefaultSocket(ctx context.Context, sockType SocketType) *socket { typ: sockType, retry: defaultRetry, maxRetries: defaultMaxRetries, - Timeout: defaultTimeout, + timeout: defaultTimeout, sec: nullSecurity{}, conns: nil, r: newQReader(ctx), @@ -149,7 +149,7 @@ func (sck *socket) Close() error { // Send puts the message on the outbound send queue. // Send blocks until the message can be queued or the send deadline expires. func (sck *socket) Send(msg Msg) error { - ctx, cancel := context.WithTimeout(sck.ctx, sck.timeout()) + ctx, cancel := context.WithTimeout(sck.ctx, sck.Timeout()) defer cancel() return sck.w.write(ctx, msg) } @@ -159,7 +159,7 @@ func (sck *socket) Send(msg Msg) error { // The message will be sent as a multipart message. func (sck *socket) SendMulti(msg Msg) error { msg.multipart = true - ctx, cancel := context.WithTimeout(sck.ctx, sck.timeout()) + ctx, cancel := context.WithTimeout(sck.ctx, sck.Timeout()) defer cancel() return sck.w.write(ctx, msg) } @@ -367,8 +367,8 @@ func (sck *socket) SetOption(name string, value interface{}) error { return nil } -func (sck *socket) timeout() time.Duration { - return sck.Timeout +func (sck *socket) Timeout() time.Duration { + return sck.timeout } func (sck *socket) connReaper() { diff --git a/zmq4_timeout_test.go b/zmq4_timeout_test.go index 813e524..76d41b5 100644 --- a/zmq4_timeout_test.go +++ b/zmq4_timeout_test.go @@ -1,3 +1,6 @@ +// Copyright 2023 The go-zeromq Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. package zmq4 import ( @@ -26,9 +29,9 @@ func TestPushTimeout(t *testing.T) { select { case <-ctx.Done(): // The ctx limits overall time of execution - // If it gets canceled, that meains tests failed + // If it gets canceled, that meain tests failed // as write to socket did not genereate timeout error - t.FailNow() + t.Fatalf("test failed before being able to generate timeout error: %+v", ctx.Err()) default: } @@ -37,7 +40,7 @@ func TestPushTimeout(t *testing.T) { continue } if err != context.DeadlineExceeded { - t.FailNow() + t.Fatalf("expected a context.DeadlineExceeded error, got=%+v", err) } break } From 5d3dd2d682d2e5d60d769182e5280b6c8377d37b Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Tue, 12 Dec 2023 13:39:35 +0200 Subject: [PATCH 3/8] Remove buffered channel as not needed --- internal/errgroup/errgroup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/errgroup/errgroup.go b/internal/errgroup/errgroup.go index b69b90c..213a591 100644 --- a/internal/errgroup/errgroup.go +++ b/internal/errgroup/errgroup.go @@ -94,7 +94,7 @@ func (g *Group) wrap(f func() error) func() error { // Create return channel // and call func f - ch := make(chan error, 1) + ch := make(chan error) go func() { ch <- f() }() From c3a202d03c84ef9658fc245eba69b5a938833519 Mon Sep 17 00:00:00 2001 From: Sergey Egorov Date: Tue, 12 Dec 2023 13:45:43 +0200 Subject: [PATCH 4/8] Return buffered channel and add comment about reasons --- internal/errgroup/errgroup.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/errgroup/errgroup.go b/internal/errgroup/errgroup.go index 213a591..e6d1ae1 100644 --- a/internal/errgroup/errgroup.go +++ b/internal/errgroup/errgroup.go @@ -92,9 +92,12 @@ func (g *Group) wrap(f func() error) func() error { default: } - // Create return channel - // and call func f - ch := make(chan error) + // Create return channel and call func f + // Buffered channel is used as the following select + // may be exiting by context cancellation + // and in such case the write to channel can be block + // and cause the go routine leak + ch := make(chan error, 1) go func() { ch <- f() }() From d94467dca88572fbaf20ad321f63a20720b3ccca Mon Sep 17 00:00:00 2001 From: egorse Date: Wed, 13 Dec 2023 20:25:40 +0200 Subject: [PATCH 5/8] Get rid of init. The go vet complain on noCopy is open --- internal/errgroup/errgroup.go | 28 +++++----------------------- zmq4_timeout_test.go | 1 + 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/internal/errgroup/errgroup.go b/internal/errgroup/errgroup.go index e6d1ae1..984bdf0 100644 --- a/internal/errgroup/errgroup.go +++ b/internal/errgroup/errgroup.go @@ -16,7 +16,7 @@ import ( // The Group is superior errgroup.Group which aborts whole group // execution when parent context is cancelled type Group struct { - grp *errgroup.Group + grp errgroup.Group ctx context.Context } @@ -24,19 +24,12 @@ type Group struct { // so the Go method would respect parent context cancellation func WithContext(ctx context.Context) (*Group, context.Context) { grp, child_ctx := errgroup.WithContext(ctx) - return &Group{grp: grp, ctx: ctx}, child_ctx + return &Group{grp: *grp, ctx: ctx}, child_ctx } // Go runs the provided f function in a dedicated goroutine and waits for its // completion or for the parent context cancellation. func (g *Group) Go(f func() error) { - g.init() - - if g.ctx == nil { - g.grp.Go(f) - return - } - g.grp.Go(g.wrap(f)) } @@ -45,7 +38,6 @@ func (g *Group) Go(f func() error) { // If the error group was created via WithContext then the Wait returns error // of cancelled parent context prior any functions calls complete. func (g *Group) Wait() error { - g.init() return g.grp.Wait() } @@ -57,7 +49,6 @@ func (g *Group) Wait() error { // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { - g.init() g.grp.SetLimit(n) } @@ -66,23 +57,14 @@ func (g *Group) SetLimit(n int) { // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { - g.init() - if g.ctx == nil { - return g.grp.TryGo(f) - } - return g.grp.TryGo(g.wrap(f)) } -// The init method provides backward compatibility to x/sync/errgroup.Group -// when developers can create error group by i.e. ```eg := &errgroup.Group{}``` -func (g *Group) init() { - if g.grp == nil { - g.grp = &errgroup.Group{} +func (g *Group) wrap(f func() error) func() error { + if g.ctx == nil { + return f } -} -func (g *Group) wrap(f func() error) func() error { return func() error { // If parent context is canceled, // just return its error and do not call func f diff --git a/zmq4_timeout_test.go b/zmq4_timeout_test.go index 76d41b5..725f1a8 100644 --- a/zmq4_timeout_test.go +++ b/zmq4_timeout_test.go @@ -1,6 +1,7 @@ // Copyright 2023 The go-zeromq Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. + package zmq4 import ( From a5b84a64ad796ec1975ec127c078409b34e111dc Mon Sep 17 00:00:00 2001 From: egorse Date: Wed, 13 Dec 2023 21:28:52 +0200 Subject: [PATCH 6/8] Fix go vet errors Unfortunatelly the errgroup.Group is not copyable so had to handle implicity initialization in all methods. --- internal/errgroup/errgroup.go | 16 ++++++++++++++-- rep_test.go | 1 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/errgroup/errgroup.go b/internal/errgroup/errgroup.go index 984bdf0..078c970 100644 --- a/internal/errgroup/errgroup.go +++ b/internal/errgroup/errgroup.go @@ -16,7 +16,7 @@ import ( // The Group is superior errgroup.Group which aborts whole group // execution when parent context is cancelled type Group struct { - grp errgroup.Group + grp *errgroup.Group ctx context.Context } @@ -24,12 +24,15 @@ type Group struct { // so the Go method would respect parent context cancellation func WithContext(ctx context.Context) (*Group, context.Context) { grp, child_ctx := errgroup.WithContext(ctx) - return &Group{grp: *grp, ctx: ctx}, child_ctx + return &Group{grp: grp, ctx: ctx}, child_ctx } // Go runs the provided f function in a dedicated goroutine and waits for its // completion or for the parent context cancellation. func (g *Group) Go(f func() error) { + if g.grp == nil { + g.grp = &errgroup.Group{} + } g.grp.Go(g.wrap(f)) } @@ -38,6 +41,9 @@ func (g *Group) Go(f func() error) { // If the error group was created via WithContext then the Wait returns error // of cancelled parent context prior any functions calls complete. func (g *Group) Wait() error { + if g.grp == nil { + g.grp = &errgroup.Group{} + } return g.grp.Wait() } @@ -49,6 +55,9 @@ func (g *Group) Wait() error { // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { + if g.grp == nil { + g.grp = &errgroup.Group{} + } g.grp.SetLimit(n) } @@ -57,6 +66,9 @@ func (g *Group) SetLimit(n int) { // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { + if g.grp == nil { + g.grp = &errgroup.Group{} + } return g.grp.TryGo(g.wrap(f)) } diff --git a/rep_test.go b/rep_test.go index 9d5dba5..e449f68 100644 --- a/rep_test.go +++ b/rep_test.go @@ -134,6 +134,7 @@ func TestCancellation(t *testing.T) { defer wg.Done() repCtx, cancel := context.WithCancel(context.Background()) + defer cancel() rep := zmq4.NewRep(repCtx) defer rep.Close() From 4fa0b49fa83089d7b4b96aa522b4915558fad98c Mon Sep 17 00:00:00 2001 From: egorse Date: Thu, 14 Dec 2023 21:18:57 +0200 Subject: [PATCH 7/8] Minor improvements for implicit errgroup initialization Should be coverage friendly --- internal/errgroup/errgroup.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/internal/errgroup/errgroup.go b/internal/errgroup/errgroup.go index 078c970..ac4acfa 100644 --- a/internal/errgroup/errgroup.go +++ b/internal/errgroup/errgroup.go @@ -30,10 +30,7 @@ func WithContext(ctx context.Context) (*Group, context.Context) { // Go runs the provided f function in a dedicated goroutine and waits for its // completion or for the parent context cancellation. func (g *Group) Go(f func() error) { - if g.grp == nil { - g.grp = &errgroup.Group{} - } - g.grp.Go(g.wrap(f)) + g.getErrGroup().Go(g.wrap(f)) } // Wait blocks until all function calls from the Go method have returned, then @@ -41,10 +38,7 @@ func (g *Group) Go(f func() error) { // If the error group was created via WithContext then the Wait returns error // of cancelled parent context prior any functions calls complete. func (g *Group) Wait() error { - if g.grp == nil { - g.grp = &errgroup.Group{} - } - return g.grp.Wait() + return g.getErrGroup().Wait() } // SetLimit limits the number of active goroutines in this group to at most n. @@ -55,10 +49,7 @@ func (g *Group) Wait() error { // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { - if g.grp == nil { - g.grp = &errgroup.Group{} - } - g.grp.SetLimit(n) + g.getErrGroup().SetLimit(n) } // TryGo calls the given function in a new goroutine only if the number of @@ -66,10 +57,7 @@ func (g *Group) SetLimit(n int) { // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { - if g.grp == nil { - g.grp = &errgroup.Group{} - } - return g.grp.TryGo(g.wrap(f)) + return g.getErrGroup().TryGo(g.wrap(f)) } func (g *Group) wrap(f func() error) func() error { @@ -106,3 +94,14 @@ func (g *Group) wrap(f func() error) func() error { } } } + +// The getErrGroup returns actual x/sync/errgroup.Group. +// If the group is not allocated it would implicitly allocate it. +// Thats allows the internal/errgroup.Group be fully +// compatible to x/sync/errgroup.Group +func (g *Group) getErrGroup() *errgroup.Group { + if g.grp == nil { + g.grp = &errgroup.Group{} + } + return g.grp +} From 2bf3109b39c86ec698c1297a824bcfc4df0ce820 Mon Sep 17 00:00:00 2001 From: egorse Date: Thu, 14 Dec 2023 22:06:47 +0200 Subject: [PATCH 8/8] Minor correction to rarely failed TestConnReaperDeadlock The fix is similar to one of previous commit with deadlock fix --- msgio.go | 2 +- pub.go | 2 +- router.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/msgio.go b/msgio.go index 378da21..d648674 100644 --- a/msgio.go +++ b/msgio.go @@ -64,11 +64,11 @@ func (q *qreader) Close() error { } func (q *qreader) addConn(r *Conn) { - go q.listen(q.ctx, r) q.mu.Lock() q.sem.enable() q.rs = append(q.rs, r) q.mu.Unlock() + go q.listen(q.ctx, r) } func (q *qreader) rmConn(r *Conn) { diff --git a/pub.go b/pub.go index 365c6d4..a65ac4d 100644 --- a/pub.go +++ b/pub.go @@ -149,11 +149,11 @@ func (q *pubQReader) Close() error { } func (q *pubQReader) addConn(r *Conn) { - go q.listen(q.ctx, r) q.mu.Lock() q.sem.enable() q.rs = append(q.rs, r) q.mu.Unlock() + go q.listen(q.ctx, r) } func (q *pubQReader) rmConn(r *Conn) { diff --git a/router.go b/router.go index 69df364..aebb14a 100644 --- a/router.go +++ b/router.go @@ -119,11 +119,11 @@ func (q *routerQReader) Close() error { } func (q *routerQReader) addConn(r *Conn) { - go q.listen(q.ctx, r) q.mu.Lock() q.sem.enable() q.rs = append(q.rs, r) q.mu.Unlock() + go q.listen(q.ctx, r) } func (q *routerQReader) rmConn(r *Conn) {