Skip to content

Commit

Permalink
feat(common/sync): reimplement QuickGroup for IPFS client
Browse files Browse the repository at this point in the history
Co-authored-by: KallyDev <[email protected]>
  • Loading branch information
polebug and kallydev committed Nov 17, 2023
1 parent 815a4d9 commit 5d37218
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ lint: generate
go run github.com/golangci/golangci-lint/cmd/[email protected] run

test:
go test -cover -v ./...
go test -cover -race -v ./...

.PHONY: build
build: generate
Expand Down
80 changes: 80 additions & 0 deletions common/sync/quickgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package sync

import (
"context"
"errors"
"sync"
"sync/atomic"
)

var ErrorNoResult = errors.New("no result")

type QuickGroup[R any] interface {
Go(f func(ctx context.Context) (R, error))
Wait() (R, error)
}

var _ QuickGroup[any] = (*quickGroup[any])(nil)

type quickGroup[R any] struct {
waitGroup sync.WaitGroup
ctx context.Context
cancels []context.CancelFunc
result R
err error
done atomic.Bool
locker sync.Mutex
}

func (q *quickGroup[R]) Go(f func(ctx context.Context) (R, error)) {
if q.done.Load() {
return
}

// Ensure that the cancel slice is thread-safe.
q.locker.Lock()

q.waitGroup.Add(1)

// Create a context to later cancel slow tasks.
ctx, cancel := context.WithCancel(q.ctx)
q.cancels = append(q.cancels, cancel)
index := len(q.cancels) - 1

q.locker.Unlock()

go func() {
defer q.waitGroup.Done()

if result, err := f(ctx); err == nil {
if !q.done.Swap(true) { // Here is equivalent to sync.Once.
q.result = result
q.err = nil

// Cancel all other pending tasks.
for i, cancel := range q.cancels {
if i == index {
continue
}

cancel()
}
}
}
}()
}

func (q *quickGroup[R]) Wait() (result R, err error) {
q.waitGroup.Wait()

return q.result, q.err
}

func NewQuickGroup[R any](ctx context.Context) QuickGroup[R] {
instance := &quickGroup[R]{
ctx: ctx,
err: ErrorNoResult, // Default error.
}

return instance
}
41 changes: 41 additions & 0 deletions common/sync/quickgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sync

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestQuickGroup(t *testing.T) {
t.Parallel()

quickGroup := NewQuickGroup[time.Duration](context.Background())

for duration := time.Second; duration > 0; duration -= 100 * time.Millisecond {
duration := duration

task := func(ctx context.Context) (time.Duration, error) {
timer := time.NewTimer(duration)
defer timer.Stop()

select {
case <-ctx.Done():
t.Logf("Task %s: %s", duration, context.Canceled)

return duration, context.Canceled
case <-timer.C:
t.Logf("Task %s: %s", duration, "done")

return duration, nil
}
}

quickGroup.Go(task)
}

result, err := quickGroup.Wait()
require.NoError(t, err)
require.Equal(t, result, 100*time.Millisecond)
}

0 comments on commit 5d37218

Please sign in to comment.