diff --git a/pkg/pool/worker_pool.go b/pkg/pool/worker_pool.go
new file mode 100644
index 00000000000..891bfae29fc
--- /dev/null
+++ b/pkg/pool/worker_pool.go
@@ -0,0 +1,90 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package pool
+
+import (
+	"sync"
+)
+
+// Work is a unit of work to be executed by the pool, like a Java Runnable.
+type Work func()
+
+// WorkerPool is a pool of goroutines that are reusable, similar to Java ThreadPool.
+type WorkerPool interface {
+	// Init starts the workers. Only the first call has an effect.
+	Init()
+
+	// Go queues the given work, starting the workers first if needed. It blocks
+	// only while the queue, whose capacity equals the pool size, is full. After
+	// Close it never blocks and the work may be dropped.
+	Go(work Work)
+
+	// Close tells all workers to stop. It does not wait for running work to
+	// finish, and work still in the queue may never run. It is safe to call
+	// Close more than once and on a pool that was never initialized.
+	Close()
+
+	// Size returns the number of workers in the pool.
+	Size() int
+}
+
+type workerPool struct {
+	initOnce  sync.Once
+	closeOnce sync.Once
+
+	workCh chan Work
+
+	// quit is closed by Close; workers and Go watch it to stop promptly.
+	quit chan struct{}
+}
+
+// NewWorkerPool returns a pool with the given number of workers. The workers
+// are started by Init or by the first call to Go. A pool without workers
+// could never run anything, so zero is treated as one.
+func NewWorkerPool(workers uint) WorkerPool {
+	if workers == 0 {
+		workers = 1
+	}
+	return &workerPool{
+		workCh: make(chan Work, workers),
+		quit:   make(chan struct{}),
+	}
+}
+
+func (p *workerPool) Init() {
+	p.initOnce.Do(func() {
+		for i := 0; i < cap(p.workCh); i++ {
+			go p.run()
+		}
+	})
+}
+
+// run executes queued work until the pool is closed.
+func (p *workerPool) run() {
+	for {
+		select {
+		case <-p.quit:
+			return
+		case work := <-p.workCh:
+			work()
+		}
+	}
+}
+
+func (p *workerPool) Go(work Work) {
+	p.Init()
+	select {
+	case <-p.quit:
+		// The pool is closed: give up instead of blocking the caller forever.
+	case p.workCh <- work:
+	}
+}
+
+func (p *workerPool) Close() {
+	p.closeOnce.Do(func() { close(p.quit) })
+}
+
+func (p *workerPool) Size() int {
+	return cap(p.workCh)
+}
diff --git a/pkg/pool/worker_pool_test.go b/pkg/pool/worker_pool_test.go
new file mode 100644
index 00000000000..b560e8004c5
--- /dev/null
+++ b/pkg/pool/worker_pool_test.go
@@ -0,0 +1,31 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package pool
+
+import (
+	"github.com/stretchr/testify/require"
+	"sync"
+	"sync/atomic"
+	"testing"
+)
+
+func TestGo(t *testing.T) { // submits three times the pool size in jobs and checks that each one ran
+	var expectedWorksDone uint32
+	const workerPoolSize = 5 // untyped constant: usable as the uint NewWorkerPool takes and the int Size returns
+	p := NewWorkerPool(workerPoolSize)
+	p.Init()
+	defer p.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < workerPoolSize*3; i++ {
+		wg.Add(1)
+		p.Go(func() {
+			atomic.AddUint32(&expectedWorksDone, 1)
+			wg.Done()
+		})
+	}
+	wg.Wait() // every job has finished once this returns, so the plain read below is safe
+	require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone)
+	require.Equal(t, workerPoolSize, p.Size())
+}
diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 827abe22a6b..c2a0536fc4c 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -43,6 +43,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/logging"
 
 	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/server/http/middleware"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -771,7 +772,7 @@ func (h *Handler) distributeTimeseriesToReplicas(
 			if err != nil {
 				return nil, nil, err
 			}
-			endpointReplica := endpointReplica{endpoint: endpoint, replica: 0}
+			endpointReplica := endpointReplica{endpoint: endpoint, replica: rn}
 			var writeDestination = remoteWrites
 			if endpoint == h.options.Endpoint {
 				writeDestination = localWrites
@@ -804,13 +805,13 @@ func (h *Handler) sendWrites(
 	// Do the writes to the local node first. This should be easy and fast.
 	for writeDestination := range localWrites {
 		func(writeDestination endpointReplica) {
-			go h.sendLocalWrite(ctx, writeDestination, params.tenant, localWrites[writeDestination], responses)
+			h.sendLocalWrite(ctx, writeDestination, params.tenant, localWrites[writeDestination], responses)
 		}(writeDestination)
 	}
 
 	// Do the writes to remote nodes. Run them all in parallel.
+	for writeDestination := range remoteWrites {
-		go h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses, wg)
+		h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses, wg) // called inline; the goroutine per destination is gone
 	}
 }
 
