Skip to content

Commit

Permalink
fix issues/7248 which runs sequentially
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Apr 9, 2024
1 parent c7fed90 commit 0ee8a70
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 99 deletions.
72 changes: 72 additions & 0 deletions pkg/pool/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
"context"
"sync"
)

// Work is a unit of item to be worked on, like Java Runnable.
type Work func()

// WorkerPool is a pool of goroutines that are reusable, similar to Java ThreadPool.
type WorkerPool interface {
// Init initializes the worker pool.
Init()

// Go waits until the next worker becomes available and executes the given work.
Go(work Work)

// Close cancels all workers and waits for them to finish.
Close()

// Size returns the number of workers in the pool.
Size() int
}

type workerPool struct {
sync.Once
workCh chan Work
cancel context.CancelFunc
}

func NewWorkerPool(workers uint) WorkerPool {
return &workerPool{
workCh: make(chan Work, workers),
}
}

func (p *workerPool) Init() {
p.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

for i := 0; i < cap(p.workCh); i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case work := <-p.workCh:
work()
}
}
}()
}
})
}

func (p *workerPool) Go(work Work) {
p.Init()
p.workCh <- work
}

func (p *workerPool) Close() {
p.cancel()
}

func (p *workerPool) Size() int {
return cap(p.workCh)
}
31 changes: 31 additions & 0 deletions pkg/pool/worker_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
"github.com/stretchr/testify/require"
"sync"
"sync/atomic"
"testing"
)

func TestGo(t *testing.T) {
var expectedWorksDone uint32
workerPoolSize := 5
p := NewWorkerPool(workerPoolSize)

Check failure on line 16 in pkg/pool/worker_pool_test.go

View workflow job for this annotation

GitHub Actions / Thanos unit tests

cannot use workerPoolSize (variable of type int) as uint value in argument to NewWorkerPool
p.Init()
defer p.Close()

var wg sync.WaitGroup
for i := 0; i < workerPoolSize*3; i++ {
wg.Add(1)
p.Go(func() {
atomic.AddUint32(&expectedWorksDone, 1)
wg.Done()
})
}
wg.Wait()
require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone)
require.Equal(t, workerPoolSize, p.Size())
}
133 changes: 34 additions & 99 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"

extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -771,7 +772,7 @@ func (h *Handler) distributeTimeseriesToReplicas(
if err != nil {
return nil, nil, err
}
endpointReplica := endpointReplica{endpoint: endpoint, replica: 0}
endpointReplica := endpointReplica{endpoint: endpoint, replica: rn}
var writeDestination = remoteWrites
if endpoint == h.options.Endpoint {
writeDestination = localWrites
Expand Down Expand Up @@ -804,13 +805,13 @@ func (h *Handler) sendWrites(
// Do the writes to the local node first. This should be easy and fast.
for writeDestination := range localWrites {
func(writeDestination endpointReplica) {
go h.sendLocalWrite(ctx, writeDestination, params.tenant, localWrites[writeDestination], responses)
h.sendLocalWrite(ctx, writeDestination, params.tenant, localWrites[writeDestination], responses)
}(writeDestination)
}

// Do the writes to remote nodes. Run them all in parallel.
for writeDestination := range remoteWrites {
go h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses, wg)
h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses, wg)
}
}

Expand Down Expand Up @@ -1188,90 +1189,24 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
return errs
}

func (pw *peerWorker) initWorkers() {
pw.initWorkersOnce.Do(func() {
work := make(chan peerWorkItem)
pw.work = work

ctx, cancel := context.WithCancel(context.Background())
pw.turnOffGoroutines = cancel

for i := 0; i < int(pw.asyncWorkerCount); i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case w := <-work:
pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())

tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
w.workResult <- peerWorkResponse{
er: w.er,
err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
}
if err != nil {
sp := trace.SpanFromContext(ctx)
sp.SetAttributes(attribute.Bool("error", true))
sp.SetAttributes(attribute.String("error.msg", err.Error()))
}
close(w.workResult)
}, opentracing.Tags{
"endpoint": w.er.endpoint,
"replica": w.er.replica,
})

}
}
}()
}

})
}

func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
return &peerWorker{
cc: cc,
asyncWorkerCount: asyncWorkerCount,
forwardDelay: forwardDelay,
cc: cc,
wp: pool.NewWorkerPool(asyncWorkerCount),
forwardDelay: forwardDelay,
}
}

type peerWorkItem struct {
cc *grpc.ClientConn
req *storepb.WriteRequest
workItemCtx context.Context

workResult chan peerWorkResponse
er endpointReplica
sendTime time.Time
}

func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
pw.initWorkers()

w := peerWorkItem{
cc: pw.cc,
req: in,
workResult: make(chan peerWorkResponse, 1),
workItemCtx: ctx,
sendTime: time.Now(),
}

pw.work <- w
return nil, (<-w.workResult).err
_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in)
return nil, err
}

type peerWorker struct {
cc *grpc.ClientConn
wp pool.WorkerPool

work chan peerWorkItem
turnOffGoroutines func()

initWorkersOnce sync.Once
asyncWorkerCount uint
forwardDelay prometheus.Histogram
forwardDelay prometheus.Histogram
}

func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
Expand All @@ -1295,29 +1230,29 @@ type peersContainer interface {
reset()
}

type peerWorkResponse struct {
er endpointReplica
err error
}

func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
p.initWorkers()

w := peerWorkItem{
cc: p.cc,
req: req,
workResult: make(chan peerWorkResponse, 1),
workItemCtx: ctx,
er: er,

sendTime: time.Now(),
}

p.work <- w
res := <-w.workResult

responseWriter <- newWriteResponse(seriesIDs, res.err, er)
cb(res.err)
now := time.Now()
p.wp.Go(func() {
p.forwardDelay.Observe(time.Since(now).Seconds())

tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
responseWriter <- newWriteResponse(
seriesIDs,
errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
er,
)
if err != nil {
sp := trace.SpanFromContext(ctx)
sp.SetAttributes(attribute.Bool("error", true))
sp.SetAttributes(attribute.String("error.msg", err.Error()))
}
cb(err)
}, opentracing.Tags{
"endpoint": er.endpoint,
"replica": er.replica,
})
})
}

type peerGroup struct {
Expand Down Expand Up @@ -1345,7 +1280,7 @@ func (p *peerGroup) close(addr string) error {
return nil
}

p.connections[addr].turnOffGoroutines()
p.connections[addr].wp.Close()
delete(p.connections, addr)
if err := c.cc.Close(); err != nil {
return fmt.Errorf("closing connection for %s", addr)
Expand Down

0 comments on commit 0ee8a70

Please sign in to comment.