@@ -1188,90 +1189,24 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
 	return errs
 }
 
-func (pw *peerWorker) initWorkers() {
-	pw.initWorkersOnce.Do(func() {
-		work := make(chan peerWorkItem)
-		pw.work = work
-
-		ctx, cancel := context.WithCancel(context.Background())
-		pw.turnOffGoroutines = cancel
-
-		for i := 0; i < int(pw.asyncWorkerCount); i++ {
-			go func() {
-				for {
-					select {
-					case <-ctx.Done():
-						return
-					case w := <-work:
-						pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())
-
-						tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
-							_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
-							w.workResult <- peerWorkResponse{
-								er:  w.er,
-								err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
-							}
-							if err != nil {
-								sp := trace.SpanFromContext(ctx)
-								sp.SetAttributes(attribute.Bool("error", true))
-								sp.SetAttributes(attribute.String("error.msg", err.Error()))
-							}
-							close(w.workResult)
-						}, opentracing.Tags{
-							"endpoint": w.er.endpoint,
-							"replica":  w.er.replica,
-						})
-
-					}
-				}
-			}()
-		}
-
-	})
-}
-
 func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
 	return &peerWorker{
-		cc:               cc,
-		asyncWorkerCount: asyncWorkerCount,
-		forwardDelay:     forwardDelay,
+		cc:           cc,
+		wp:           pool.NewWorkerPool(asyncWorkerCount), // pool that runs the work queued by RemoteWriteAsync
+		forwardDelay: forwardDelay,
 	}
 }
 
-type peerWorkItem struct {
-	cc          *grpc.ClientConn
-	req         *storepb.WriteRequest
-	workItemCtx context.Context
-
-	workResult chan peerWorkResponse
-	er         endpointReplica
-	sendTime   time.Time
-}
-
 func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
-	pw.initWorkers()
-
-	w := peerWorkItem{
-		cc:          pw.cc,
-		req:         in,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		sendTime:    time.Now(),
-	}
-
-	pw.work <- w
-	return nil, (<-w.workResult).err
+	_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in) // runs on the caller's goroutine; the worker pool is not involved
+	return nil, err // the response is discarded; only the error is reported
 }
 
 type peerWorker struct {
 	cc *grpc.ClientConn
+	wp pool.WorkerPool // runs work queued by RemoteWriteAsync; closed in peerGroup.close
 
-	work              chan peerWorkItem
-	turnOffGoroutines func()
-
-	initWorkersOnce  sync.Once
-	asyncWorkerCount uint
-	forwardDelay     prometheus.Histogram
+	forwardDelay prometheus.Histogram // observes the seconds between queuing a write and a worker starting it
 }
 
 func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
@@ -1295,29 +1230,29 @@ type peersContainer interface {
 	reset()
 }
 
-type peerWorkResponse struct {
-	er  endpointReplica
-	err error
-}
-
 func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
-	p.initWorkers()
-
-	w := peerWorkItem{
-		cc:          p.cc,
-		req:         req,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		er:          er,
-
-		sendTime: time.Now(),
-	}
-
-	p.work <- w
-	res := <-w.workResult
-
-	responseWriter <- newWriteResponse(seriesIDs, res.err, er)
-	cb(res.err)
+	now := time.Now() // taken before queuing, so the observed delay includes the time spent waiting for a worker
+	p.wp.Go(func() { // returns once the work is queued; the response and cb are delivered from a pool worker
+		p.forwardDelay.Observe(time.Since(now).Seconds())
+
+		tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
+			_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
+			responseWriter <- newWriteResponse(
+				seriesIDs,
+				errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
+				er,
+			)
+			if err != nil {
+				sp := trace.SpanFromContext(ctx)
+				sp.SetAttributes(attribute.Bool("error", true))
+				sp.SetAttributes(attribute.String("error.msg", err.Error()))
+			}
+			cb(err) // cb receives the unwrapped error, not the wrapped one sent to responseWriter
+		}, opentracing.Tags{
+			"endpoint": er.endpoint,
+			"replica":  er.replica,
+		})
+	})
 }
 
 type peerGroup struct {
@@ -1345,7 +1280,7 @@ func (p *peerGroup) close(addr string) error {
 		return nil
 	}
 
-	p.connections[addr].turnOffGoroutines()
+	p.connections[addr].wp.Close() // NOTE(review): wp.Close must tolerate a pool on which Go was never called — confirm
 	delete(p.connections, addr)
 	if err := c.cc.Close(); err != nil {
 		return fmt.Errorf("closing connection for %s", addr)